Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare l'istruzione AUTO CDC ... INTO per creare un flusso che usa la funzionalità di Change Data Capture (CDC) di Lakeflow Spark Declarative Pipelines. Questa istruzione legge le modifiche da un'origine CDC e le applica a una destinazione di streaming.
- Per informazioni su CDC, vedere Che cos'è Change Data Capture (CDC)?.
- Per ulteriori dettagli sull'uso di
AUTO CDC, consulta le API AUTO CDC: semplificare il Change Data Capture con le pipeline. - Per altri dettagli su
CREATE FLOW, vedere CREATE FLOW (pipelines).
Sintassi
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
I vincoli di qualità dei dati per la destinazione vengono definiti usando la stessa CONSTRAINT clausola delle altre query della pipeline. Vedi Gestisci la qualità dei dati con le aspettative della pipeline.
Il comportamento predefinito per gli eventi INSERT e UPDATE consiste nell'eseguire l'upsert degli eventi CDC dalla sorgente dati: aggiornare qualsiasi riga nella tabella di destinazione che corrisponda alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con la APPLY AS DELETE WHEN condizione .
Importante
È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Per le tabelle di tipo SCD 2, quando si specifica lo schema della tabella di destinazione, è necessario includere anche la colonna __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.
Consulta le API AUTO CDC: semplificare la cattura dei dati modificati con le pipeline.
Parametri
flow_nameNome del flusso da creare.
sourceOrigine dei dati. L'origine deve essere un'origine di streaming . Utilizzare la parola chiave STREAM per utilizzare la semantica di streaming per leggere dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta. Per inserire dati con commit delle modifiche, è possibile usare Python e l'opzione
SkipChangeCommitsper gestire gli errori.Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.
KEYSColonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. I valori in queste colonne vengono usati per identificare gli eventi CDC applicabili a record specifici nella tabella di destinazione.
Per definire una combinazione di colonne, usare un elenco delimitato da virgole di colonne.
Questa clausola è obbligatoria.
IGNORE NULL UPDATESConsente l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e viene specificato IGNORE NULL UPDATES, le colonne con un
nullvalore manterranno i valori esistenti nella destinazione. Questo vale anche per le colonne annidate con unnullvalore .Questa clausola è facoltativa.
L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con
nullvalori.APPLY AS DELETE WHENSpecifica quando un evento CDC deve essere considerato come un
DELETEpiuttosto che un upsert.Per le origini di tipo 2 SCD, per gestire i dati non ordinati, la riga eliminata viene mantenuta temporaneamente come tombstone nella tabella Delta sottostante e viene creata una vista nel metastore che filtra queste marcature. L'intervallo di conservazione può essere configurato con la
pipelines.cdc.tombstoneGCThresholdInSecondsproprietà table.Questa clausola è facoltativa.
APPLY AS TRUNCATE WHENSpecifica quando un evento CDC deve essere considerato come una tabella
TRUNCATEcompleta. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.La
APPLY AS TRUNCATE WHENclausola è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta l'operazione di troncamento.Questa clausola è facoltativa.
SEQUENCE BYNome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. L'elaborazione della pipeline usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.
Se sono necessarie più colonne per l'ordinamento, usare un'espressione
STRUCT: ordinerà in base al primo campo dello struct, quindi al secondo campo in caso di parità e così via.Le colonne specificate devono essere tipi di dati ordinabili.
Questa clausola è obbligatoria.
COLUMNSSpecifica un subset di colonne da includere nella tabella di destinazione. È possibile:
- Specificare l'elenco completo di colonne da includere:
COLUMNS (userId, name, city). - Specificare un elenco di colonne da escludere:
COLUMNS * EXCEPT (operation, sequenceNum)
Questa clausola è facoltativa.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la
COLUMNSclausola non è specificata.- Specificare l'elenco completo di colonne da includere:
STORED ASIndica se archiviare i record come scD di tipo 1 o SCD di tipo 2.
Questa clausola è facoltativa.
Il valore predefinito è SCD di tipo 1.
TRACK HISTORY ONSpecifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:
- Specificare l'elenco completo delle colonne da tenere traccia:
COLUMNS (userId, name, city). - Specificare un elenco di colonne da escludere dal rilevamento:
COLUMNS * EXCEPT (operation, sequenceNum)
Questa clausola è facoltativa. L'impostazione predefinita consiste nel tenere traccia della cronologia per tutte le colonne di output quando sono presenti modifiche, equivalenti a
TRACK HISTORY ON *.- Specificare l'elenco completo delle colonne da tenere traccia:
Esempi
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);