Freigeben über


Die AUTO CDC-APIs: Vereinfachen Sie die Erfassung von Änderungsdaten mit den deklarativen Lakeflow-Pipelines

Lakeflow Declarative Pipelines vereinfacht die Änderungsdatenerfassung (CDC) mit den AUTO CDC und AUTO CDC FROM SNAPSHOT APIs.

Hinweis

Die AUTO CDC APIs wurden zuvor APPLY CHANGES benannt und hatten dieselbe Syntax.

Die verwendete Schnittstelle hängt von der Quelle der Änderungsdaten ab:

  • Verwenden Sie AUTO CDC, um Änderungen aus einem Änderungsdatenfeed (CDF) zu verarbeiten.
  • Verwenden Sie AUTO CDC FROM SNAPSHOT (öffentliche Vorschau und nur für Python verfügbar), um Änderungen in Datenbankmomentaufnahmen zu verarbeiten.

Zuvor wurde die MERGE INTO-Anweisung üblicherweise für die Verarbeitung von CDC-Datensätzen auf Azure Databricks verwendet. MERGE INTO Kann jedoch aufgrund von Out-of-Sequence-Datensätzen falsche Ergebnisse erzeugen oder erfordert eine komplexe Logik zum Erneuten Anordnen von Datensätzen.

Die AUTO CDC API wird in den SQL- und Python-Schnittstellen "Lakeflow Declarative Pipelines" unterstützt. Die AUTO CDC FROM SNAPSHOT API wird in der Python-Schnittstelle "Lakeflow Declarative Pipelines" unterstützt.

Sowohl AUTO CDC als auch AUTO CDC FROM SNAPSHOT unterstützen das Aktualisieren von Tabellen mit SCD-Typ 1 und Typ 2.

  • Verwenden Sie SCD-Typ 1, um Datensätze direkt zu aktualisieren. Die Historie wird für aktualisierte Datensätze nicht aufbewahrt.
  • Verwenden Sie SCD-Typ 2, um einen Verlauf von Datensätzen beizubehalten, entweder für alle Aktualisierungen oder für Aktualisierungen an einer bestimmten Gruppe von Spalten.

Syntax und andere Verweise finden Sie unter AUTO CDC für Lakeflow Declarative Pipelines SQL, AUTO CDC für Lakeflow Declarative Pipelines Python und AUTO CDC FROM SNAPSHOT für Lakeflow Declarative Pipelines Python.

Hinweis

In diesem Artikel wird beschrieben, wie Tabellen in Ihren deklarativen Lakeflow-Pipelines basierend auf Änderungen in Quelldaten aktualisiert werden. Informationen zum Aufzeichnen und Abfragen von Änderungsinformationen auf Zeilenebene für Delta-Tabellen finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.

Anforderungen

Um die CDC-APIs zu verwenden, muss Ihre Pipeline so konfiguriert werden, dass die Serverless Lakeflow Declarative Pipelines oder die ProAdvancedEditionen von Lakeflow verwendet werden.

Wie wird CDC mit der AUTO CDC-API implementiert?

Durch die automatische Verarbeitung von Out-of-Sequence-Datensätzen stellt die AUTO CDC-API in Lakeflow Declarative Pipelines die korrekte Verarbeitung von CDC-Datensätzen sicher und entfernt die Notwendigkeit, komplexe Logik für die Verarbeitung von Out-of-Sequence-Datensätzen zu entwickeln. Sie müssen eine Spalte in den Quelldaten angeben, für die Datensätze sequenziert werden sollen, die Lakeflow Declarative Pipelines als monoton zunehmende Darstellung der richtigen Reihenfolge der Quelldaten interpretiert. Lakeflow Declarative Pipelines verarbeitet automatisch Daten, die außerhalb der Reihenfolge ankommen. Für Änderungen des Typs SCD 2 verteilt Lakeflow Declarative Pipelines die entsprechenden Sequenzwerte auf die Spalten __START_AT und __END_AT der Zieltabelle. Bei jedem Sequenzierungswert sollte eine unterschiedliche Aktualisierung pro Schlüssel vorhanden sein, und NULL-Sequenzierungswerte werden nicht unterstützt.

