Partilhar via


AUTO CDC EM (pipelines)

Use a instrução AUTO CDC ... INTO para criar um fluxo que utilize a funcionalidade de captura de dados de alteração (CDC) do Lakeflow Spark Declarative Pipelines. Esta declaração lê as alterações de uma fonte CDC e as aplica a um destino de streaming.

Sintaxe

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Você define restrições de qualidade de dados para o alvo utilizando a mesma cláusula CONSTRAINT que nas outras consultas de pipeline. Consulte Gerir a qualidade dos dados com as expectativas do fluxo de dados.

O comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Para tabelas SCD tipo 2, ao especificar o esquema da tabela de destino, você também deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que o sequence_by campo.

Consulte As APIs do AUTO CDC: Simplifique a captura de dados de alteração com pipelines.

Parâmetros

  • flow_name

    O nome do fluxo a ser criado.

  • source

    A fonte dos dados. A fonte deve ser uma fonte de streaming . Utilize a palavra-chave STREAM para aplicar a semântica de transmissão e ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler a partir de fontes estáticas ou apenas de anexação. Para ingerir dados que têm confirmações de alteração, podes usar Python e a opção SkipChangeCommits para manipular erros.

    Para obter mais informações sobre a transmissão de dados, consulte Transformar dados com pipelines.

  • KEYS

    A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

    Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgula.

    Esta cláusula é obrigatória.

  • IGNORE NULL UPDATES

    Permite a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com valor null manterão os seus valores existentes no destino alvo. Isso também se aplica a colunas aninhadas com um valor null.

    Esta cláusula é facultativa.

    O padrão é sobrescrever colunas existentes com valores de null.

  • APPLY AS DELETE WHEN

    Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert.

    Para fontes do tipo SCD 2, para lidar com dados fora de ordem, a linha eliminada é mantida temporariamente como uma marca de exclusão na tabela Delta subjacente, e uma vista é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com a pipelines.cdc.tombstoneGCThresholdInSecondspropriedade da tabela.

    Esta cláusula é facultativa.

  • APPLY AS TRUNCATE WHEN

    Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

    A APPLY AS TRUNCATE WHEN cláusula é suportada apenas para SCD tipo 1. O SCD tipo 2 não suporta a operação de truncar.

    Esta cláusula é facultativa.

  • SEQUENCE BY

    O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O processamento de pipeline usa esse sequenciamento para lidar com eventos de mudança que chegam de forma desordenada.

    Se várias colunas forem necessárias para o sequenciamento, use uma STRUCT expressão: ela será ordenada pelo primeiro campo struct primeiro, depois pelo segundo campo se houver um empate, e assim por diante.

    As colunas especificadas devem ser tipos de dados classificáveis.

    Esta cláusula é obrigatória.

  • COLUMNS

    Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Pode optar por uma das seguintes opções:

    • Especifique a lista completa de colunas a incluir: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula é facultativa.

    O padrão é incluir todas as colunas na tabela de destino quando a COLUMNS cláusula não é especificada.

  • STORED AS

    Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

    Esta cláusula é facultativa.

    O padrão é SCD tipo 1.

  • TRACK HISTORY ON

    Especifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Pode optar por uma das seguintes opções:

    • Especifique a lista completa de colunas a controlar: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas do acompanhamento: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula é facultativa. O padrão é controlar o histórico de todas as colunas de saída quando houver alterações, equivalente a TRACK HISTORY ON *.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);