Condividi tramite


create_auto_cdc_from_snapshot_flow

Importante

Questa funzionalità è disponibile in anteprima pubblica.

La create_auto_cdc_from_snapshot_flow funzione crea un flusso che usa la funzionalità di Change Data Capture (CDC) di Lakeflow Spark per elaborare i dati di origine dagli snapshot del database. Vedere Come viene implementato CDC con l'APIAUTO CDC FROM SNAPSHOT?

Annotazioni

Questa funzione sostituisce la funzione apply_changes_from_snapshot()precedente. Le due funzioni hanno la stessa firma. Databricks consiglia di eseguire l'aggiornamento per usare il nuovo nome.

Importante

Per questa operazione è necessario disporre di una tabella di streaming di destinazione.

Per creare la tabella di destinazione richiesta, è possibile usare la funzione create_streaming_table().

Sintassi

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Annotazioni

Per AUTO CDC FROM SNAPSHOT l'elaborazione, il comportamento predefinito consiste nell'inserire una nuova riga quando un record corrispondente con le stesse chiavi non esiste nella destinazione. Se esiste un record corrispondente, viene aggiornato solo se uno dei valori nella riga è stato modificato. Le righe con chiavi presenti nella destinazione ma non più presenti nell'origine vengono eliminate.

Per ulteriori informazioni sull'elaborazione di CDC con gli snapshot, vedere Le AUTO CDC API: semplifica il Change Data Capture con le pipeline. Per esempi di utilizzo della create_auto_cdc_from_snapshot_flow() funzione, vedere gli esempi periodici di inserimento di snapshot e inserimento di snapshot cronologici .

Parametri

Parametro TIPO Description
target str Obbligatorio. Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la create_auto_cdc_from_snapshot_flow() funzione.
source str o lambda function Obbligatorio. Il nome di una tabella o di una vista da snapshotare periodicamente oppure una funzione lambda Python che restituisce il DataFrame dello snapshot e la sua versione da elaborare. Consulta Implementazione dell'argomento source.
keys list Obbligatorio. Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione. È possibile specificare uno dei due valori seguenti:
  • Elenco di stringhe: ["userId", "orderId"]
  • Elenco di funzioni SPARK SQL col() : [col("userId"), col("orderId"]. Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
stored_as_scd_type str o int Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2. Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2. Il valore predefinito è SCD di tipo 1.
track_history_column_list o track_history_except_column_list list Un subset di colonne di output da tracciare per la cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne da tenere traccia. Utilizzare track_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni SPARK SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId). L'impostazione predefinita è includere tutte le colonne nella tabella di destinazione quando la funzione non riceve alcun argomento track_history_column_list o track_history_except_column_list.

Implementare l'argomento source

La create_auto_cdc_from_snapshot_flow() funzione include l'argomento source . Per l'elaborazione degli snapshot cronologici, si prevede che l'argomento source sia una funzione lambda Python che restituisce due valori alla create_auto_cdc_from_snapshot_flow() funzione: un dataframe Python contenente i dati dello snapshot da elaborare e una versione snapshot.

Di seguito è riportata la firma della funzione lambda:

lambda Any => Optional[(DataFrame, Any)]
  • L'argomento della funzione lambda è la versione snapshot elaborata più di recente.
  • Il valore restituito della funzione lambda è None o una tupla di due valori: il primo valore della tupla è un dataframe contenente lo snapshot da elaborare. Il secondo valore della tupla è la versione dello snapshot che rappresenta l'ordine logico dello snapshot.

Esempio che implementa e chiama la funzione lambda:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Il runtime di Pipeline dichiarative di Lakeflow Spark esegue i passaggi seguenti ogni volta che viene attivata la pipeline che contiene la create_auto_cdc_from_snapshot_flow() funzione:

  1. Esegue la next_snapshot_and_version funzione per caricare il dataframe dello snapshot successivo e la versione dello snapshot corrispondente.
  2. Se non viene restituito alcun dataframe, l'esecuzione viene terminata e l'aggiornamento della pipeline viene contrassegnato come completo.
  3. Rileva le modifiche nel nuovo snapshot e le applica in modo incrementale alla tabella di destinazione.
  4. Torna al passaggio 1 per caricare lo snapshot successivo e la relativa versione.