Zum Ausführen der CDC-Verarbeitung mit AUTO CDC erstellen Sie zuerst eine Streamingtabelle und verwenden dann die AUTO CDC ... INTO-Anweisung in SQL oder die create_auto_cdc_flow()-Funktion in Python, um die Quelle, Schlüssel und Sequenzierung für den Änderungsfeed anzugeben. Verwenden Sie zum Erstellen der Zielstreamingtabelle die CREATE OR REFRESH STREAMING TABLE Anweisung in SQL oder der create_streaming_table() Funktion in Python. Siehe die Beispiele für SCD-Typ 1 und -Typ 2 für die Verarbeitung.

Ausführliche Informationen zur Syntax finden Sie in der SQL-Referenz oder Python-Referenz zu Lakeflow Declarative Pipelines.

Wie wird CDC mit der AUTO CDC FROM SNAPSHOT API implementiert?

Wichtig

Die AUTO CDC FROM SNAPSHOT-API befindet sich in der öffentlichen Vorschau.

AUTO CDC FROM SNAPSHOT ist eine deklarative API, mit der Änderungen an Quelldaten effizient ermittelt werden, indem eine Reihe von Momentaufnahmen in Reihenfolge verglichen und dann die für die CDC-Verarbeitung der Datensätze in den Momentaufnahmen erforderliche Verarbeitung ausgeführt wird. AUTO CDC FROM SNAPSHOT wird nur von der Python-Schnittstelle "Lakeflow Declarative Pipelines" unterstützt.

AUTO CDC FROM SNAPSHOT unterstützt das Einlesen von Momentaufnahmen aus mehreren Quelltypen:

  • Verwenden Sie die regelmäßige Erfassung von Momentaufnahmen, um Momentaufnahmen aus einer vorhandenen Tabelle oder Sicht aufzunehmen. AUTO CDC FROM SNAPSHOT verfügt über eine einfache, optimierte Schnittstelle, die das regelmäßige Aufnehmen von Momentaufnahmen aus einem vorhandenen Datenbankobjekt unterstützt. Eine neue Momentaufnahme wird mit jedem Pipelineupdate aufgenommen, und die Aufnahmezeit wird als Momentaufnahmeversion verwendet. Wenn eine Pipeline im fortlaufenden Modus ausgeführt wird, werden mehrere Momentaufnahmen mit jeder Pipelineaktualisierung für einen Zeitraum aufgenommen, der durch die Triggerintervalleinstellung für den Fluss bestimmt wird, der die AUTO CDC FROM SNAPSHOT Verarbeitung enthält.
  • Verwenden Sie historische Momentaufnahmen zum Verarbeiten von Dateien, die Datenbankmomentaufnahmen enthalten, z. B. Momentaufnahmen, die aus einer Oracle- oder MySQL-Datenbank oder einem Data Warehouse generiert wurden.

Zum Ausführen der CDC-Verarbeitung von einem beliebigen Quelltyp mit AUTO CDC FROM SNAPSHOT erstellen Sie zuerst eine Streamingtabelle und verwenden dann die create_auto_cdc_from_snapshot_flow()-Funktion in Python, um die Momentaufnahme, Schlüssel und andere Argumente anzugeben, die zum Implementieren der Verarbeitung erforderlich sind. Sehen Sie sich die Beispiele für die periodische Verarbeitung von Momentaufnahmen und historische Verarbeitung von Momentaufnahmen an.

Die Momentaufnahmen, die an die API übergeben werden, müssen in aufsteigender Reihenfolge nach Version sein. Wenn Lakeflow Declarative Pipelines eine Out-of-Order-Momentaufnahme erkennt, wird ein Fehler ausgelöst.

Ausführliche Informationen zur Syntax finden Sie in der Python-Referenz zu Lakeflow Declarative Pipelines.

