Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die create_auto_cdc_flow()-Funktion erstellt einen Datenfluss, der Lakeflow Spark Declarative Pipelines Change Data Capture (CDC)-Funktionalität verwendet, um Änderungsdaten aus einem Änderungsdatenfeed (CDF) zu verarbeiten.
Hinweis
Diese Funktion ersetzt die vorherige Funktion apply_changes(). Die beiden Funktionen weisen dieselbe Signatur auf. Databricks empfiehlt eine Aktualisierung, um den neuen Namen zu verwenden.
Von Bedeutung
Sie müssen eine Zielstreamingtabelle deklarieren, um Änderungen anzuwenden. Optional können Sie das Schema für Die Zieltabelle angeben. Wenn Sie das Schema der create_auto_cdc_flow() Zieltabelle angeben, müssen Sie die und __START_AT die __END_AT Spalten mit demselben Datentyp wie die sequence_by Felder einschließen.
Zum Erstellen der erforderlichen Zieltabelle können Sie die funktion create_streaming_table() in der Pipeline-Python-Schnittstelle verwenden.
Syntax
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Bei der create_auto_cdc_flow Verarbeitung besteht das Standardverhalten für INSERT und UPDATE Ereignisse darin, CDC-Ereignisse aus der Quelle einzufügen oder zu aktualisieren: Aktualisieren Sie Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder fügen Sie eine neue Zeile ein, wenn kein entsprechender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung für DELETE Ereignisse kann mit dem apply_as_deletes Parameter angegeben werden.
Weitere Informationen zur CDC-Verarbeitung mit einem Änderungsfeed finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines. Ein Beispiel für die Verwendung der create_auto_cdc_flow() Funktion finden Sie unter Beispiel: SCD-Typ 1 und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten.
Die Parameter
| Parameter | Typ | Description |
|---|---|---|
target |
str |
Erforderlich. Der Name der zu aktualisierenden Tabelle. Sie können die create_streaming_table() -Funktion verwenden, um die Zieltabelle zu erstellen, bevor Sie die create_auto_cdc_flow() Funktion ausführen. |
source |
str |
Erforderlich. Die Datenquelle, die CDC-Einträge enthält. |
keys |
list |
Erforderlich. Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Dies wird verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eine der folgenden Optionen angeben:
|
sequence_by |
str, col() oder struct() |
Erforderlich. Die Spaltennamen, die die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angeben. Lakeflow Spark Declarative Pipelines verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge eingehen. Die angegebene Spalte muss ein sortierbarer Datentyp sein. Sie können eine der folgenden Optionen angeben:
|
ignore_null_updates |
bool |
Zulassen, dass Aktualisierungen aufgenommen werden, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updates ist, behalten Spalten mit einem True ihre vorhandenen Werte im Ziel bei, wenn null gilt. Dies gilt auch für geschachtelte Spalten mit einem Wert von null. Wenn ignore_null_updates der Fall ist, werden vorhandene Werte mit False überschrieben.Der Standardwert lautet False. |
apply_as_deletes |
str oder expr() |
Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Sie können eine der folgenden Optionen angeben:
Um out-of-order-Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und eine Ansicht wird im Metastore erstellt, der diese Grabsteine herausfiltert. Das Aufbewahrungsintervall ist standardmäßig auf zwei Tage festgelegt und kann mit der pipelines.cdc.tombstoneGCThresholdInSeconds Tabelleneigenschaft konfiguriert werden. |
apply_as_truncates |
str oder expr() |
Gibt an, wann ein CDC-Ereignis als vollständige Tabelle TRUNCATEbehandelt werden soll. Sie können eine der folgenden Optionen angeben:
Da diese Klausel eine vollständige Abkürzung der Zieltabelle auslöst, sollte sie nur für bestimmte Anwendungsfälle verwendet werden, die diese Funktionalität erfordern. Der apply_as_truncates Parameter wird nur für SCD-Typ 1 unterstützt. SCD-Typ 2 unterstützt keine Abkürzungsvorgänge. |
column_list oder except_column_list |
list |
Eine Teilmenge von Spalten, die in die Zieltabelle aufgenommen werden sollen. Verwenden Sie column_list, um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list, um die auszuschließenden Spalten anzugeben. Sie können entweder einen Wert als Liste von Zeichenfolgen oder als Spark SQL-Funktionen col() deklarieren:
Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId). Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list- oder except_column_list-Argument an die Funktion übergeben wird. |
stored_as_scd_type |
str oder int |
Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Legen Sie auf 1 für SCD-Typ 1 oder auf 2 für SCD-Typ 2 fest. Der SCD-Typ 1 ist der Standardwert. |
track_history_column_list oder track_history_except_column_list |
list |
Ein Teil der Ausgabespalten, die in der Zieltabelle für die Verlaufserfassung überwacht werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der zu verfolgenden Spalten anzugeben. Verwenden Sie track_history_except_column_list, um die Spalten anzugeben, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können entweder einen Wert als Liste von Zeichenfolgen oder als Spark SQL-Funktionen col() deklarieren:
Argumente für col() Funktionen können keine Qualifizierer enthalten. Sie können z. B. verwenden col(userId), aber nicht verwenden col(source.userId). Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list- oder track_history_except_column_list-Argument an die Funktion übergeben wird. |
name |
str |
Der Flussname. Wenn nicht angegeben, wird standardmäßig derselbe Wert wie target verwendet. |
once |
bool |
Definieren Sie optional den Fluss als einmaligen Ablauf, z. B. als Rückfüllvorgang. Die Verwendung von once=True ändert den Ablauf auf zwei Arten:
|