Freigeben über


APPLY CHANGES-API: Vereinfachtes CDC (Change Data Capture) in Delta Live Tables

Delta Live Tables vereinfacht die Change Data Capture (CDC) mit der APPLY CHANGES-API. Zuvor wurde die MERGE INTO-Anweisung üblicherweise für die Verarbeitung von CDC-Datensätzen auf Azure Databricks verwendet. MERGE INTO kann jedoch falsche Ergebnisse aufgrund von Out-of-Sequence-Datensätzen erzeugen oder komplexe Logik zum erneuten Anordnen von Datensätzen erfordern.

Durch die automatische Verarbeitung von Out-of-Sequence-Datensätzen stellt die APPLY CHANGES-API in Delta-Live Tables die korrekte Verarbeitung von CDC-Datensätzen sicher und entfernt die Notwendigkeit, komplexe Logik für die Behandlung von Out-of-Sequence-Datensätzen zu entwickeln.

Die APPLY CHANGES-API wird in den SQL- und Python-Schnittstellen Delta-Live Tables unterstützt, einschließlich der Unterstützung für das Aktualisieren von Tabellen mit SCD-Typ 1 und Typ 2:

  • Verwenden Sie SCD-Typ 1, um Datensätze direkt zu aktualisieren. Der Verlauf wird nicht für Datensätze aufbewahrt, die aktualisiert werden.
  • Verwenden Sie SCD-Typ 2, um einen Verlauf von Datensätzen beizubehalten, entweder für alle Updates, oder für Updates für einen angegebenen Satz von Spalten.

Syntax und andere Verweise finden Sie unter:

Hinweis

In diesem Artikel wird beschrieben, wie Tabellen in Ihrer Delta Live Tables-Pipeline basierend auf Änderungen in Quelldaten aktualisiert werden. Informationen zum Aufzeichnen und Abfragen von Änderungsinformationen auf Zeilenebene für Delta-Tabellen finden Sie unter Verwenden Sie den Delta Lake-Änderungs-Datenfeed in Azure Databricks.

Wie wird CDC mit Delta Live Tables implementiert?

Sie müssen eine Spalte in den Quelldaten angeben, für die Datensätze sequenziert werden sollen, die Delta Live Tables als monoton zunehmende Darstellung der richtigen Reihenfolge der Quelldaten interpretiert. Delta Live Tables verarbeitet automatisch Daten, die außerhalb der Reihenfolge ankommen. Für SCD-Typ 2-Änderungen verteilt Delta Live Tables die entsprechenden Sequenzierungswerte an dien Spalten __START_AT und __END_AT der Zieltabelle. Für jeden Sequenzierungswert sollte ein eindeutiges Update pro Schlüssel vorhanden sein, und NULL-Sequenzierungswerte werden nicht unterstützt.

Zum Ausführen der CDC-Verarbeitung mit Delta Live Tables erstellen Sie zuerst eine Streamingtabelle und verwenden dann eine APPLY CHANGES INTO-Anweisung, um die Quelle, Schlüssel und Sequenzierung für den Änderungsfeed anzugeben. Verwenden Sie zum Erstellen der Zielstreamingtabelle die CREATE OR REFRESH STREAMING TABLE -Anweisung in SQL oder die create_streaming_table()-Funktion in Python. Verwenden Sie zum Erstellen der Anweisung, die die CDC-Verarbeitung definiert, die APPLY CHANGES-Anweisung in SQL oder die apply_changes() -Funktion in Python. Ausführliche Informationen zur Syntax finden Sie unter Change Data Capture mit SQL in Delta Live Tables oder Change Data Capture mit Python in Delta Live Tables.

Welche Datenobjekte werden für die CDC-Verarbeitung von Delta Live Tables verwendet?

Wenn Sie die Zieltabelle im Hive-Metaspeicher deklarieren, werden zwei Datenstrukturen erstellt:

  • Eine Ansicht mit dem Namen, der der Zieltabelle zugewiesen ist.
  • Eine interne Sicherungstabelle, die von Delta Live Tables zum Verwalten der CDC-Verarbeitung verwendet wird. Diese Tabelle wird benannt, indem dem Namen der Zieltabelle __apply_changes_storage_ vorangestellt wird.