Verwenden mehrerer Spalten für die Sequenzierung

Sie können nach mehreren Spalten sequenzieren (z. B. einen Zeitstempel und eine ID zum Unterbrechen von Bindungen), sie können eine STRUKTUR verwenden, um sie zu kombinieren: sie sortiert nach dem ersten Feld der STRUKTUR zuerst und im Falle einer Bindung nach dem zweiten Feld usw.

Beispiel in SQL:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Beispiel für Python:

sequence_by = struct("timestamp_col", "id_col")

Einschränkungen

Die für die Sequenzierung verwendete Spalte muss ein sortierbarer Datentyp sein.

Beispiel: SCD-Typ 1- und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten

In den folgenden Abschnitten finden Sie Beispiele für SCD-Abfragen vom Typ 1 und Typ 2 von Lakeflow Declarative Pipelines, die Zieltabellen basierend auf Quellereignissen aus einem Änderungsdatenfeed aktualisieren:

  1. Erstellt neue Benutzerdatensätze.
  2. Löscht einen Benutzerdatensatz.
  3. Aktualisiert Benutzerdatensätze. In dem Beispiel zum SCD-Typ 1 kommen die letzten UPDATE-Vorgänge zu spät an und werden aus der Zieltabelle gelöscht, um die Behandlung von Ereignissen in nicht ordnungsgemäßer Reihenfolge zu veranschaulichen.

In den folgenden Beispielen wird davon ausgegangen, dass Sie mit der Konfiguration und Aktualisierung von deklarativen Lakeflow-Pipelines vertraut sind. Siehe Lernprogramm: Erstellen einer ETL-Pipeline mithilfe der Änderungsdatenerfassung mit Lakeflow Declarative Pipelines.

Zum Ausführen dieser Beispiele müssen Sie zunächst ein Beispiel-Dataset erstellen. Siehe Generieren von Testdaten.

Im Folgenden sind die Eingabedatensätze für diese Beispiele aufgeführt:

Benutzer-ID Name City Vorgang Sequenznummer
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilie Cancun INSERT 2
123 NULL NULL Löschen 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Wenn Sie die letzte Zeile in den Beispieldaten auskommentieren, wird der folgende Datensatz eingefügt, der angibt, wo die Datensätze abgeschnitten werden sollen:

Benutzer-ID Name City Vorgang Sequenznummer
NULL NULL NULL ABSCHNEIDEN 3

Hinweis

Alle folgenden Beispiele enthalten Optionen zum Angeben von Beiden DELETE und TRUNCATE Vorgängen, aber jede ist optional.

Verarbeiten von Aktualisierungen des SCD-Typs 1

Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 1-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Nach dem Ausführen des SCD-Typs 1-Beispiels enthält die Zieltabelle die folgenden Datensätze:

Benutzer-ID Name City
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilie Cancun

Nach der Ausführung des SCD-Typ 1-Beispiels mit dem zusätzlichen TRUNCATE-Datensatz werden die Datensätze 124 und 126 aufgrund des TRUNCATE-Vorgangs bei sequenceNum=3 abgeschnitten, und die Zieltabelle enthält den folgenden Datensatz:

Benutzer-ID Name City
125 Mercedes Guadalajara

Verarbeiten von Aktualisierungen des SCD-Typs 2

Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 2-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Nach dem Ausführen des SCD-Typs 2-Beispiels enthält die Zieltabelle die folgenden Datensätze:

Benutzer-ID Name City __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 NULL
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 NULL
126 Lilie Cancun 2 NULL

Eine SCD-Typ 2-Abfrage kann auch eine Teilmenge der Ausgabespalten angeben, die für den Verlauf in der Zieltabelle nachverfolgt werden sollen. Änderungen an anderen Spalten werden aktualisiert, anstatt neue Verlaufsdatensätze zu generieren. Im folgenden Beispiel wird veranschaulicht, wie die Spalte city von der Nachverfolgung ausgeschlossen wird.

