Condividi tramite


create_auto_cdc_flow

La funzione create_auto_cdc_flow() crea un flusso che utilizza la funzionalità di Change Data Capture (CDC) delle Lakeflow Spark Declarative Pipelines per elaborare i dati di origine da un feed di dati dei cambiamenti (CDF).

Annotazioni

Questa funzione sostituisce la funzione apply_changes()precedente. Le due funzioni hanno la stessa firma. Databricks consiglia di eseguire l'aggiornamento per usare il nuovo nome.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della create_auto_cdc_flow() tabella di destinazione, è necessario includere le __START_AT colonne e __END_AT con lo stesso tipo di dati dei sequence_by campi.

Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python della pipeline.

Sintassi

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Per create_auto_cdc_flow l'elaborazione, il comportamento predefinito per INSERT e UPDATE gli eventi consiste nell'aggiornare o inserire gli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con il apply_as_deletes parametro .

Per ulteriori informazioni sull'elaborazione CDC con un feed di modifiche, vedere Le API AUTO CDC: Per semplificare la Change Data Capture con le pipeline. Per un esempio di utilizzo della create_auto_cdc_flow() funzione, vedere Esempio: elaborazione scD di tipo 1 e SCD di tipo 2 con dati di origine CDF.

Parametri

Parametro TIPO Description
target str Obbligatorio. Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la create_auto_cdc_flow() funzione.
source str Obbligatorio. Fonte di dati contenente i record CDC.
keys list Obbligatorio. Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione. È possibile specificare uno dei due valori seguenti:
  • Elenco di stringhe: ["userId", "orderId"]
  • Elenco di funzioni SPARK SQL col() : [col("userId"), col("orderId")]. Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
sequence_by str, col() o struct() Obbligatorio. Nomi di colonna che specificano l'ordine logico degli eventi CDC nei dati di origine. Le pipeline dichiarative di Lakeflow Spark usano questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine. La colonna specificata deve essere un tipo di dati ordinabile. È possibile specificare uno dei due valori seguenti:
  • Stringa: "sequenceNum"
  • Funzione SPARK SQL col() : col("sequenceNum"). Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
  • Una struct() combinando più colonne per risolvere i conflitti: struct("timestamp_col", "id_col"), ordinerà prima in base al primo campo struct, quindi al secondo campo se c'è un pareggio, e così via.
ignore_null_updates bool Consentire l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e ignore_null_updates è True, le colonne con un null mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con il valore null. Quando ignore_null_updates è False, i valori esistenti vengono sovrascritti con null valori.
Il valore predefinito è False.
apply_as_deletes str o expr() Specifica quando un evento CDC deve essere considerato come un DELETE piuttosto che un upsert. È possibile specificare uno dei due valori seguenti:
  • Stringa: "Operation = 'DELETE'"
  • Una funzione Spark SQL expr() : expr("Operation = 'DELETE'")

Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come segnaposto nella tabella Delta sottostante, e viene creata una vista nel metastore che filtra questi segnaposto. L'intervallo di conservazione viene impostato per impostazione predefinita su due giorni e può essere configurato con la pipelines.cdc.tombstoneGCThresholdInSeconds proprietà table.
apply_as_truncates str o expr() Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATEcompleta. È possibile specificare uno dei due valori seguenti:
  • Stringa: "Operation = 'TRUNCATE'"
  • Una funzione Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità. Il apply_as_truncates parametro è supportato solo per scD di tipo 1. Il tipo SCD 2 non supporta le operazioni di troncamento.
column_list o except_column_list list Subset di colonne da includere nella tabella di destinazione. Utilizzare column_list per specificare l'elenco completo di colonne da includere. Utilizzare except_column_list per specificare le colonne da escludere. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId). L'impostazione predefinita è includere tutte le colonne nella tabella di destinazione quando alla funzione non vengono passati gli argomenti column_list o except_column_list.
stored_as_scd_type str o int Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2. Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2. Il valore predefinito è SCD di tipo 1.
track_history_column_list o track_history_except_column_list list Un subset di colonne di output da tracciare per la cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne da tenere traccia. Utilizzare track_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId). L'impostazione predefinita è includere tutte le colonne nella tabella di destinazione quando alla funzione non vengono passati gli argomenti track_history_column_list o track_history_except_column_list.
name str Nome del flusso. Se non specificato, per impostazione predefinita viene usato lo stesso valore di target.
once bool Si può facoltativamente definire il flusso come un flusso monouso, ad esempio un riempimento retroattivo. L'uso di once=True modifica il flusso in due modi:
  • Il valore di ritorno. streaming-query. deve essere un dataframe batch in questo caso, non un dataframe di streaming.
  • Il flusso viene eseguito una sola volta per impostazione predefinita. Se la pipeline viene aggiornata con un aggiornamento completo, il ONCE flusso viene eseguito di nuovo per ricreare i dati.