Wenn Sie beispielsweise eine Zieltabelle mit dem Namen dlt_cdc_targetdeklarieren, wird eine Ansicht mit dem Namen dlt_cdc_target und eine Tabelle mit dem Namen __apply_changes_storage_dlt_cdc_target im Metastore angezeigt. Durch das Erstellen einer Ansicht können Delta Live Tables die zusätzlichen Informationen (z. B. Tombstones und Versionen) herausfiltern, die für die Verarbeitung von nicht ordnungsgemäßen Daten erforderlich sind. Um die verarbeiteten Daten anzuzeigen, fragen Sie die Zielansicht ab. Da sich das Schema der __apply_changes_storage_-Tabelle möglicherweise ändert, um zukünftige Features oder Verbesserungen zu unterstützen, sollten Sie die Tabelle nicht für die Produktionsverwendung abfragen. Wenn Sie der Tabelle Daten manuell hinzufügen, werden die Datensätze angenommen, bevor andere Änderungen vorgenommen werden, da die Versionsspalten fehlen.

Wenn eine Pipeline im Unity Catalog veröffentlicht wird, sind die internen Sicherungstabellen für Benutzer nicht zugänglich.

Abrufen von Daten zu Datensätzen, die von einer Delta Live Tables-CDC-Abfrage verarbeitet werden

Die folgenden Metriken werden von apply changes-Abfragen erfasst:

  • num_upserted_rows: Die Anzahl der Ausgabezeilen, für die während einer Aktualisierung ein Upsertvorgang im Dataset ausgeführt wird.
  • num_deleted_rows: Die Anzahl vorhandener Ausgabezeilen, die während einer Aktualisierung aus dem Dataset gelöscht wurden.

Die Metrik num_output_rows, die für Nicht-CDC-Flows ausgegeben wird, wird nicht für apply changes-Abfragen erfasst.

Begrenzungen

Das Ziel der APPLY CHANGES INTO-Abfrage oder apply_changes-Funktion kann nicht als Streamingquelle für Tabellen verwendet werden. Eine Tabelle, die aus dem Ziel einer APPLY CHANGES INTO-Abfrage oder apply_changes-Funktion liest, muss eine materialisierte Sicht sein.

SCD-Typ 1 und SCD-Typ 2 auf Azure Databricks

Die folgenden Abschnitte veranschaulichen Abfragen von Delta Live Tables SCD-Typ 1 und Typ 2, die Zieltabellen basierend auf Quellereignissen aktualisieren, die Folgendes durchführen:

  1. Neue Benutzerdatensätze erstellen.
  2. Einen Benutzerdatensatz löschen.
  3. Benutzerdatensätze aktualisieren. In dem Beispiel vom SCD-Typ 1 kommen die letzten UPDATE-Vorgänge zu spät an und werden aus der Zieltabelle gelöscht, um die Behandlung von Ereignissen in nicht ordnungsgemäßer Reihenfolge zu veranschaulichen.

In den folgenden Beispielen wird davon ausgegangen, dass Sie mit der Konfiguration und Aktualisierung von Delta Live Tables-Pipelines vertraut sind. Siehe Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline.

Zum Ausführen dieser Beispiele müssen Sie zunächst ein Beispiel-Dataset erstellen. Siehe Generieren von Testdaten.

Im Folgenden sind die Eingabedatensätze für diese Beispiele aufgeführt:

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null NULL DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Wenn Sie die letzte Zeile in den Beispieldaten auskommentieren, wird der folgende Datensatz eingefügt, der angibt, wo die Datensätze abgeschnitten werden sollen:

userId name city operation sequenceNum
null NULL null TRUNCATE 3

Hinweis

Alle folgenden Beispiele enthalten Optionen zum Angeben von DELETE- und TRUNCATE-Vorgängen, die jedoch optional sind.

Aktualisierungen des SCD-Typs 1 verarbeiten

Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 1-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Nach dem Ausführen des Beispiels vom SCD-Typ 1 enthält die Zieltabelle die folgenden Datensätze:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Nach der Ausführung des SCD-Typ 1-Beispiels mit dem zusätzlichen TRUNCATE-Datensatz werden die Datensätze 124 und 126 aufgrund des TRUNCATE-Vorgangs bei sequenceNum=3 abgeschnitten, und die Zieltabelle enthält den folgenden Datensatz:

