Condividi tramite


CREATE STREAMING TABLE ... FLOW AUTO CDC

Si applica a:segno di spunta sì Databricks SQL

Importante

Questa funzionalità è in versione beta. Richiede Databricks Runtime 17.3 e versioni successive.

Usare la FLOW AUTO CDC clausola con CREATE STREAMING TABLE per elaborare i record CDC (Change Data Capture) da un'origine in una tabella di streaming.

In precedenza, l'istruzione MERGE INTO era comunemente usata per l'elaborazione dei record CDC in Azure Databricks. Tuttavia, MERGE INTO può produrre risultati non corretti a causa di record out-of-sequence o richiede logica complessa per riordinare i record.

AUTO CDC semplifica CDC gestendo automaticamente i record non in ordine. Specificare le chiavi per identificare i record, una colonna di sequenza per l'ordinamento e se archiviare i risultati come scD di tipo 1 (aggiornamenti diretti) o scD tipo 2 (rilevamento cronologia).

Sintassi

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Il comportamento predefinito per INSERT gli eventi e UPDATE consiste nell'eseguire l'upsert degli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con la APPLY AS DELETE WHEN condizione .

Parametri

  • source

    Origine dei dati. L'origine deve essere un'origine di streaming. Utilizzare la parola chiave STREAM per applicare la semantica di streaming, leggendo dalla sorgente. Se la lettura rileva una modifica o un'eliminazione in un record esistente, viene generato un errore. È più sicuro leggere da fonti statiche o a solo aggiunta.

    Per ulteriori informazioni sui dati di streaming, vedere Trasformare i dati con le pipeline.

  • KEYS

    Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. I valori in queste colonne vengono usati per identificare gli eventi CDC applicabili a record specifici nella tabella di destinazione.

    Per definire una combinazione di colonne, usare un elenco delimitato da virgole di colonne.

    Questa clausola è obbligatoria.

  • IGNORE NULL UPDATES

    Consente l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e IGNORE NULL UPDATES viene specificato, le colonne con un null valore mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne annidate con un null valore .

    Questa clausola è facoltativa.

    L'impostazione predefinita consiste nel sovrascrivere le colonne esistenti con null valori.

  • APPLY AS DELETE WHEN

    Specifica quando un evento CDC deve essere considerato come un DELETE piuttosto che un upsert.

    Per le origini di tipo 2 SCD, per gestire i dati non ordinati, la riga eliminata viene mantenuta temporaneamente come tombstone nella tabella Delta sottostante e viene creata una vista nel metastore che filtra queste marcature. L'intervallo di conservazione può essere configurato con la pipelines.cdc.tombstoneGCThresholdInSecondsproprietà table.

    Questa clausola è facoltativa.

  • APPLY AS TRUNCATE WHEN

    Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATEcompleta. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.

    La APPLY AS TRUNCATE WHEN clausola è supportata solo per scD di tipo 1. Il tipo SCD 2 non supporta l'operazione di troncamento.

    Questa clausola è facoltativa.

  • SEQUENCE BY

    Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. L'elaborazione della pipeline usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.

    Se sono necessarie più colonne per l'ordinamento, usare un'espressione STRUCT: ordinerà in base al primo campo dello struct, quindi al secondo campo in caso di parità e così via.

    Le colonne specificate devono essere tipi di dati ordinabili.

    Questa clausola è obbligatoria.

  • COLUMNS

    Specifica un subset di colonne da includere nella tabella di destinazione. È possibile:

    • Specificare l'elenco completo di colonne da includere: COLUMNS (userId, name, city).
    • Specificare un elenco di colonne da escludere: COLUMNS * EXCEPT (operation, sequenceNum)

    Questa clausola è facoltativa.

    L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando la COLUMNS clausola non è specificata.

  • STORED AS

    Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.

    Questa clausola è facoltativa.

    Il valore predefinito è SCD di tipo 1.

  • TRACK HISTORY ON

    Specifica un subset di colonne di output per generare record di cronologia quando sono presenti modifiche a tali colonne specificate. È possibile:

    • Specificare l'elenco completo delle colonne da tenere traccia: COLUMNS (userId, name, city).
    • Specificare un elenco di colonne da escludere dal rilevamento: COLUMNS * EXCEPT (operation, sequenceNum)

    Questa clausola è facoltativa. L'impostazione predefinita consiste nel tenere traccia della cronologia per tutte le colonne di output quando sono presenti modifiche, equivalenti a TRACK HISTORY ON *.

Examples

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);