Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Verwenden Sie die AUTO CDC ... INTO-Anweisung, um einen Fluss zu erstellen, der die Change Data Capture (CDC)-Funktionalität von Lakeflow Spark Declarative Pipelines verwendet. Diese Anweisung liest Änderungen aus einer CDC-Datenquelle und wendet sie auf ein Streaming-Ziel an.
- Weitere Informationen zu CDC finden Sie unter Was ist die Datenerfassung (Change Data Capture, CDC)?.
- Weitere Informationen zur Verwendung von
AUTO CDCfinden Sie unter Die AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines. - Weitere Informationen zu
CREATE FLOWfinden Sie in CREATE FLOW (Pipelines).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Sie definieren Datenqualitätseinschränkungen für das Ziel mithilfe derselben CONSTRAINT Klausel wie andere Pipelineabfragen. Weitere Informationen finden Sie unter Verwalten der Datenqualität mit Pipelineerwartungen.
Das Standardverhalten für INSERT und UPDATE Ereignisse besteht darin, CDC-Ereignisse aus der Quelle zu upsert : Aktualisieren Sie alle Zeilen in der Zieltabelle, die den angegebenen Schlüsseln entsprechen, oder fügen Sie eine neue Zeile ein, wenn in der Zieltabelle kein übereinstimmender Datensatz vorhanden ist. Die Behandlung für DELETE Ereignisse kann mit der APPLY AS DELETE WHEN Bedingung angegeben werden.
Von Bedeutung
Sie müssen eine Zielstreamingtabelle deklarieren, um die Änderungen anzuwenden. Optional können Sie das Schema für Die Zieltabelle angeben. Bei SCD-Typ-2-Tabellen müssen Sie beim Definieren des Schemas der Zieltabelle auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by einbeziehen.
Siehe Die AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.
Die Parameter
flow_nameDer Name des zu erstellenden Flusses.
sourceDie Quelle für die Daten. Die Quelle muss eine Streamingquelle sein. Verwenden Sie das STREAM-Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen. Zum Einlesen von Daten mit Änderungs-Commits können Sie Python und die
SkipChangeCommitsOption zur Fehlerbehandlung verwenden.Weitere Informationen zum Streamen von Daten finden Sie unter Transformieren von Daten mit Pipelines.
KEYSDie Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Die Werte in diesen Spalten werden verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.
Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.
Diese Klausel ist erforderlich.
IGNORE NULL UPDATESErmöglicht das Einlesen von Updates, die nur einige der Zielspalten umfassen. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben wird, behalten Spalten, die einen
nullWert enthalten, ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit einemnullWert.Diese Klausel ist optional.
Standardmäßig werden vorhandene Spalten mit
nullWerten überschrieben.APPLY AS DELETE WHENGibt an, wann ein CDC-Ereignis als
DELETEund nicht als Upsert behandelt werden soll.Für SCD-Typ-2-Quellen wird zur Verarbeitung von Out-of-Order-Daten die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und es wird eine Ansicht im Metastore erstellt, die diese Grabsteine herausfiltert. Das Aufbewahrungsintervall kann mit der
pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft konfiguriert werden.Diese Klausel ist optional.
APPLY AS TRUNCATE WHENGibt an, wann ein CDC-Ereignis als vollständige Tabelle
TRUNCATEbehandelt werden soll. Da diese Klausel eine vollständige Abkürzung der Zieltabelle auslöst, sollte sie nur für bestimmte Anwendungsfälle verwendet werden, die diese Funktionalität erfordern.Die
APPLY AS TRUNCATE WHENKlausel wird nur für SCD-Typ 1 unterstützt. SCD Typ 2 unterstützt den Abkürzungsvorgang nicht.Diese Klausel ist optional.
SEQUENCE BYDer Spaltenname, der die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angibt. Die Pipelineverarbeitung verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge ankommen.
Wenn mehrere Spalten für die Sequenzierung erforderlich sind, verwenden Sie einen
STRUCTAusdruck: Sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.Angegebene Spalten müssen sortierbare Datentypen sein.
Diese Klausel ist erforderlich.
COLUMNSGibt eine Teilmenge von Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
COLUMNS (userId, name, city). - Geben Sie eine Liste der auszuschließenden Spalten an:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional.
Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die
COLUMNSKlausel nicht angegeben wird.- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
STORED ASGibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.
Diese Klausel ist optional.
Der SCD-Typ 1 ist der Standardwert.
TRACK HISTORY ONGibt eine Teilmenge von Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorhanden sind. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
COLUMNS (userId, name, city). - Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional. Standardmäßig wird der Verlauf für alle Ausgabespalten nachverfolgt, wenn es Änderungen gibt, was dem
TRACK HISTORY ON *entspricht.- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
Examples
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);