Freigeben über


create_auto_cdc_from_snapshot_flow

Von Bedeutung

Diese Funktionalität befindet sich in der öffentlichen Vorschau.

Die create_auto_cdc_from_snapshot_flow-Funktion erstellt einen Flow, der die Change Data Capture-Funktion (CDC) von Lakeflow Spark Declarative Pipelines verwendet, um Quelldaten aus Datenbank-Snapshots zu verarbeiten. Erfahren Sie , wie cdC mit der AUTO CDC FROM SNAPSHOT API implementiert wird?.

Hinweis

Diese Funktion ersetzt die vorherige Funktion apply_changes_from_snapshot(). Die beiden Funktionen weisen dieselbe Signatur auf. Databricks empfiehlt eine Aktualisierung, um den neuen Namen zu verwenden.

Von Bedeutung

Für diesen Vorgang benötigen Sie eine Zielstreamingtabelle.

Zum Erstellen der erforderlichen Zieltabelle können Sie die funktion create_streaming_table() verwenden.

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Hinweis

Bei AUTO CDC FROM SNAPSHOT der Verarbeitung besteht das Standardverhalten darin, eine neue Zeile einzufügen, wenn im Ziel kein übereinstimmender Datensatz mit demselben Schlüssel vorhanden ist. Wenn ein übereinstimmenden Datensatz vorhanden ist, wird er nur aktualisiert, wenn sich einer der Werte in der Zeile geändert hat. Zeilen mit Schlüsseln, die im Ziel vorhanden sind, aber nicht mehr in der Quelle vorhanden sind, werden gelöscht.

Weitere Informationen zur CDC-Verarbeitung mit Momentaufnahmen finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines. Beispiele für die Verwendung der create_auto_cdc_from_snapshot_flow() Funktion finden Sie in den Beispielen für die regelmäßige Aufnahme von Momentaufnahmen und historischen Momentaufnahmen .

Die Parameter

Parameter Typ Description
target str Erforderlich. Der Name der zu aktualisierenden Tabelle. Sie können die create_streaming_table() -Funktion verwenden, um die Zieltabelle zu erstellen, bevor Sie die create_auto_cdc_from_snapshot_flow() Funktion ausführen.
source str oder lambda function Erforderlich. Entweder der Name einer Tabelle oder Ansicht, die in regelmäßigen Abständen aufgenommen wird, oder eine Python-Lambda-Funktion, die den zu verarbeitenden Snapshot DataFrame und die Version der Momentaufnahme zurückgibt. Siehe Implementieren des source Arguments.
keys list Erforderlich. Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Dies wird verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eine der folgenden Optionen angeben:
  • Eine Liste der Zeichenfolgen: ["userId", "orderId"]
  • Eine Liste der Spark SQL-Funktionen col() : [col("userId"), col("orderId"]. Parameter für col() Funktionen dürfen keine Qualifizierungen enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId).
stored_as_scd_type str oder int Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Legen Sie auf 1 für SCD-Typ 1 oder auf 2 für SCD-Typ 2 fest. Der SCD-Typ 1 ist der Standardwert.
track_history_column_list oder track_history_except_column_list list Ein Teil der Ausgabespalten, die in der Zieltabelle für die Verlaufserfassung überwacht werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der zu verfolgenden Spalten anzugeben. Verwenden Sie track_history_except_column_list, um die Spalten anzugeben, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können entweder einen Wert als Liste von Zeichenfolgen oder als Spark SQL-Funktionen col() deklarieren:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Parameter für col() Funktionen dürfen keine Qualifizierungen enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId). Standardmäßig werden alle Spalten in die Zieltabelle eingefügt, wenn kein track_history_column_list- oder track_history_except_column_list-Argument an die Funktion übergeben wird.

Implementieren des source Arguments

Die create_auto_cdc_from_snapshot_flow() Funktion enthält das source Argument. Für die Verarbeitung historischer Momentaufnahmen wird erwartet, dass das source Argument eine Python-Lambda-Funktion ist, die zwei Werte an die create_auto_cdc_from_snapshot_flow() Funktion zurückgibt: ein Python DataFrame, der die zu verarbeitenden Momentaufnahmen und eine Momentaufnahmeversion enthält.

Im Folgenden sehen Sie die Signatur der Lambda-Funktion:

lambda Any => Optional[(DataFrame, Any)]
  • Das Argument für die Lambda-Funktion ist die zuletzt verarbeitete Momentaufnahmeversion.
  • Der Rückgabewert der Lambda-Funktion ist None oder ein Tupel von zwei Werten: Der erste Wert des Tupels ist ein DataFrame, der die zu verarbeitende Momentaufnahme enthält. Der zweite Wert des Tupels ist die Schnappschussversion, die die logische Reihenfolge des Schnappschusses repräsentiert.

Ein Beispiel, das die Lambda-Funktion implementiert und aufruft:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Die Lakeflow Spark Declarative Pipelines Runtime führt jedes Mal die folgenden Schritte aus, wenn die Pipeline, die die create_auto_cdc_from_snapshot_flow() Funktion enthält, ausgelöst wird:

  1. Führt die Funktion next_snapshot_and_version aus, um den nächsten Snapshot-DataFrame und die entsprechende Snapshot-Version zu laden.
  2. Wenn kein DataFrame zurückgegeben wird, wird die Ausführung beendet, und das Pipelineupdate ist als abgeschlossen markiert.
  3. Erkennt die Änderungen in der neuen Momentaufnahme und wendet sie inkrementell auf die Zieltabelle an.
  4. Kehrt zu Schritt 1 zurück, um die nächste Momentaufnahme und deren Version zu laden.