Im folgenden Beispiel wird die Verwendung des Verlaufs mit SCD-Typ 2 veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Nach dem Ausführen dieses Beispiels ohne den zusätzlichen TRUNCATE Datensatz enthält die Zieltabelle die folgenden Datensätze:

Benutzer-ID Name City __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 NULL
125 Mercedes Guadalajara 2 NULL
126 Lilie Cancun 2 NULL

Generieren von Testdaten

Der folgende Code wird bereitgestellt, um ein Beispiel-Dataset für die Verwendung in den Beispielabfragen zu generieren, die in diesem Lernprogramm vorhanden sind. Wenn Sie über die richtigen Anmeldeinformationen verfügen, um ein neues Schema zu erstellen und eine neue Tabelle zu erstellen, können Sie diese Anweisungen entweder mit einem Notizbuch oder mit Databricks SQL ausführen. Der folgende Code soll nicht als Teil der deklarativen Lakeflow-Pipelines ausgeführt werden:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Beispiel: Regelmäßige Momentaufnahmeverarbeitung

Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die Momentaufnahmen einer Tabelle erfasst, die bei mycatalog.myschema.mytable gespeichert wird. Die Ergebnisse der Verarbeitung werden in eine Tabelle mit dem Namen targetgeschrieben.

mycatalog.myschema.mytable Datensätze zum Zeitstempel 2024-01-01 00:00:00

Schlüssel Wert
1 a1
2 a2

mycatalog.myschema.mytable Aufzeichnungen zum Zeitstempel 2024-01-01 12:00:00

Schlüssel Wert
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:

Schlüssel Wert __START_AT __END_AT
1 a1 01.01.2024 00:00:00 2024-01-01 12:00:00
2 a2 01.01.2024 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 NULL
3 a3 2024-01-01 12:00:00 NULL

Beispiel: Verarbeitung von historischen Momentaufnahmen

Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die eine Zieltabelle basierend auf Quellereignissen aus zwei Momentaufnahmen aktualisiert, die in einem Cloudspeichersystem gespeichert sind:

Momentaufnahme bei timestamp, gespeichert in /<PATH>/filename1.csv

Schlüssel TrackingColumn Nichtverfolgungsspalte
1 a1 b1
2 a2 b2
4 a4 b4

Momentaufnahme bei timestamp + 5, gespeichert in /<PATH>/filename2.csv

Schlüssel TrackingColumn Nichtverfolgungsspalte
2 a2_new b2
3 a3 b3
4 a4 b4_new

Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates mit diesen Momentaufnahmen veranschaulicht:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:

Schlüssel TrackingColumn Nichtverfolgungsspalte __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 NULL
3 a3 b3 2 NULL
4 a4 b4_new 1 NULL

Hinzufügen, Ändern oder Löschen von Daten in einer Zielstreamingtabelle

Wenn Ihre Pipeline Tabellen im Unity-Katalog veröffentlicht, können Sie DML-Anweisungen (Data Manipulation Language ) verwenden, einschließlich Einfüge-, Aktualisierungs-, Lösch- und Zusammenführungsanweisungen, um die von AUTO CDC ... INTO Anweisungen erstellten Zielstreamingtabellen zu ändern.

Hinweis

  • DML-Anweisungen, die das Tabellenschema einer Streamingtabelle ändern, werden nicht unterstützt. Stellen Sie sicher, dass Ihre DML-Anweisungen nicht versuchen, das Tabellenschema weiterzuentwickeln.
  • DML-Anweisungen, die eine Streamingtabelle aktualisieren, können nur in einem freigegebenen Unity Catalog-Cluster oder einem SQL-Warehouse mit Databricks Runtime 13.3 LTS und höher ausgeführt werden.
  • Da für das Streaming Datenquellen im Nur-Anfügen-Modus benötigt werden, legen Sie das Flag skipChangeCommits beim Lesen der Quellstreamingtabelle fest, wenn Ihre Verarbeitung Streaming aus einer Quellstreamingtabelle mit Änderungen (z. B. durch DML-Anweisungen) erfordert. Wenn skipChangeCommits festgelegt ist, werden Transaktionen, die Datensätze in der Quelltabelle löschen oder ändern, ignoriert. Wenn Ihre Verarbeitung keine Streamingtabelle erfordert, können Sie eine materialisierte Ansicht (die nicht die Einschränkung „nur Anhängen“ hat) als Zieltabelle verwenden.

