Freigeben über


AUTO CDC INTO (Pipelines)

Verwenden Sie die AUTO CDC ... INTO-Anweisung, um einen Fluss zu erstellen, der die Change Data Capture (CDC)-Funktionalität von Lakeflow Spark Declarative Pipelines verwendet. Diese Anweisung liest Änderungen aus einer CDC-Datenquelle und wendet sie auf ein Streaming-Ziel an.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Sie definieren Datenqualitätseinschränkungen für das Ziel mithilfe derselben CONSTRAINT Klausel wie andere Pipelineabfragen. Weitere Informationen finden Sie unter Verwalten der Datenqualität mit Pipelineerwartungen.

Das Standardverhalten für INSERT und UPDATE Ereignisse besteht darin, CDC-Ereignisse aus der Quelle zu upsert : Aktualisieren Sie alle Zeilen in der Zieltabelle, die den angegebenen Schlüsseln entsprechen, oder fügen Sie eine neue Zeile ein, wenn in der Zieltabelle kein übereinstimmender Datensatz vorhanden ist. Die Behandlung für DELETE Ereignisse kann mit der APPLY AS DELETE WHEN Bedingung angegeben werden.

Von Bedeutung

Sie müssen eine Zielstreamingtabelle deklarieren, um die Änderungen anzuwenden. Optional können Sie das Schema für Die Zieltabelle angeben. Bei SCD-Typ-2-Tabellen müssen Sie beim Definieren des Schemas der Zieltabelle auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by einbeziehen.

Siehe Die AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.

Die Parameter

  • flow_name

    Der Name des zu erstellenden Flusses.

  • source

    Die Quelle für die Daten. Die Quelle muss eine Streamingquelle sein. Verwenden Sie das STREAM-Schlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen. Zum Einlesen von Daten mit Änderungs-Commits können Sie Python und die SkipChangeCommits Option zur Fehlerbehandlung verwenden.

    Weitere Informationen zum Streamen von Daten finden Sie unter Transformieren von Daten mit Pipelines.

  • KEYS

    Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Die Werte in diesen Spalten werden verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

    Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.

    Diese Klausel ist erforderlich.

  • IGNORE NULL UPDATES

    Ermöglicht das Einlesen von Updates, die nur einige der Zielspalten umfassen. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben wird, behalten Spalten, die einen null Wert enthalten, ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit einem null Wert.

    Diese Klausel ist optional.

    Standardmäßig werden vorhandene Spalten mit null Werten überschrieben.

  • APPLY AS DELETE WHEN

    Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll.

    Für SCD-Typ-2-Quellen wird zur Verarbeitung von Out-of-Order-Daten die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und es wird eine Ansicht im Metastore erstellt, die diese Grabsteine herausfiltert. Das Aufbewahrungsintervall kann mit der pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft konfiguriert werden.

    Diese Klausel ist optional.

  • APPLY AS TRUNCATE WHEN

    Gibt an, wann ein CDC-Ereignis als vollständige Tabelle TRUNCATEbehandelt werden soll. Da diese Klausel eine vollständige Abkürzung der Zieltabelle auslöst, sollte sie nur für bestimmte Anwendungsfälle verwendet werden, die diese Funktionalität erfordern.

    Die APPLY AS TRUNCATE WHEN Klausel wird nur für SCD-Typ 1 unterstützt. SCD Typ 2 unterstützt den Abkürzungsvorgang nicht.

    Diese Klausel ist optional.

  • SEQUENCE BY

    Der Spaltenname, der die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angibt. Die Pipelineverarbeitung verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge ankommen.

    Wenn mehrere Spalten für die Sequenzierung erforderlich sind, verwenden Sie einen STRUCT Ausdruck: Sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.

    Angegebene Spalten müssen sortierbare Datentypen sein.

    Diese Klausel ist erforderlich.

  • COLUMNS

    Gibt eine Teilmenge von Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:

    • Geben Sie die vollständige Liste der einzuschließenden Spalten an: COLUMNS (userId, name, city).
    • Geben Sie eine Liste der auszuschließenden Spalten an: COLUMNS * EXCEPT (operation, sequenceNum)

    Diese Klausel ist optional.

    Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die COLUMNS Klausel nicht angegeben wird.

  • STORED AS

    Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

    Diese Klausel ist optional.

    Der SCD-Typ 1 ist der Standardwert.

  • TRACK HISTORY ON

    Gibt eine Teilmenge von Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorhanden sind. Sie haben folgende Möglichkeiten:

    • Geben Sie die vollständige Liste der zu verfolgenden Spalten an: COLUMNS (userId, name, city).
    • Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen: COLUMNS * EXCEPT (operation, sequenceNum)

    Diese Klausel ist optional. Standardmäßig wird der Verlauf für alle Ausgabespalten nachverfolgt, wenn es Änderungen gibt, was dem TRACK HISTORY ON * entspricht.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);