Compartilhar via


CREATE STREAMING TABLE ... FLUXO AUTOMÁTICO CDC

Aplica-se a:marcado como 'sim' Databricks SQL

Importante

Esse recurso está em Beta. Requer o Databricks Runtime 17.3 e superior.

Use a FLOW AUTO CDC cláusula com CREATE STREAMING TABLE a qual processar registros CDC (captura de dados de alteração) de uma fonte em uma tabela de streaming.

Anteriormente, a MERGE INTO instrução era comumente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para re-ordenar registros.

AUTO CDC simplifica o CDC manipulando automaticamente registros fora de ordem. Especifique chaves para identificar registros, uma coluna de sequência para ordenação e se deseja armazenar resultados como SCD tipo 1 (atualizações diretas) ou SCD tipo 2 (acompanhamento de histórico).

Sintaxe

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

O comportamento padrão e INSERTUPDATE os eventos são para upsert eventos CDC da origem: atualizar todas as linhas na tabela de destino que correspondam às chaves especificadas ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.

Parâmetros

  • source

    A fonte dos dados. A origem deve ser uma fonte de streaming. Use a palavra-chave STREAM para usar a semântica de streaming para leitura da origem. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos.

    Para obter mais informações sobre dados de fluxo, consulte Transformar dados com pipelines.

  • KEYS

    A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

    Para definir uma combinação de colunas, use uma lista separada por vírgulas de colunas.

    Essa cláusula é necessária.

  • IGNORE NULL UPDATES

    Permite a ingestão de atualizações que contêm um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com um null valor mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com valor null.

    Essa cláusula é opcional.

    O padrão é substituir as colunas existentes pelos valores de null.

  • APPLY AS DELETE WHEN

    Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert.

    Para fontes scd tipo 2, para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma pedra de tumba na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas pedras de tumba. O intervalo de retenção pode ser configurado com a propriedade da pipelines.cdc.tombstoneGCThresholdInSecondstabela.

    Essa cláusula é opcional.

  • APPLY AS TRUNCATE WHEN

    Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Como essa cláusula dispara um truncamento completo da tabela de destino, ela deve ser usada apenas para casos específicos que requerem essa funcionalidade.

    A APPLY AS TRUNCATE WHEN cláusula tem suporte apenas para SCD tipo 1. O SCD tipo 2 não dá suporte à operação de truncação.

    Essa cláusula é opcional.

  • SEQUENCE BY

    O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O processamento em pipeline usa esse sequenciamento para gerenciar eventos de alteração que chegam fora de ordem.

    Se várias colunas forem necessárias para sequenciamento, use uma STRUCT expressão: ela será ordenada primeiro pelo primeiro campo de struct, depois pelo segundo campo, se houver um empate e assim por diante.

    As colunas especificadas devem ser tipos de dados classificáveis.

    Essa cláusula é necessária.

  • COLUMNS

    Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Você pode:

    • Especifique a lista completa de colunas a serem incluídas: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas: COLUMNS * EXCEPT (operation, sequenceNum)

    Essa cláusula é opcional.

    O padrão é incluir todas as colunas na tabela de destino quando a COLUMNS cláusula não for especificada.

  • STORED AS

    Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

    Essa cláusula é opcional.

    O padrão é SCD tipo 1.

  • TRACK HISTORY ON

    Especifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Você pode:

    • Especifique a lista completa de colunas a serem rastreadas: COLUMNS (userId, name, city).
    • Especifique uma lista de colunas a serem excluídas do acompanhamento: COLUMNS * EXCEPT (operation, sequenceNum)

    Essa cláusula é opcional. O padrão é acompanhar o histórico de todas as colunas de saída quando houver alterações, equivalentes a TRACK HISTORY ON *.

Exemplos

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);