Da Lakeflow Declarative Pipelines eine angegebene SEQUENCE BY Spalte verwendet und geeignete Sequenzierungswerte an die __START_AT Und __END_AT Spalten der Zieltabelle (für SCD-Typ 2) verteilt, müssen Sie sicherstellen, dass DML-Anweisungen gültige Werte für diese Spalten verwenden, um die richtige Reihenfolge von Datensätzen beizubehalten. Erfahren Sie , wie cdC mit der AUTO CDC-API implementiert wird?.

Weitere Informationen zur Verwendung von DML-Anweisungen mit Streamingtabellen finden Sie unter Hinzufügen, Ändern oder Löschen von Daten in einer Streamingtabelle.

Im folgenden Beispiel wird ein aktiver Datensatz mit einer Startsequenz von 5 eingefügt:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Lesen eines Änderungsdatenfeeds aus einer AUTO CDC-Zieltabelle

In Databricks Runtime 15.2 und höher können Sie einen Änderungsdatenfeed aus einer Streamingtabelle lesen, die das Ziel von AUTO CDC- oder AUTO CDC FROM SNAPSHOT-Abfragen ist, auf die gleiche Weise wie Sie einen Änderungsdatenfeed aus anderen Delta-Tabellen lesen. Um den Änderungsdatenfeed aus einer Zielstreamingtabelle zu lesen, werden folgende Voraussetzungen benötigt:

  • Die Zielstreamingtabelle muss im Unity-Katalog veröffentlicht werden. Weitere Informationen finden Sie unter Verwenden des Unity-Katalogs mit deklarativen Lakeflow-Pipelines.
  • Um den Änderungsdatenfeed aus der Zielstreamingtabelle zu lesen, müssen Sie Databricks Runtime 15.2 oder höher verwenden. Um den Änderungsdatenfeed in einer anderen Pipeline zu lesen, muss die Pipeline für die Verwendung von Databricks Runtime 15.2 oder höher konfiguriert sein.

Sie lesen den Änderungsdatenfeed aus einer Zielstreamingtabelle, die in Lakeflow Declarative Pipelines erstellt wurde, auf die gleiche Weise wie einen Änderungsdatenfeed aus anderen Delta-Tabellen. Weitere Informationen zur Verwendung der Delta-Änderungsdatenfeedfunktionen, einschließlich Beispiele in Python und SQL, finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.

Hinweis

Der Datensatz für Änderungsdatenfeed enthält Metadaten , die den Typ des Änderungsereignisses identifizieren. Wenn ein Datensatz in einer Tabelle aktualisiert wird, enthalten die Metadaten für die zugehörigen Änderungsdatensätze in der Regel _change_type-Werte, die auf update_preimage- und update_postimage-Ereignisse festgelegt sind.

Die _change_type Werte unterscheiden sich jedoch, wenn Aktualisierungen an der Zielstreamingtabelle vorgenommen werden, die änderungen der Primärschlüsselwerte umfassen. Wenn Änderungen Aktualisierungen an Primärschlüsseln enthalten, werden die _change_type Metadatenfelder auf insert und delete Ereignisse festgelegt. Änderungen an Primärschlüsseln können auftreten, wenn manuelle Aktualisierungen an einem der Schlüsselfelder mit einer UPDATE oder MERGE Anweisung vorgenommen werden oder, bei SCD-Typ 2-Tabellen, wenn sich das __start_at Feld ändert, um einen früheren Startsequenzwert widerzuspiegeln.

