Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Databricks SQL
Von Bedeutung
Dieses Feature befindet sich in der Betaversion. Erfordert Databricks Runtime 17.3 und höher.
Verwenden Sie die FLOW AUTO CDC Klausel zum CREATE STREAMING TABLE Verarbeiten von CDC-Datensätzen (Change Data Capture) aus einer Quelle in eine Streamingtabelle.
Zuvor wurde die Anweisung häufig für die Verarbeitung von MERGE INTO CDC-Einträgen in Azure Databricks verwendet.
MERGE INTO Kann jedoch aufgrund von Out-of-Sequence-Datensätzen falsche Ergebnisse erzeugen oder erfordert eine komplexe Logik zum Erneuten Anordnen von Datensätzen.
AUTO CDC vereinfacht CDC durch die automatische Verarbeitung von Out-of-Order-Datensätzen. Sie geben Schlüssel zum Identifizieren von Datensätzen, einer Sequenzspalte für die Sortierung und an, ob Ergebnisse als SCD-Typ 1 (direkte Aktualisierungen) oder SCD-Typ 2 (Verlaufsverfolgung) gespeichert werden sollen.
Syntax
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Das Standardverhalten für INSERT und UPDATE Ereignisse besteht darin, CDC-Ereignisse aus der Quelle zu upsert: Aktualisieren Sie alle Zeilen in der Zieltabelle, die den angegebenen Schlüsseln entsprechen, oder fügen Sie eine neue Zeile ein, wenn ein übereinstimmende Datensatz nicht in der Zieltabelle vorhanden ist. Die Behandlung für DELETE Ereignisse kann mit der APPLY AS DELETE WHEN Bedingung angegeben werden.
Parameter
sourceDie Quelle für die Daten. Die Quelle muss eine Streamingquelle sein. Verwenden Sie das
STREAMSchlüsselwort, um Streamingsemantik zum Lesen aus der Quelle zu verwenden. Wenn beim Lesen eine Änderung oder löschung in einem vorhandenen Datensatz auftritt, wird ein Fehler ausgelöst. Es ist am sichersten, aus statischen oder nur angefügten Quellen zu lesen.Weitere Informationen zum Streamen von Daten finden Sie unter Transformieren von Daten mit Pipelines.
KEYSDie Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifizieren. Die Werte in diesen Spalten werden verwendet, um zu identifizieren, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.
Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.
Diese Klausel ist erforderlich.
IGNORE NULL UPDATESErmöglicht das Einlesen von Updates, die nur einige der Zielspalten umfassen. Wenn ein CDC-Ereignis einer vorhandenen Zeile entspricht und
IGNORE NULL UPDATESangegeben wird, behalten Spalten mit einemnullWert ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit einemnullWert.Diese Klausel ist optional.
Standardmäßig werden vorhandene Spalten mit
nullWerten überschrieben.APPLY AS DELETE WHENGibt an, wann ein CDC-Ereignis als
DELETEund nicht als Upsert behandelt werden soll.Für SCD-Typ-2-Quellen wird zur Verarbeitung von Out-of-Order-Daten die gelöschte Zeile vorübergehend als Grabstein in der zugrunde liegenden Delta-Tabelle beibehalten, und es wird eine Ansicht im Metastore erstellt, die diese Grabsteine herausfiltert. Das Aufbewahrungsintervall kann mit der
pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft konfiguriert werden.Diese Klausel ist optional.
APPLY AS TRUNCATE WHENGibt an, wann ein CDC-Ereignis als vollständige Tabelle
TRUNCATEbehandelt werden soll. Da diese Klausel eine vollständige Abkürzung der Zieltabelle auslöst, sollte sie nur für bestimmte Anwendungsfälle verwendet werden, die diese Funktionalität erfordern.Die
APPLY AS TRUNCATE WHENKlausel wird nur für SCD-Typ 1 unterstützt. SCD Typ 2 unterstützt den Abkürzungsvorgang nicht.Diese Klausel ist optional.
SEQUENCE BYDer Spaltenname, der die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angibt. Die Pipelineverarbeitung verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge ankommen.
Wenn mehrere Spalten für die Sequenzierung erforderlich sind, verwenden Sie einen
STRUCTAusdruck: Sie sortiert zuerst nach dem ersten Strukturfeld, dann nach dem zweiten Feld, wenn eine Bindung vorhanden ist usw.Angegebene Spalten müssen sortierbare Datentypen sein.
Diese Klausel ist erforderlich.
COLUMNSGibt eine Teilmenge von Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
COLUMNS (userId, name, city). - Geben Sie eine Liste der auszuschließenden Spalten an:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional.
Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die
COLUMNSKlausel nicht angegeben wird.- Geben Sie die vollständige Liste der einzuschließenden Spalten an:
STORED ASGibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.
Diese Klausel ist optional.
Der SCD-Typ 1 ist der Standardwert.
TRACK HISTORY ONGibt eine Teilmenge von Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorhanden sind. Sie haben folgende Möglichkeiten:
- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
COLUMNS (userId, name, city). - Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen:
COLUMNS * EXCEPT (operation, sequenceNum)
Diese Klausel ist optional. Standardmäßig wird der Verlauf für alle Ausgabespalten nachverfolgt, wenn es Änderungen gibt, was dem
TRACK HISTORY ON *entspricht.- Geben Sie die vollständige Liste der zu verfolgenden Spalten an:
Beispiele
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);