Freigeben über


AUTO CDC INTO (Lakeflow Declarative Pipelines)

Verwenden Sie die AUTO CDC ... INTO-Anweisung, um einen Fluss zu erstellen, der die Lakeflow Deklarative Pipelines Change Data Capture (CDC)-Funktionalität verwendet. Diese Anweisung liest Änderungen aus einer CDC-Quelle und wendet sie auf einen Zielstream an.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Sie definieren Datenqualitätseinschränkungen für das Ziel mithilfe derselben CONSTRAINT Klausel wie andere Deklarative Pipelines-Abfragen von Lakeflow. Siehe Verwalten der Datenqualität mit Pipelineerwartungen.

Das Standardverhalten für INSERT- und UPDATE-Ereignisse ist das Einfügen und Aktualisieren von CDC-Ereignissen aus der Quelle: jegliche Zeilen in der Zieltabelle aktualisieren, die mit den angegebenen Schlüsseln übereinstimmen, oder eine neue Zeile einfügen, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE-Ereignissen kann mit der APPLY AS DELETE WHEN-Bedingung angegeben werden.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, um die Änderungen anzuwenden. Optional können Sie das Schema für Ihre Zieltabelle angeben. Bei SCD-Typ-2-Tabellen müssen Sie, beim Angeben des Schemas der Zieltabelle, auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das sequence_by Feld einbeziehen.

Siehe Die AUTO CDC-APIs: Vereinfachen Sie die Erfassung von Änderungsdaten mit Lakeflow-Deklarativ-Pipelines.

Parameter

  • flow_name

    Der Name des zu erstellenden Flusses.

  • source

    Die Quelle für die Daten. Die Quelle muss eine Streamingquelle sein. Verwenden Sie das STREAM-Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen. Zum Erfassen von Daten mit Änderungscommits können Sie Python und die Option „SkipChangeCommits“ zum Behandeln von Fehlern verwenden.

    Weitere Informationen zum Streamen von Daten finden Sie unter Transformieren von Daten mit Pipelines.

  • KEYS

    Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Die Werte in diesen Spalten werden verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

    Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.

    Diese Klausel ist erforderlich.

  • IGNORE NULL UPDATES

    Ermöglichen der Erfassung von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben wird, behalten Spalten mit einem Wert von null ihre bestehenden Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit einem null Wert.

    Diese Klausel ist optional.

    Die Standardeinstellung ist das Überschreiben vorhandener Spalten mit null-Werten.

  • APPLY AS DELETE WHEN

    Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll.

    Für SCD-Typ 2-Quellen, um außerhalb der Reihenfolge liegende Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und eine Ansicht wird im Metastore erstellt, die diese Grabsteine filtert. Das Aufbewahrungsintervall kann mit der pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft konfiguriert werden.

    Diese Klausel ist optional.

  • APPLY AS TRUNCATE WHEN

    Gibt an, wann ein CDC-Ereignis als vollständige Tabelle TRUNCATE behandelt werden soll. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.

    Die APPLY AS TRUNCATE WHEN-Klausel wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt den Trunkierungsvorgang nicht.

    Diese Klausel ist optional.

  • SEQUENCE BY

    Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Lakeflow Declarative Pipelines verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge eingehen.

    Wenn mehrere Spalten für die Sequenzierung erforderlich sind, verwenden Sie einen STRUCT Ausdruck: Sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.

    Angegebene Spalten müssen sortierbare Datentypen sein.

    Diese Klausel ist erforderlich.

  • COLUMNS

    Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:

    • Geben Sie die vollständige Liste der einzuschließenden Spalten an: COLUMNS (userId, name, city).
    • Geben Sie eine Liste der auszuschließenden Spalten an: COLUMNS * EXCEPT (operation, sequenceNum)

    Diese Klausel ist optional.

    Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die COLUMNS-Klausel nicht angegeben ist.

  • STORED AS

    Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

    Diese Klausel ist optional.

    Der SCD-Typ 1 ist der Standardwert.

  • TRACK HISTORY ON

    Gibt eine Teilmenge der Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorgenommen werden. Sie haben folgende Möglichkeiten:

    • Geben Sie die vollständige Liste der zu verfolgenden Spalten an: COLUMNS (userId, name, city).
    • Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen: COLUMNS * EXCEPT (operation, sequenceNum)

    Diese Klausel ist optional. Die Standardeinstellung ist das Nachverfolgen des Verlaufs für alle Ausgabespalten, wenn Änderungen vorhanden sind (entspricht TRACK HISTORY ON *).

Beispiele

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);