Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Lakeflow Declarative Pipelines vereinfacht die Änderungsdatenerfassung (CDC) mit den AUTO CDC
und AUTO CDC FROM SNAPSHOT
APIs.
Hinweis
Die AUTO CDC
APIs wurden zuvor APPLY CHANGES
benannt und hatten dieselbe Syntax.
Die verwendete Schnittstelle hängt von der Quelle der Änderungsdaten ab:
- Verwenden Sie
AUTO CDC
, um Änderungen aus einem Änderungsdatenfeed (CDF) zu verarbeiten. - Verwenden Sie
AUTO CDC FROM SNAPSHOT
(öffentliche Vorschau und nur für Python verfügbar), um Änderungen in Datenbankmomentaufnahmen zu verarbeiten.
Zuvor wurde die MERGE INTO
-Anweisung üblicherweise für die Verarbeitung von CDC-Datensätzen auf Azure Databricks verwendet.
MERGE INTO
Kann jedoch aufgrund von Out-of-Sequence-Datensätzen falsche Ergebnisse erzeugen oder erfordert eine komplexe Logik zum Erneuten Anordnen von Datensätzen.
Die AUTO CDC
API wird in den SQL- und Python-Schnittstellen "Lakeflow Declarative Pipelines" unterstützt. Die AUTO CDC FROM SNAPSHOT
API wird in der Python-Schnittstelle "Lakeflow Declarative Pipelines" unterstützt.
Sowohl AUTO CDC
als auch AUTO CDC FROM SNAPSHOT
unterstützen das Aktualisieren von Tabellen mit SCD-Typ 1 und Typ 2.
- Verwenden Sie SCD-Typ 1, um Datensätze direkt zu aktualisieren. Die Historie wird für aktualisierte Datensätze nicht aufbewahrt.
- Verwenden Sie SCD-Typ 2, um einen Verlauf von Datensätzen beizubehalten, entweder für alle Aktualisierungen oder für Aktualisierungen an einer bestimmten Gruppe von Spalten.
Syntax und andere Verweise finden Sie unter AUTO CDC für Lakeflow Declarative Pipelines SQL, AUTO CDC für Lakeflow Declarative Pipelines Python und AUTO CDC FROM SNAPSHOT für Lakeflow Declarative Pipelines Python.
Hinweis
In diesem Artikel wird beschrieben, wie Tabellen in Ihren deklarativen Lakeflow-Pipelines basierend auf Änderungen in Quelldaten aktualisiert werden. Informationen zum Aufzeichnen und Abfragen von Änderungsinformationen auf Zeilenebene für Delta-Tabellen finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.
Anforderungen
Um die CDC-APIs zu verwenden, muss Ihre Pipeline so konfiguriert werden, dass die Serverless Lakeflow Declarative Pipelines oder die Pro
Advanced
Editionen von Lakeflow verwendet werden.
Wie wird CDC mit der AUTO CDC-API implementiert?
Durch die automatische Verarbeitung von Out-of-Sequence-Datensätzen stellt die AUTO CDC-API in Lakeflow Declarative Pipelines die korrekte Verarbeitung von CDC-Datensätzen sicher und entfernt die Notwendigkeit, komplexe Logik für die Verarbeitung von Out-of-Sequence-Datensätzen zu entwickeln. Sie müssen eine Spalte in den Quelldaten angeben, für die Datensätze sequenziert werden sollen, die Lakeflow Declarative Pipelines als monoton zunehmende Darstellung der richtigen Reihenfolge der Quelldaten interpretiert. Lakeflow Declarative Pipelines verarbeitet automatisch Daten, die außerhalb der Reihenfolge ankommen. Für Änderungen des Typs SCD 2 verteilt Lakeflow Declarative Pipelines die entsprechenden Sequenzwerte auf die Spalten __START_AT
und __END_AT
der Zieltabelle. Bei jedem Sequenzierungswert sollte eine unterschiedliche Aktualisierung pro Schlüssel vorhanden sein, und NULL-Sequenzierungswerte werden nicht unterstützt.
Zum Ausführen der CDC-Verarbeitung mit AUTO CDC
erstellen Sie zuerst eine Streamingtabelle und verwenden dann die AUTO CDC ... INTO
-Anweisung in SQL oder die create_auto_cdc_flow()
-Funktion in Python, um die Quelle, Schlüssel und Sequenzierung für den Änderungsfeed anzugeben. Verwenden Sie zum Erstellen der Zielstreamingtabelle die CREATE OR REFRESH STREAMING TABLE
Anweisung in SQL oder der create_streaming_table()
Funktion in Python. Siehe die Beispiele für SCD-Typ 1 und -Typ 2 für die Verarbeitung.
Ausführliche Informationen zur Syntax finden Sie in der SQL-Referenz oder Python-Referenz zu Lakeflow Declarative Pipelines.
Wie wird CDC mit der AUTO CDC FROM SNAPSHOT
API implementiert?
Wichtig
Die AUTO CDC FROM SNAPSHOT
-API befindet sich in der öffentlichen Vorschau.
AUTO CDC FROM SNAPSHOT
ist eine deklarative API, mit der Änderungen an Quelldaten effizient ermittelt werden, indem eine Reihe von Momentaufnahmen in Reihenfolge verglichen und dann die für die CDC-Verarbeitung der Datensätze in den Momentaufnahmen erforderliche Verarbeitung ausgeführt wird.
AUTO CDC FROM SNAPSHOT
wird nur von der Python-Schnittstelle "Lakeflow Declarative Pipelines" unterstützt.
AUTO CDC FROM SNAPSHOT
unterstützt das Einlesen von Momentaufnahmen aus mehreren Quelltypen:
- Verwenden Sie die regelmäßige Erfassung von Momentaufnahmen, um Momentaufnahmen aus einer vorhandenen Tabelle oder Sicht aufzunehmen.
AUTO CDC FROM SNAPSHOT
verfügt über eine einfache, optimierte Schnittstelle, die das regelmäßige Aufnehmen von Momentaufnahmen aus einem vorhandenen Datenbankobjekt unterstützt. Eine neue Momentaufnahme wird mit jedem Pipelineupdate aufgenommen, und die Aufnahmezeit wird als Momentaufnahmeversion verwendet. Wenn eine Pipeline im fortlaufenden Modus ausgeführt wird, werden mehrere Momentaufnahmen mit jeder Pipelineaktualisierung für einen Zeitraum aufgenommen, der durch die Triggerintervalleinstellung für den Fluss bestimmt wird, der dieAUTO CDC FROM SNAPSHOT
Verarbeitung enthält. - Verwenden Sie historische Momentaufnahmen zum Verarbeiten von Dateien, die Datenbankmomentaufnahmen enthalten, z. B. Momentaufnahmen, die aus einer Oracle- oder MySQL-Datenbank oder einem Data Warehouse generiert wurden.
Zum Ausführen der CDC-Verarbeitung von einem beliebigen Quelltyp mit AUTO CDC FROM SNAPSHOT
erstellen Sie zuerst eine Streamingtabelle und verwenden dann die create_auto_cdc_from_snapshot_flow()
-Funktion in Python, um die Momentaufnahme, Schlüssel und andere Argumente anzugeben, die zum Implementieren der Verarbeitung erforderlich sind. Sehen Sie sich die Beispiele für die periodische Verarbeitung von Momentaufnahmen und historische Verarbeitung von Momentaufnahmen an.
Die Momentaufnahmen, die an die API übergeben werden, müssen in aufsteigender Reihenfolge nach Version sein. Wenn Lakeflow Declarative Pipelines eine Out-of-Order-Momentaufnahme erkennt, wird ein Fehler ausgelöst.
Ausführliche Informationen zur Syntax finden Sie in der Python-Referenz zu Lakeflow Declarative Pipelines.
Verwenden mehrerer Spalten für die Sequenzierung
Sie können nach mehreren Spalten sequenzieren (z. B. einen Zeitstempel und eine ID zum Unterbrechen von Bindungen), sie können eine STRUKTUR verwenden, um sie zu kombinieren: sie sortiert nach dem ersten Feld der STRUKTUR zuerst und im Falle einer Bindung nach dem zweiten Feld usw.
Beispiel in SQL:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Beispiel für Python:
sequence_by = struct("timestamp_col", "id_col")
Einschränkungen
Die für die Sequenzierung verwendete Spalte muss ein sortierbarer Datentyp sein.
Beispiel: SCD-Typ 1- und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten
In den folgenden Abschnitten finden Sie Beispiele für SCD-Abfragen vom Typ 1 und Typ 2 von Lakeflow Declarative Pipelines, die Zieltabellen basierend auf Quellereignissen aus einem Änderungsdatenfeed aktualisieren:
- Erstellt neue Benutzerdatensätze.
- Löscht einen Benutzerdatensatz.
- Aktualisiert Benutzerdatensätze. In dem Beispiel zum SCD-Typ 1 kommen die letzten
UPDATE
-Vorgänge zu spät an und werden aus der Zieltabelle gelöscht, um die Behandlung von Ereignissen in nicht ordnungsgemäßer Reihenfolge zu veranschaulichen.
In den folgenden Beispielen wird davon ausgegangen, dass Sie mit der Konfiguration und Aktualisierung von deklarativen Lakeflow-Pipelines vertraut sind. Siehe Lernprogramm: Erstellen einer ETL-Pipeline mithilfe der Änderungsdatenerfassung mit Lakeflow Declarative Pipelines.
Zum Ausführen dieser Beispiele müssen Sie zunächst ein Beispiel-Dataset erstellen. Siehe Generieren von Testdaten.
Im Folgenden sind die Eingabedatensätze für diese Beispiele aufgeführt:
Benutzer-ID | Name | City | Vorgang | Sequenznummer |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lilie | Cancun | INSERT | 2 |
123 | NULL | NULL | Löschen | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Wenn Sie die letzte Zeile in den Beispieldaten auskommentieren, wird der folgende Datensatz eingefügt, der angibt, wo die Datensätze abgeschnitten werden sollen:
Benutzer-ID | Name | City | Vorgang | Sequenznummer |
---|---|---|---|---|
NULL | NULL | NULL | ABSCHNEIDEN | 3 |
Hinweis
Alle folgenden Beispiele enthalten Optionen zum Angeben von Beiden DELETE
und TRUNCATE
Vorgängen, aber jede ist optional.
Verarbeiten von Aktualisierungen des SCD-Typs 1
Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 1-Updates veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Nach dem Ausführen des SCD-Typs 1-Beispiels enthält die Zieltabelle die folgenden Datensätze:
Benutzer-ID | Name | City |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lilie | Cancun |
Nach der Ausführung des SCD-Typ 1-Beispiels mit dem zusätzlichen TRUNCATE
-Datensatz werden die Datensätze 124
und 126
aufgrund des TRUNCATE
-Vorgangs bei sequenceNum=3
abgeschnitten, und die Zieltabelle enthält den folgenden Datensatz:
Benutzer-ID | Name | City |
---|---|---|
125 | Mercedes | Guadalajara |
Verarbeiten von Aktualisierungen des SCD-Typs 2
Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 2-Updates veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Nach dem Ausführen des SCD-Typs 2-Beispiels enthält die Zieltabelle die folgenden Datensätze:
Benutzer-ID | Name | City | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | NULL |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | NULL |
126 | Lilie | Cancun | 2 | NULL |
Eine SCD-Typ 2-Abfrage kann auch eine Teilmenge der Ausgabespalten angeben, die für den Verlauf in der Zieltabelle nachverfolgt werden sollen. Änderungen an anderen Spalten werden aktualisiert, anstatt neue Verlaufsdatensätze zu generieren. Im folgenden Beispiel wird veranschaulicht, wie die Spalte city
von der Nachverfolgung ausgeschlossen wird.
Im folgenden Beispiel wird die Verwendung des Verlaufs mit SCD-Typ 2 veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Nach dem Ausführen dieses Beispiels ohne den zusätzlichen TRUNCATE
Datensatz enthält die Zieltabelle die folgenden Datensätze:
Benutzer-ID | Name | City | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | NULL |
125 | Mercedes | Guadalajara | 2 | NULL |
126 | Lilie | Cancun | 2 | NULL |
Generieren von Testdaten
Der folgende Code wird bereitgestellt, um ein Beispiel-Dataset für die Verwendung in den Beispielabfragen zu generieren, die in diesem Lernprogramm vorhanden sind. Wenn Sie über die richtigen Anmeldeinformationen verfügen, um ein neues Schema zu erstellen und eine neue Tabelle zu erstellen, können Sie diese Anweisungen entweder mit einem Notizbuch oder mit Databricks SQL ausführen. Der folgende Code soll nicht als Teil der deklarativen Lakeflow-Pipelines ausgeführt werden:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Beispiel: Regelmäßige Momentaufnahmeverarbeitung
Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die Momentaufnahmen einer Tabelle erfasst, die bei mycatalog.myschema.mytable
gespeichert wird. Die Ergebnisse der Verarbeitung werden in eine Tabelle mit dem Namen target
geschrieben.
mycatalog.myschema.mytable
Datensätze zum Zeitstempel 2024-01-01 00:00:00
Schlüssel | Wert |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
Aufzeichnungen zum Zeitstempel 2024-01-01 12:00:00
Schlüssel | Wert |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:
Schlüssel | Wert | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 01.01.2024 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 01.01.2024 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | NULL |
3 | a3 | 2024-01-01 12:00:00 | NULL |
Beispiel: Verarbeitung von historischen Momentaufnahmen
Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die eine Zieltabelle basierend auf Quellereignissen aus zwei Momentaufnahmen aktualisiert, die in einem Cloudspeichersystem gespeichert sind:
Momentaufnahme bei timestamp
, gespeichert in /<PATH>/filename1.csv
Schlüssel | TrackingColumn | Nichtverfolgungsspalte |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Momentaufnahme bei timestamp + 5
, gespeichert in /<PATH>/filename2.csv
Schlüssel | TrackingColumn | Nichtverfolgungsspalte |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates mit diesen Momentaufnahmen veranschaulicht:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:
Schlüssel | TrackingColumn | Nichtverfolgungsspalte | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | NULL |
3 | a3 | b3 | 2 | NULL |
4 | a4 | b4_new | 1 | NULL |
Hinzufügen, Ändern oder Löschen von Daten in einer Zielstreamingtabelle
Wenn Ihre Pipeline Tabellen im Unity-Katalog veröffentlicht, können Sie DML-Anweisungen (Data Manipulation Language ) verwenden, einschließlich Einfüge-, Aktualisierungs-, Lösch- und Zusammenführungsanweisungen, um die von AUTO CDC ... INTO
Anweisungen erstellten Zielstreamingtabellen zu ändern.
Hinweis
- DML-Anweisungen, die das Tabellenschema einer Streamingtabelle ändern, werden nicht unterstützt. Stellen Sie sicher, dass Ihre DML-Anweisungen nicht versuchen, das Tabellenschema weiterzuentwickeln.
- DML-Anweisungen, die eine Streamingtabelle aktualisieren, können nur in einem freigegebenen Unity Catalog-Cluster oder einem SQL-Warehouse mit Databricks Runtime 13.3 LTS und höher ausgeführt werden.
- Da für das Streaming Datenquellen im Nur-Anfügen-Modus benötigt werden, legen Sie das Flag skipChangeCommits beim Lesen der Quellstreamingtabelle fest, wenn Ihre Verarbeitung Streaming aus einer Quellstreamingtabelle mit Änderungen (z. B. durch DML-Anweisungen) erfordert. Wenn
skipChangeCommits
festgelegt ist, werden Transaktionen, die Datensätze in der Quelltabelle löschen oder ändern, ignoriert. Wenn Ihre Verarbeitung keine Streamingtabelle erfordert, können Sie eine materialisierte Ansicht (die nicht die Einschränkung „nur Anhängen“ hat) als Zieltabelle verwenden.
Da Lakeflow Declarative Pipelines eine angegebene SEQUENCE BY
Spalte verwendet und geeignete Sequenzierungswerte an die __START_AT
Und __END_AT
Spalten der Zieltabelle (für SCD-Typ 2) verteilt, müssen Sie sicherstellen, dass DML-Anweisungen gültige Werte für diese Spalten verwenden, um die richtige Reihenfolge von Datensätzen beizubehalten. Erfahren Sie , wie cdC mit der AUTO CDC-API implementiert wird?.
Weitere Informationen zur Verwendung von DML-Anweisungen mit Streamingtabellen finden Sie unter Hinzufügen, Ändern oder Löschen von Daten in einer Streamingtabelle.
Im folgenden Beispiel wird ein aktiver Datensatz mit einer Startsequenz von 5 eingefügt:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Lesen eines Änderungsdatenfeeds aus einer AUTO CDC-Zieltabelle
In Databricks Runtime 15.2 und höher können Sie einen Änderungsdatenfeed aus einer Streamingtabelle lesen, die das Ziel von AUTO CDC
- oder AUTO CDC FROM SNAPSHOT
-Abfragen ist, auf die gleiche Weise wie Sie einen Änderungsdatenfeed aus anderen Delta-Tabellen lesen. Um den Änderungsdatenfeed aus einer Zielstreamingtabelle zu lesen, werden folgende Voraussetzungen benötigt:
- Die Zielstreamingtabelle muss im Unity-Katalog veröffentlicht werden. Weitere Informationen finden Sie unter Verwenden des Unity-Katalogs mit deklarativen Lakeflow-Pipelines.
- Um den Änderungsdatenfeed aus der Zielstreamingtabelle zu lesen, müssen Sie Databricks Runtime 15.2 oder höher verwenden. Um den Änderungsdatenfeed in einer anderen Pipeline zu lesen, muss die Pipeline für die Verwendung von Databricks Runtime 15.2 oder höher konfiguriert sein.
Sie lesen den Änderungsdatenfeed aus einer Zielstreamingtabelle, die in Lakeflow Declarative Pipelines erstellt wurde, auf die gleiche Weise wie einen Änderungsdatenfeed aus anderen Delta-Tabellen. Weitere Informationen zur Verwendung der Delta-Änderungsdatenfeedfunktionen, einschließlich Beispiele in Python und SQL, finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.
Hinweis
Der Datensatz für Änderungsdatenfeed enthält Metadaten , die den Typ des Änderungsereignisses identifizieren. Wenn ein Datensatz in einer Tabelle aktualisiert wird, enthalten die Metadaten für die zugehörigen Änderungsdatensätze in der Regel _change_type
-Werte, die auf update_preimage
- und update_postimage
-Ereignisse festgelegt sind.
Die _change_type
Werte unterscheiden sich jedoch, wenn Aktualisierungen an der Zielstreamingtabelle vorgenommen werden, die änderungen der Primärschlüsselwerte umfassen. Wenn Änderungen Aktualisierungen an Primärschlüsseln enthalten, werden die _change_type
Metadatenfelder auf insert
und delete
Ereignisse festgelegt. Änderungen an Primärschlüsseln können auftreten, wenn manuelle Aktualisierungen an einem der Schlüsselfelder mit einer UPDATE
oder MERGE
Anweisung vorgenommen werden oder, bei SCD-Typ 2-Tabellen, wenn sich das __start_at
Feld ändert, um einen früheren Startsequenzwert widerzuspiegeln.
Die AUTO CDC
Abfrage bestimmt die Primärschlüsselwerte, die sich für die SCD-Typ 1- und SCD-Typ 2-Verarbeitung unterscheiden:
- Für die SCD-Typ 1-Verarbeitung und die Python-Schnittstelle "Lakeflow Declarative Pipelines" ist der Primärschlüssel der Wert des
keys
Parameters in dercreate_auto_cdc_flow()
Funktion. Für die SQL-Schnittstelle "Lakeflow Declarative Pipelines" ist der Primärschlüssel die Spalten, die durch dieKEYS
-Klausel in derAUTO CDC ... INTO
-Anweisung definiert werden. - Bei SCD-Typ 2 ist der Primärschlüssel der
keys
-Parameter oder dieKEYS
-Klausel sowie der Rückgabewert descoalesce(__START_AT, __END_AT)
-Vorgangs, wobei es sich bei__START_AT
und__END_AT
um die entsprechenden Spalten aus der Zielstreamingtabelle handelt.
Daten über die von einer Lakeflow-Declarative-Pipelines-CDC-Abfrage verarbeiteten Datensätze abrufen
Hinweis
Die folgenden Metriken werden nur von AUTO CDC
Abfragen und nicht von AUTO CDC FROM SNAPSHOT
Abfragen erfasst.
Die folgenden Metriken werden von AUTO CDC
Abfragen erfasst:
-
num_upserted_rows
: Die Anzahl der Ausgabezeilen, die während einer Aktualisierung in das Dataset eingefügt wurden. -
num_deleted_rows
: Die Anzahl vorhandener Ausgabezeilen, die während einer Aktualisierung aus dem Dataset gelöscht wurden.
Die Metrik „num_output_rows
“, die für Nicht-CDC-Flows ausgegeben wird, wird nicht für AUTO CDC
-Abfragen erfasst.
Welche Datenobjekte werden für die CdC-Verarbeitung von Lakeflow Declarative Pipelines verwendet?
Hinweis
- Diese Datenstrukturen gelten nur für die
AUTO CDC
-Verarbeitung, nicht für dieAUTO CDC FROM SNAPSHOT
-Verarbeitung. - Diese Datenstrukturen gelten nur, wenn die Zieltabelle im Hive-Metaspeicher veröffentlicht wird. Wenn eine Pipeline im Unity-Katalog veröffentlicht wird, sind die internen Hintergrundtabellen für Benutzer nicht zugänglich.
Wenn Sie die Zieltabelle im Hive-Metaspeicher deklarieren, werden zwei Datenstrukturen erstellt:
- Eine Ansicht mit dem Namen, der der Zieltabelle zugewiesen ist.
- Eine interne Sicherungstabelle, die von Lakeflow Declarative Pipelines zum Verwalten der CDC-Verarbeitung verwendet wird. Der Name dieser Tabelle wird gebildet, indem
__apply_changes_storage_
an den Zieltabellennamen vorangestellt wird.
Wenn Sie beispielsweise eine Zieltabelle mit dem Namen dlt_cdc_target
deklarieren, wird im Metastore eine Ansicht namens dlt_cdc_target
und eine Tabelle namens __apply_changes_storage_dlt_cdc_target
angezeigt. Durch das Erstellen einer Ansicht kann Lakeflow Declarative Pipelines die zusätzlichen Informationen (z. B. Tombstones und Versionen) herausfiltern, die für die Verarbeitung von ungeordneten Daten erforderlich sind. Um die verarbeiteten Daten anzuzeigen, führen Sie eine Abfrage auf das Ziel-View durch. Da sich das Schema der __apply_changes_storage_
Tabelle möglicherweise ändert, um zukünftige Features oder Verbesserungen zu unterstützen, sollten Sie die Tabelle nicht für die Produktionsverwendung abfragen. Wenn Sie der Tabelle Daten manuell hinzufügen, werden die Datensätze angenommen, bevor andere Änderungen vorgenommen werden, da die Versionsspalten fehlen.