Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Verwenden Sie die AUTO CDC ... INTO
-Anweisung, um einen Fluss zu erstellen, der die Lakeflow Deklarative Pipelines Change Data Capture (CDC)-Funktionalität verwendet. Diese Anweisung liest Änderungen aus einer CDC-Quelle und wendet sie auf einen Zielstream an.
- Weitere Informationen zu CDC finden Sie unter Was ist die Datenerfassung (Change Data Capture, CDC)?.
- Weitere Informationen zur Verwendung von
AUTO CDC
finden Sie in der AUTO CDC-APIs: Vereinfachung der Änderungsdatenerfassung mit deklarativen Lakeflow-Pipelines. - Weitere Informationen zu
CREATE FLOW
finden Sie unter CREATE FLOW (Lakeflow Declarative Pipelines).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Sie definieren Datenqualitätseinschränkungen für das Ziel mithilfe derselben CONSTRAINT
Klausel wie andere Deklarative Pipelines-Abfragen von Lakeflow. Siehe Verwalten der Datenqualität mit Pipelineerwartungen.
Das Standardverhalten für INSERT
- und UPDATE
-Ereignisse ist das Einfügen und Aktualisieren von CDC-Ereignissen aus der Quelle: jegliche Zeilen in der Zieltabelle aktualisieren, die mit den angegebenen Schlüsseln übereinstimmen, oder eine neue Zeile einfügen, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE
-Ereignissen kann mit der APPLY AS DELETE WHEN
-Bedingung angegeben werden.
Wichtig
Sie müssen eine Zielstreamingtabelle deklarieren, um die Änderungen anzuwenden. Optional können Sie das Schema für Ihre Zieltabelle angeben. Bei SCD-Typ-2-Tabellen müssen Sie, beim Angeben des Schemas der Zieltabelle, auch die Spalten __START_AT
und __END_AT
mit demselben Datentyp wie das sequence_by
Feld einbeziehen.
Parameter
flow_name
Der Name des zu erstellenden Flusses.
source
Die Quelle für die Daten. Die Quelle muss eine Streamingquelle sein. Verwenden Sie das STREAM-Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen. Zum Erfassen von Daten mit Änderungscommits können Sie Python und die Option „
SkipChangeCommits
“ zum Behandeln von Fehlern verwenden.Weitere Informationen zum Streamen von Daten finden Sie unter Transformieren von Daten mit Pipelines.
KEYS
Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Die Werte in diesen Spalten werden verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.
Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.
Diese Klausel ist erforderlich.
IGNORE NULL UPDATES
Ermöglichen der Erfassung von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben wird, behalten Spalten mit einem Wert von
null
ihre bestehenden Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit einemnull
Wert.Diese Klausel ist optional.
Die Standardeinstellung ist das Überschreiben vorhandener Spalten mit
null
-Werten.APPLY AS DELETE WHEN
Gibt an, wann ein CDC-Ereignis als
DELETE
und nicht als Upsert behandelt werden soll.Für SCD-Typ 2-Quellen, um außerhalb der Reihenfolge liegende Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und eine Ansicht wird im Metastore erstellt, die diese Grabsteine filtert. Das Aufbewahrungsintervall kann mit der
pipelines.cdc.tombstoneGCThresholdInSeconds
Tabelleneigenschaft konfiguriert werden.Diese Klausel ist optional.
APPLY AS TRUNCATE WHEN
Gibt an, wann ein CDC-Ereignis als vollständige Tabelle
TRUNCATE
behandelt werden soll. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.Die
APPLY AS TRUNCATE WHEN
-Klausel wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt den Trunkierungsvorgang nicht.Diese Klausel ist optional.
SEQUENCE BY
Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Lakeflow Declarative Pipelines verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge eingehen.
Wenn mehrere Spalten für die Sequenzierung erforderlich sind, verwenden Sie einen
STRUCT
Ausdruck: Sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.Angegebene Spalten müssen sortierbare Datentypen sein.
Diese Klausel ist erforderlich.
COLUMNS
Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
COLUMNS (userId, name, city)
. - Geben Sie eine Liste der auszuschließenden Spalten an:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional.
Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die
COLUMNS
-Klausel nicht angegeben ist.- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
STORED AS
Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.
Diese Klausel ist optional.
Der SCD-Typ 1 ist der Standardwert.
TRACK HISTORY ON
Gibt eine Teilmenge der Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorgenommen werden. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
COLUMNS (userId, name, city)
. - Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional. Die Standardeinstellung ist das Nachverfolgen des Verlaufs für alle Ausgabespalten, wenn Änderungen vorhanden sind (entspricht
TRACK HISTORY ON *
).- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
Beispiele
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);