userId name city
125 Mercedes Guadalajara

Aktualisierungen des SCD-Typs 2 verarbeiten

Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Nach dem Ausführen des SCD Typ 2-Beispiels enthält die Zieltabelle die folgenden Datensätze:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 NULL

Eine SCD-Typ 2-Abfrage kann auch eine Teilmenge der Ausgabespalten angeben, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Änderungen an anderen Spalten werden aktualisiert, statt dass neue Verlaufsdatensätze generiert werden. Im folgenden Beispiel wird veranschaulicht, dass die Spalte city von der Nachverfolgung ausgeschlossen wird:

Im folgenden Beispiel wird die Verwendung des Verlaufs mit SCD-Typ 2 veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Nach dem Ausführen dieses Beispiels ohne den zusätzlichen TRUNCATE-Datensatz enthält die Zieltabelle die folgenden Datensätze:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Generieren von Testdaten

Der folgende Code wird bereitgestellt, um ein Beispiel-Dataset für die Verwendung in den Beispielabfragen zu generieren, die in diesem Lernprogramm vorhanden sind. Wenn Sie über die richtigen Anmeldeinformationen verfügen, um ein neues Schema zu erstellen und eine neue Tabelle zu erstellen, können Sie diese Anweisungen entweder mit einem Notebook oder mit Databricks SQL ausführen. Der folgende Code sollte nicht als Teil einer Delta Live Tables-Pipeline ausgeführt werden:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Hinzufügen, Ändern oder Löschen von Daten in einer Zielstreamingtabelle

Wenn Ihre Pipeline Tabellen im Unity Catalog veröffentlicht, können Sie Datenbearbeitungssprache (DML)-Anweisungen verwenden, einschließlich Einfüge-, Aktualisierungs-, Lösch- und Zusammenführungsanweisungen, um die von APPLY CHANGES INTO-Anweisungen erstellten Zielstreamingtabellen zu ändern.

Hinweis

  • DML-Anweisungen, die das Tabellenschema einer Streamingtabelle ändern, werden nicht unterstützt. Stellen Sie sicher, dass Ihre DML-Anweisungen nicht versuchen, das Tabellenschema weiterzuentwickeln.
  • DML-Anweisungen, die eine Streamingtabelle aktualisieren, können nur in einem freigegebenen Unity Catalog-Cluster oder einem SQL-Warehouse mit Databricks Runtime 13.3 LTS und höher ausgeführt werden.
  • Da für das Streaming Datenquellen im Nur-Anfügen-Modus benötigt werden, setzen Sie das Flag skipChangeCommits beim Lesen der Streaming-Quelltabelle, wenn Ihre Verarbeitung Streaming aus einer Streaming-Quelltabelle mit Änderungen (z. B. durch DML-Anweisungen) erfordert. Wenn skipChangeCommits festgelegt ist, werden Transaktionen, die Datensätze in der Quelltabelle löschen oder ändern, ignoriert. Wenn Ihre Verarbeitung keine Streaming-Tabelle erfordert, können Sie eine materialisierte Ansicht (die nicht die Einschränkung „nur Anhängen“ hat) als Zieltabelle verwenden.

Da Delta Live Tables eine angegebene SEQUENCE BY-Spalte verwendet und geeignete Sequenzierungswerte an die __START_AT- und __END_AT-Spalten der Zieltabelle (für SCD-Typ 2) verteilt, müssen Sie sicherstellen, dass DML-Anweisungen gültige Werte für diese Spalten verwenden, um die richtige Reihenfolge von Datensätzen aufrechtzuerhalten. Siehe Wie wird CDC mit Delta Live Tables implementiert?.

Weitere Informationen zum Verwenden von DML-Anweisungen mit Streamingtabellen finden Sie unter Hinzufügen, Ändern oder Löschen von Daten in einer Streamingtabelle.

Im folgenden Beispiel wird ein aktiver Datensatz mit einer Startsequenz von 5 eingefügt:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);