Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
La funzione create_auto_cdc_flow() crea un flusso che utilizza la funzionalità di Change Data Capture (CDC) delle Lakeflow Spark Declarative Pipelines per elaborare i dati di origine da un feed di dati dei cambiamenti (CDF).
Annotazioni
Questa funzione sostituisce la funzione apply_changes()precedente. Le due funzioni hanno la stessa firma. Databricks consiglia di eseguire l'aggiornamento per usare il nuovo nome.
Importante
È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della create_auto_cdc_flow() tabella di destinazione, è necessario includere le __START_AT colonne e __END_AT con lo stesso tipo di dati dei sequence_by campi.
Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python della pipeline.
Sintassi
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Per create_auto_cdc_flow l'elaborazione, il comportamento predefinito per INSERT e UPDATE gli eventi consiste nell'aggiornare o inserire gli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli DELETE eventi può essere specificata con il apply_as_deletes parametro .
Per ulteriori informazioni sull'elaborazione CDC con un feed di modifiche, vedere Le API AUTO CDC: Per semplificare la Change Data Capture con le pipeline. Per un esempio di utilizzo della create_auto_cdc_flow() funzione, vedere Esempio: elaborazione scD di tipo 1 e SCD di tipo 2 con dati di origine CDF.
Parametri
| Parametro | TIPO | Description |
|---|---|---|
target |
str |
Obbligatorio. Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la create_auto_cdc_flow() funzione. |
source |
str |
Obbligatorio. Fonte di dati contenente i record CDC. |
keys |
list |
Obbligatorio. Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione. È possibile specificare uno dei due valori seguenti:
|
sequence_by |
str, col() o struct() |
Obbligatorio. Nomi di colonna che specificano l'ordine logico degli eventi CDC nei dati di origine. Le pipeline dichiarative di Lakeflow Spark usano questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine. La colonna specificata deve essere un tipo di dati ordinabile. È possibile specificare uno dei due valori seguenti:
|
ignore_null_updates |
bool |
Consentire l'inserimento di aggiornamenti contenenti un subset delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e ignore_null_updates è True, le colonne con un null mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con il valore null. Quando ignore_null_updates è False, i valori esistenti vengono sovrascritti con null valori.Il valore predefinito è False. |
apply_as_deletes |
str o expr() |
Specifica quando un evento CDC deve essere considerato come un DELETE piuttosto che un upsert. È possibile specificare uno dei due valori seguenti:
Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come segnaposto nella tabella Delta sottostante, e viene creata una vista nel metastore che filtra questi segnaposto. L'intervallo di conservazione viene impostato per impostazione predefinita su due giorni e può essere configurato con la pipelines.cdc.tombstoneGCThresholdInSeconds proprietà table. |
apply_as_truncates |
str o expr() |
Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATEcompleta. È possibile specificare uno dei due valori seguenti:
Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità. Il apply_as_truncates parametro è supportato solo per scD di tipo 1. Il tipo SCD 2 non supporta le operazioni di troncamento. |
column_list o except_column_list |
list |
Subset di colonne da includere nella tabella di destinazione. Utilizzare column_list per specificare l'elenco completo di colonne da includere. Utilizzare except_column_list per specificare le colonne da escludere. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :
Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId). L'impostazione predefinita è includere tutte le colonne nella tabella di destinazione quando alla funzione non vengono passati gli argomenti column_list o except_column_list. |
stored_as_scd_type |
str o int |
Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2. Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2. Il valore predefinito è SCD di tipo 1. |
track_history_column_list o track_history_except_column_list |
list |
Un subset di colonne di output da tracciare per la cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne da tenere traccia. Utilizzare track_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :
Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId). L'impostazione predefinita è includere tutte le colonne nella tabella di destinazione quando alla funzione non vengono passati gli argomenti track_history_column_list o track_history_except_column_list. |
name |
str |
Nome del flusso. Se non specificato, per impostazione predefinita viene usato lo stesso valore di target. |
once |
bool |
Si può facoltativamente definire il flusso come un flusso monouso, ad esempio un riempimento retroattivo. L'uso di once=True modifica il flusso in due modi:
|