Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Si applica a:
Databricks SQL
Importante
Questa funzionalità è in versione beta. Richiede Databricks Runtime 17.3 e versioni successive.
Usare la FLOW AUTO CDC clausola con CREATE STREAMING TABLE per elaborare i record CDC (Change Data Capture) da un'origine in una tabella di streaming.
In precedenza, l'istruzione MERGE INTO era comunemente usata per l'elaborazione dei record CDC in Azure Databricks. Tuttavia, MERGE INTO può produrre risultati non corretti a causa di record out-of-sequence o richiede logica complessa per riordinare i record.
AUTO CDC semplifica CDC gestendo automaticamente i record non in ordine. Specificare le chiavi per identificare i record, una colonna di sequenza per l'ordinamento e se archiviare i risultati come scD di tipo 1 (aggiornamenti diretti) o scD tipo 2 (rilevamento cronologia).
Sintassi
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Il comportamento predefinito per INSERT gli eventi e UPDATE consiste nell'eseguire l'upsert degli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con la APPLY AS DELETE WHEN condizione .
Parametri
sourceOrigine dei dati. L'origine deve essere un'origine di streaming. Utilizzare la parola chiave
STREAMper applicare la semantica di streaming, leggendo dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta.Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.
KEYSColonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. I valori in queste colonne vengono usati per identificare gli eventi CDC applicabili a record specifici nella tabella di destinazione.
Per definire una combinazione di colonne, usare un elenco delimitato da virgole di colonne.
Questa clausola è obbligatoria.
IGNORE NULL UPDATESConsente l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e
IGNORE NULL UPDATESviene specificato, le colonne con unnullvalore mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne annidate con unnullvalore .Questa clausola è facoltativa.
L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con
nullvalori.APPLY AS DELETE WHENSpecifica quando un evento CDC deve essere considerato come un
DELETEpiuttosto che un upsert.Per le origini di tipo 2 SCD, per gestire i dati non ordinati, la riga eliminata viene mantenuta temporaneamente come tombstone nella tabella Delta sottostante e viene creata una vista nel metastore che filtra queste marcature. L'intervallo di conservazione può essere configurato con la
pipelines.cdc.tombstoneGCThresholdInSecondsproprietà table.Questa clausola è facoltativa.
APPLY AS TRUNCATE WHENSpecifica quando un evento CDC deve essere considerato come una tabella
TRUNCATEcompleta. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.La
APPLY AS TRUNCATE WHENclausola è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta l'operazione di troncamento.Questa clausola è facoltativa.
SEQUENCE BYNome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. L'elaborazione della pipeline usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.
Se sono necessarie più colonne per l'ordinamento, usare un'espressione
STRUCT: ordinerà in base al primo campo dello struct, quindi al secondo campo in caso di parità e così via.Le colonne specificate devono essere tipi di dati ordinabili.
Questa clausola è obbligatoria.
COLUMNSSpecifica un subset di colonne da includere nella tabella di destinazione. È possibile:
- Specificare l'elenco completo di colonne da includere:
COLUMNS (userId, name, city). - Specificare un elenco di colonne da escludere:
COLUMNS * EXCEPT (operation, sequenceNum)
Questa clausola è facoltativa.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la
COLUMNSclausola non è specificata.- Specificare l'elenco completo di colonne da includere:
STORED ASIndica se archiviare i record come scD di tipo 1 o SCD di tipo 2.
Questa clausola è facoltativa.
Il valore predefinito è SCD di tipo 1.
TRACK HISTORY ONSpecifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:
- Specificare l'elenco completo delle colonne da tenere traccia:
COLUMNS (userId, name, city). - Specificare un elenco di colonne da escludere dal rilevamento:
COLUMNS * EXCEPT (operation, sequenceNum)
Questa clausola è facoltativa. L'impostazione predefinita consiste nel tenere traccia della cronologia per tutte le colonne di output quando sono presenti modifiche, equivalenti a
TRACK HISTORY ON *.- Specificare l'elenco completo delle colonne da tenere traccia:
Examples
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);