Die AUTO CDC Abfrage bestimmt die Primärschlüsselwerte, die sich für die SCD-Typ 1- und SCD-Typ 2-Verarbeitung unterscheiden:

  • Für die SCD-Typ 1-Verarbeitung und die Python-Schnittstelle "Lakeflow Declarative Pipelines" ist der Primärschlüssel der Wert des keys Parameters in der create_auto_cdc_flow() Funktion. Für die SQL-Schnittstelle "Lakeflow Declarative Pipelines" ist der Primärschlüssel die Spalten, die durch die KEYS-Klausel in der AUTO CDC ... INTO-Anweisung definiert werden.
  • Bei SCD-Typ 2 ist der Primärschlüssel der keys-Parameter oder die KEYS-Klausel sowie der Rückgabewert des coalesce(__START_AT, __END_AT)-Vorgangs, wobei es sich bei __START_AT und __END_AT um die entsprechenden Spalten aus der Zielstreamingtabelle handelt.

Daten über die von einer Lakeflow-Declarative-Pipelines-CDC-Abfrage verarbeiteten Datensätze abrufen

Hinweis

Die folgenden Metriken werden nur von AUTO CDC Abfragen und nicht von AUTO CDC FROM SNAPSHOT Abfragen erfasst.

Die folgenden Metriken werden von AUTO CDC Abfragen erfasst:

  • num_upserted_rows: Die Anzahl der Ausgabezeilen, die während einer Aktualisierung in das Dataset eingefügt wurden.
  • num_deleted_rows: Die Anzahl vorhandener Ausgabezeilen, die während einer Aktualisierung aus dem Dataset gelöscht wurden.

Die Metrik „num_output_rows“, die für Nicht-CDC-Flows ausgegeben wird, wird nicht für AUTO CDC-Abfragen erfasst.

Welche Datenobjekte werden für die CdC-Verarbeitung von Lakeflow Declarative Pipelines verwendet?

Hinweis

  • Diese Datenstrukturen gelten nur für die AUTO CDC-Verarbeitung, nicht für die AUTO CDC FROM SNAPSHOT-Verarbeitung.
  • Diese Datenstrukturen gelten nur, wenn die Zieltabelle im Hive-Metaspeicher veröffentlicht wird. Wenn eine Pipeline im Unity-Katalog veröffentlicht wird, sind die internen Hintergrundtabellen für Benutzer nicht zugänglich.

Wenn Sie die Zieltabelle im Hive-Metaspeicher deklarieren, werden zwei Datenstrukturen erstellt:

  • Eine Ansicht mit dem Namen, der der Zieltabelle zugewiesen ist.
  • Eine interne Sicherungstabelle, die von Lakeflow Declarative Pipelines zum Verwalten der CDC-Verarbeitung verwendet wird. Der Name dieser Tabelle wird gebildet, indem __apply_changes_storage_ an den Zieltabellennamen vorangestellt wird.

Wenn Sie beispielsweise eine Zieltabelle mit dem Namen dlt_cdc_target deklarieren, wird im Metastore eine Ansicht namens dlt_cdc_target und eine Tabelle namens __apply_changes_storage_dlt_cdc_target angezeigt. Durch das Erstellen einer Ansicht kann Lakeflow Declarative Pipelines die zusätzlichen Informationen (z. B. Tombstones und Versionen) herausfiltern, die für die Verarbeitung von ungeordneten Daten erforderlich sind. Um die verarbeiteten Daten anzuzeigen, führen Sie eine Abfrage auf das Ziel-View durch. Da sich das Schema der __apply_changes_storage_ Tabelle möglicherweise ändert, um zukünftige Features oder Verbesserungen zu unterstützen, sollten Sie die Tabelle nicht für die Produktionsverwendung abfragen. Wenn Sie der Tabelle Daten manuell hinzufügen, werden die Datensätze angenommen, bevor andere Änderungen vorgenommen werden, da die Versionsspalten fehlen.

Weitere Ressourcen