Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Aplica-se a:
Databricks SQL
Importante
Este recurso está em versão Beta. Requer Databricks Runtime 17.3 e superiores.
Use a FLOW AUTO CDC cláusula com CREATE STREAMING TABLE para processar registos de captura de dados de alteração (CDC) de uma fonte para uma tabela de streaming.
Anteriormente, a MERGE INTO instrução era comumente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para reordenar registros.
AUTO CDC simplifica o CDC ao tratar automaticamente os registos fora de ordem. Especifica chaves para identificar registos, uma coluna de sequência para ordenação e se deve armazenar os resultados como SCD tipo 1 (atualizações diretas) ou SCD tipo 2 (rastreio de histórico).
Sintaxe
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
O comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC a partir da fonte: atualizar quaisquer linhas na tabela de destino que correspondam às chaves especificadas ou inserir uma nova linha quando não existe registo correspondente na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.
Parâmetros
sourceA fonte dos dados. A fonte deve ser uma fonte em streaming. Use a palavra-chave
STREAMpara aplicar as semânticas de streaming na leitura a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler a partir de fontes estáticas ou apenas de anexação.Para obter mais informações sobre a transmissão de dados, consulte Transformar dados com pipelines.
KEYSA coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.
Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgula.
Esta cláusula é obrigatória.
IGNORE NULL UPDATESPermite a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e
IGNORE NULL UPDATESé especificado, as colunas com umnullvalor mantêm os seus valores existentes no alvo. Isso também se aplica a colunas aninhadas com um valornull.Esta cláusula é opcional.
O padrão é sobrescrever colunas existentes com valores de
null.APPLY AS DELETE WHENEspecifica quando um evento CDC deve ser tratado como um
DELETEem vez de um upsert.Para fontes do tipo SCD 2, para lidar com dados fora de ordem, a linha eliminada é mantida temporariamente como uma marca de exclusão na tabela Delta subjacente, e uma vista é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com a
pipelines.cdc.tombstoneGCThresholdInSecondspropriedade da tabela.Esta cláusula é opcional.
APPLY AS TRUNCATE WHENEspecifica quando um evento CDC deve ser tratado como uma tabela
TRUNCATEcompleta. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.A
APPLY AS TRUNCATE WHENcláusula é suportada apenas para SCD tipo 1. O SCD tipo 2 não suporta a operação de truncar.Esta cláusula é opcional.
SEQUENCE BYO nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O processamento de pipeline usa esse sequenciamento para lidar com eventos de mudança que chegam de forma desordenada.
Se várias colunas forem necessárias para o sequenciamento, use uma
STRUCTexpressão: ela será ordenada pelo primeiro campo struct primeiro, depois pelo segundo campo se houver um empate, e assim por diante.As colunas especificadas devem ser tipos de dados classificáveis.
Esta cláusula é obrigatória.
COLUMNSEspecifica um subconjunto de colunas a serem incluídas na tabela de destino. Pode optar por uma das seguintes opções:
- Especifique a lista completa de colunas a incluir:
COLUMNS (userId, name, city). - Especifique uma lista de colunas a serem excluídas:
COLUMNS * EXCEPT (operation, sequenceNum)
Esta cláusula é opcional.
O padrão é incluir todas as colunas na tabela de destino quando a
COLUMNScláusula não é especificada.- Especifique a lista completa de colunas a incluir:
STORED ASSe deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.
Esta cláusula é opcional.
O padrão é SCD tipo 1.
TRACK HISTORY ONEspecifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Pode optar por uma das seguintes opções:
- Especifique a lista completa de colunas a controlar:
COLUMNS (userId, name, city). - Especifique uma lista de colunas a serem excluídas do acompanhamento:
COLUMNS * EXCEPT (operation, sequenceNum)
Esta cláusula é opcional. O padrão é controlar o histórico de todas as colunas de saída quando houver alterações, equivalente a
TRACK HISTORY ON *.- Especifique a lista completa de colunas a controlar:
Exemplos
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);
Artigos relacionados
- CREATE STREAMING TABLE
- AUTO CDC APIs: Simplifique a captura de dados de mudanças através de pipelines
- AUTO CDC PARA (pipelines)
- CRIAR FLUXO (pipelines)
- Fluxos