Freigeben über


create_auto_cdc_flow

Die create_auto_cdc_flow()-Funktion erstellt einen Datenfluss, der Lakeflow Spark Declarative Pipelines Change Data Capture (CDC)-Funktionalität verwendet, um Änderungsdaten aus einem Änderungsdatenfeed (CDF) zu verarbeiten.

Hinweis

Diese Funktion ersetzt die vorherige Funktion apply_changes(). Die beiden Funktionen weisen dieselbe Signatur auf. Databricks empfiehlt eine Aktualisierung, um den neuen Namen zu verwenden.

Von Bedeutung

Sie müssen eine Zielstreamingtabelle deklarieren, um Änderungen anzuwenden. Optional können Sie das Schema für Die Zieltabelle angeben. Wenn Sie das Schema der create_auto_cdc_flow() Zieltabelle angeben, müssen Sie die und __START_AT die __END_AT Spalten mit demselben Datentyp wie die sequence_by Felder einschließen.

Zum Erstellen der erforderlichen Zieltabelle können Sie die funktion create_streaming_table() in der Pipeline-Python-Schnittstelle verwenden.

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Bei der create_auto_cdc_flow Verarbeitung besteht das Standardverhalten für INSERT und UPDATE Ereignisse darin, CDC-Ereignisse aus der Quelle einzufügen oder zu aktualisieren: Aktualisieren Sie Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder fügen Sie eine neue Zeile ein, wenn kein entsprechender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung für DELETE Ereignisse kann mit dem apply_as_deletes Parameter angegeben werden.

Weitere Informationen zur CDC-Verarbeitung mit einem Änderungsfeed finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines. Ein Beispiel für die Verwendung der create_auto_cdc_flow() Funktion finden Sie unter Beispiel: SCD-Typ 1 und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten.

Die Parameter

Parameter Typ Description
target str Erforderlich. Der Name der zu aktualisierenden Tabelle. Sie können die create_streaming_table() -Funktion verwenden, um die Zieltabelle zu erstellen, bevor Sie die create_auto_cdc_flow() Funktion ausführen.
source str Erforderlich. Die Datenquelle, die CDC-Einträge enthält.
keys list Erforderlich. Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Dies wird verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eine der folgenden Optionen angeben:
  • Eine Liste der Zeichenfolgen: ["userId", "orderId"]
  • Eine Liste der Spark SQL-Funktionen col() : [col("userId"), col("orderId")]. Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId).
sequence_by str, col() oder struct() Erforderlich. Die Spaltennamen, die die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angeben. Lakeflow Spark Declarative Pipelines verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge eingehen. Die angegebene Spalte muss ein sortierbarer Datentyp sein. Sie können eine der folgenden Optionen angeben:
  • Eine Zeichenfolge: "sequenceNum"
  • Eine Spark SQL-Funktion col() : col("sequenceNum"). Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId).
  • Eine struct() Kombination mehrerer Spalten zum Unterbrechen von Bindungen: struct("timestamp_col", "id_col"), sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.
ignore_null_updates bool Zulassen, dass Aktualisierungen aufgenommen werden, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updates ist, behalten Spalten mit einem True ihre vorhandenen Werte im Ziel bei, wenn null gilt. Dies gilt auch für geschachtelte Spalten mit einem Wert von null. Wenn ignore_null_updates der Fall ist, werden vorhandene Werte mit False überschrieben.
Der Standardwert lautet False.
apply_as_deletes str oder expr() Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Sie können eine der folgenden Optionen angeben:
  • Eine Zeichenfolge: "Operation = 'DELETE'"
  • Eine Spark SQL-Funktion expr() : expr("Operation = 'DELETE'")

Um out-of-order-Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und eine Ansicht wird im Metastore erstellt, der diese Grabsteine herausfiltert. Das Aufbewahrungsintervall ist standardmäßig auf zwei Tage festgelegt und kann mit der pipelines.cdc.tombstoneGCThresholdInSeconds Tabelleneigenschaft konfiguriert werden.
apply_as_truncates str oder expr() Gibt an, wann ein CDC-Ereignis als vollständige Tabelle TRUNCATEbehandelt werden soll. Sie können eine der folgenden Optionen angeben:
  • Eine Zeichenfolge: "Operation = 'TRUNCATE'"
  • Eine Spark SQL-Funktion expr() : expr("Operation = 'TRUNCATE'")

Da diese Klausel eine vollständige Abkürzung der Zieltabelle auslöst, sollte sie nur für bestimmte Anwendungsfälle verwendet werden, die diese Funktionalität erfordern. Der apply_as_truncates Parameter wird nur für SCD-Typ 1 unterstützt. SCD-Typ 2 unterstützt keine Abkürzungsvorgänge.
column_list oder except_column_list list Eine Teilmenge von Spalten, die in die Zieltabelle aufgenommen werden sollen. Verwenden Sie column_list, um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list, um die auszuschließenden Spalten anzugeben. Sie können entweder einen Wert als Liste von Zeichenfolgen oder als Spark SQL-Funktionen col() deklarieren:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId). Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list- oder except_column_list-Argument an die Funktion übergeben wird.
stored_as_scd_type str oder int Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Legen Sie auf 1 für SCD-Typ 1 oder auf 2 für SCD-Typ 2 fest. Der SCD-Typ 1 ist der Standardwert.
track_history_column_list oder track_history_except_column_list list Ein Teil der Ausgabespalten, die in der Zieltabelle für die Verlaufserfassung überwacht werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der zu verfolgenden Spalten anzugeben. Verwenden Sie track_history_except_column_list, um die Spalten anzugeben, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können entweder einen Wert als Liste von Zeichenfolgen oder als Spark SQL-Funktionen col() deklarieren:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId). Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list- oder track_history_except_column_list-Argument an die Funktion übergeben wird.
name str Der Flussname. Wenn nicht angegeben, wird standardmäßig derselbe Wert wie target verwendet.
once bool Definieren Sie optional den Fluss als einmaligen Ablauf, z. B. als Rückfüllvorgang. Die Verwendung von once=True ändert den Ablauf auf zwei Arten:
  • Der Rückgabewert. streaming-query. In diesem Fall muss es sich um einen Batch-DataFrame handeln, nicht um einen Streaming-DataFrame.
  • Der Fluss wird standardmäßig einmal ausgeführt. Wenn die Pipeline mit einer vollständigen Aktualisierung aktualisiert wird, wird der ONCE Flow erneut ausgeführt, um die Daten neu zu erstellen.