Condividi tramite


AUTO CDC INTO (pipeline)

Usare l'istruzione AUTO CDC ... INTO per creare un flusso che usa la funzionalità di Change Data Capture (CDC) di Lakeflow Spark Declarative Pipelines. Questa istruzione legge le modifiche da un'origine CDC e le applica a una destinazione di streaming.

Sintassi

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

I vincoli di qualità dei dati per la destinazione vengono definiti usando la stessa CONSTRAINT clausola delle altre query della pipeline. Vedi Gestisci la qualità dei dati con le aspettative della pipeline.

Il comportamento predefinito per gli eventi INSERT e UPDATE consiste nell'eseguire l'upsert degli eventi CDC dalla sorgente dati: aggiornare qualsiasi riga nella tabella di destinazione che corrisponda alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con la APPLY AS DELETE WHEN condizione .

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Per le tabelle di tipo SCD 2, quando si specifica lo schema della tabella di destinazione, è necessario includere anche la colonna __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Consulta le API AUTO CDC: semplificare la cattura dei dati modificati con le pipeline.

Parametri

  • flow_name

    Nome del flusso da creare.

  • source

    Origine dei dati. L'origine deve essere un'origine di streaming . Utilizzare la parola chiave STREAM per utilizzare la semantica di streaming per leggere dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta. Per inserire dati con commit delle modifiche, è possibile usare Python e l'opzione SkipChangeCommits per gestire gli errori.

    Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.

  • KEYS

    Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. I valori in queste colonne vengono usati per identificare gli eventi CDC applicabili a record specifici nella tabella di destinazione.

    Per definire una combinazione di colonne, usare un elenco delimitato da virgole di colonne.

    Questa clausola è obbligatoria.

  • IGNORE NULL UPDATES

    Consente l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e viene specificato IGNORE NULL UPDATES, le colonne con un null valore manterranno i valori esistenti nella destinazione. Questo vale anche per le colonne annidate con un null valore .

    Questa clausola è facoltativa.

    L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con null valori.

  • APPLY AS DELETE WHEN

    Specifica quando un evento CDC deve essere considerato come un DELETE piuttosto che un upsert.

    Per le origini di tipo 2 SCD, per gestire i dati non ordinati, la riga eliminata viene mantenuta temporaneamente come tombstone nella tabella Delta sottostante e viene creata una vista nel metastore che filtra queste marcature. L'intervallo di conservazione può essere configurato con la pipelines.cdc.tombstoneGCThresholdInSecondsproprietà table.

    Questa clausola è facoltativa.

  • APPLY AS TRUNCATE WHEN

    Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATEcompleta. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.

    La APPLY AS TRUNCATE WHEN clausola è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta l'operazione di troncamento.

    Questa clausola è facoltativa.

  • SEQUENCE BY

    Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. L'elaborazione della pipeline usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.

    Se sono necessarie più colonne per l'ordinamento, usare un'espressione STRUCT: ordinerà in base al primo campo dello struct, quindi al secondo campo in caso di parità e così via.

    Le colonne specificate devono essere tipi di dati ordinabili.

    Questa clausola è obbligatoria.

  • COLUMNS

    Specifica un subset di colonne da includere nella tabella di destinazione. È possibile:

    • Specificare l'elenco completo di colonne da includere: COLUMNS (userId, name, city).
    • Specificare un elenco di colonne da escludere: COLUMNS * EXCEPT (operation, sequenceNum)

    Questa clausola è facoltativa.

    L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la COLUMNS clausola non è specificata.

  • STORED AS

    Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.

    Questa clausola è facoltativa.

    Il valore predefinito è SCD di tipo 1.

  • TRACK HISTORY ON

    Specifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:

    • Specificare l'elenco completo delle colonne da tenere traccia: COLUMNS (userId, name, city).
    • Specificare un elenco di colonne da escludere dal rilevamento: COLUMNS * EXCEPT (operation, sequenceNum)

    Questa clausola è facoltativa. L'impostazione predefinita consiste nel tenere traccia della cronologia per tutte le colonne di output quando sono presenti modifiche, equivalenti a TRACK HISTORY ON *.

Esempi

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);