Verwenden Sie den Delta Lake-Änderungs-Datenfeed in Azure Databricks

Hinweis

Änderungs-Datenfeed ermöglicht es Azure Databricks, Änderungen auf Zeilenebene zwischen Versionen einer Delta-Tabelle nachzuverfolgen. Wenn es für eine Delta-Tabelle aktiviert ist, zeichnet die Laufzeit Änderungsereignisse für alle Daten auf, die in die Tabelle geschrieben werden. Dazu gehören die Zeilendaten sowie die Metadaten, die anzeigen, ob die angegebene Zeile eingefügt, gelöscht oder aktualisiert wurde.

Sie können die Änderungsereignisse in Stapelabfragen mit Spark SQL, Apache Spark DataFrames und strukturiertem Streaming lesen.

Wichtig

Änderungsdatenfeed arbeitet zusammen mit dem Tabellenverlauf, um Änderungsinformationen bereitzustellen. Da durch das Klonen einer Delta-Tabelle ein separater Verlauf erstellt wird, stimmt der Änderungsdatenfeed für geklonte Tabellen nicht mit dem der ursprünglichen Tabelle überein.

Anwendungsfälle

Änderungs-Datenfeed ist standardmäßig nicht aktiviert. Sie sollten den Änderungsdatenfeed in den folgenden Anwendungsfällen aktivieren:

  • Silberne und goldene Tabellen: Verbessern Sie die Leistung von Delta Lake, indem Sie nur Änderungen auf Zeilenebene nach anfänglichen MERGE-, UPDATE-, oder DELETE-Operationen verarbeiten, um ETL- und ELT-Operationen zu beschleunigen und zu vereinfachen.
  • Materialisierte Sichten: Erstellen Sie aktuelle, aggregierte Sichten von Informationen, die für BI und Analysen verwendet werden können, ohne dabei die vollständigen zugrunde liegenden Tabellen erneut verarbeiten zu müssen, sondern nur dort zu aktualisieren, wo Änderungen vorgenommen werden.
  • Übertragen von Änderungen: Senden Sie einen Änderungsdatenfeed an Downstreamsysteme wie Kafka oder RDBMS, die ihn zur inkrementellen Verarbeitung in späteren Phasen von Datenpipelines verwenden können.
  • Überwachungspfadtabelle: Erfassen Sie den Änderungsdatenfeed als Delta-Tabelle. Dabei können Sie einen unbefristeten Speicher und eine effiziente Abfragefunktion verwenden, über die Sie alle im Laufe der Zeit vorgenommenen Änderungen anzeigen können, einschließlich des Zeitpunkts von Löschungen und der vorgenommenen Aktualisierungen.

Aktivieren des Änderungsdatenfeeds

Sie müssen den Änderungsdatenfeed explizit mit einer der folgenden Methoden aktivieren:

  • Neue Tabelle: Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im Befehl CREATE TABLE fest.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Vorhandene Tabelle: Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im Befehl ALTER TABLE fest.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alle neuen Tabellen:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Wichtig

Nur Änderungen, die nach dem Aktivieren des Änderungsdatenfeeds vorgenommen wurden, werden aufgezeichnet. Frühere Änderungen an einer Tabelle werden nicht erfasst.

Änderungsdatenspeicher

Azure Databricks zeichnet Änderungsdaten für UPDATE-, DELETE- und MERGE-Vorgänge im Ordner _change_data unter dem Tabellenverzeichnis auf. Einige Vorgänge, z. B. reine Einfügevorgänge und vollständige Partitionslöschungen, generieren keine Daten im _change_data-Verzeichnis, da Azure Databricks den Änderungsdatenfeed effizient direkt aus dem Transaktionsprotokoll berechnen kann.

Für die Dateien im Ordner _change_data gilt die Aufbewahrungsrichtlinie der Tabelle. Wenn Sie den VACUUM-Befehl ausführen, werden daher auch Daten des Änderungsdatenfeeds gelöscht.

Lesen von Änderungen in Batchabfragen

Sie können entweder die Version oder den Zeitstempel für den Start und das Ende angeben. Die Versionen und Zeitstempel für Start und Ende sind in den Abfragen enthalten. Geben Sie nur die Startversion oder den Startzeitstempel an, um die Änderungen von einer bestimmten Startversion zur neuesten Version der Tabelle zu lesen.

Sie geben eine Version als ganze Zahl und einen Zeitstempel als eine Zeichenfolge im Format yyyy-MM-dd[ HH:mm:ss[.SSS]] an.

Wenn Sie eine niedrigere Version oder einen älteren Zeitstempel angeben als eine, die Änderungsereignisse aufgezeichnet hat – d. h. wenn der Änderungsdaten-Feed aktiviert war – wird ein Fehler ausgegeben, der darauf hinweist, dass der Änderungsdaten-Feed nicht aktiviert war.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Lesen von Änderungen in Streamingabfragen

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Legen Sie die Option readChangeFeed auf true fest, um die Änderungsdaten beim Lesen der Tabelle abzurufen. Die startingVersion oder der startingTimestamp sind optional. Wenn sie nicht angegeben werden, gibt der Stream die letzte Momentaufnahme der Tabelle zum Zeitpunkt des Streamings als INSERT und zukünftige Änderungen als Änderungsdaten zurück. Optionen wie Datenübertragungslimits (maxFilesPerTrigger, maxBytesPerTrigger) und excludeRegex werden auch beim Lesen von Änderungsdaten unterstützt.

Hinweis

Die Ratenbegrenzung kann für andere Versionen als die Startversion der Momentaufnahme unteilbar sein. Das heißt, dass die gesamte Commitversion einer Ratenbegrenzung unterliegt oder der gesamte Commit zurückgegeben wird.

Wenn ein Benutzer eine Version oder einen Zeitstempel übergibt, der den letzten Commit für eine Tabelle überschreitet, wird standardmäßig der Fehler timestampGreaterThanLatestCommit ausgegeben. In Databricks Runtime 11.3 LTS und höher kann der Änderungsdatenfeed den Fall einer Version außerhalb des zulässigen Bereichs behandeln, wenn der Benutzer die folgende Konfiguration auf true setzt:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Wenn Sie eine Startversion bereitstellen, die höher ist als die des letzten Commits in einer Tabelle, oder einen Startzeitstempel, der nach dem letzten Commit in einer Tabelle liegt, wird beim Aktivieren der vorherigen Konfiguration ein leeres Leseergebnis zurückgegeben.

Wenn Sie eine Endversion bereitstellen, die höher ist als die des letzten Commits in einer Tabelle, oder einen Endzeitstempel, der nach dem letzten Commit in einer Tabelle liegt, werden beim Aktivieren der vorherigen Konfiguration im Batchmodus für Lesevorgänge alle Änderungen zurückgegeben, die zwischen der Startversion und dem letzten Commit erfolgt sind.

Was ist das Schema für den Änderungsdatenfeed?

Wenn Sie aus dem Änderungsdatenfeed für eine Tabelle lesen, wird das Schema für die neueste Tabellenversion verwendet.

Hinweis

Die meisten Schemaänderungs- und Evolutionsvorgänge werden vollständig unterstützt. Tabellen mit aktivierter Spaltenzuordnung unterstützen nicht alle Anwendungsfälle und zeigen ein anderes Verhalten. Weitere Informationen finden Sie unter Ändern von Datenfeedeinschränkungen für Tabellen mit aktivierter Spaltenzuordnung.

Zusätzlich zu den Datenspalten aus dem Schema der Delta-Tabelle enthält der Änderungsdatenfeed Metadatenspalten, die den Typ des Änderungsereignisses identifizieren:

Spaltenname type Werte
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Long Die Version des Delta-Protokolls oder der Delta-Tabelle, die die Änderung enthält
_commit_timestamp Timestamp Der Zeitstempel, der beim Erstellen des Commits zugeordnet wurde

(1)preimage ist der Wert vor dem Update, postimage ist der Wert nach dem Update.

Hinweis

Sie können den Änderungsdatenfeed für eine Tabelle nicht aktivieren, wenn das Schema Spalten mit denselben Namen wie diese hinzugefügten Spalten enthält. Benennen Sie Spalten in der Tabelle um, um diesen Konflikt zu beheben, bevor Sie versuchen, den Änderungsdatenfeed zu aktivieren.

Ändern von Datenfeedeinschränkungen für Tabellen mit aktivierter Spaltenzuordnung

Wenn die Spaltenzuordnung für eine Delta-Tabelle aktiviert ist, können Sie Spalten in der Tabelle löschen oder umbenennen, ohne Datendateien für vorhandene Daten umzuschreiben. Wenn die Spaltenzuordnung aktiviert ist, weist der Änderungsdatenfeed Einschränkungen auf, nachdem nicht additive Schemaänderungen durchgeführt wurden, z. B. das Umbenennen oder Löschen einer Spalte, das Ändern des Datentyps oder Änderungen an der NULL-Zulässigkeit.

Wichtig

  • Sie können keinen Änderungsdatenfeed für eine Transaktion oder einen Bereich lesen, in dem eine nicht additive Schemaänderung unter Verwendung von Batchsemantiken auftritt.
  • In Databricks Runtime 12.2 LTS und früher unterstützen Tabellen mit aktivierter Spaltenzuordnung, bei denen Änderungen an nicht additiven Schemas vorgenommen wurden, keine Streaminglesevorgänge in Änderungsdatenfeeds. Weitere Informationen finden Sie unter Streaming mit Spaltenzuordnung und Schemaänderungen.
  • In Databricks Runtime 11.3 LTS und früher können Sie nicht den Änderungsdatenfeed für Tabellen mit aktivierter Spaltenzuordnung lesen, in denen Spalten umbenannt oder gelöscht wurden.

In Databricks Runtime 12.2 LTS und höher können Sie Batchlesevorgänge in Änderungsdatenfeeds für Tabellen mit aktivierter Spaltenzuordnung durchführen, in denen nicht additive Schemaänderungen vorgenommen wurden. Statt das Schema der neuesten Version der Tabelle zu verwenden, nutzen Lesevorgänge das Schema der in der Abfrage angegebenen Endversion der Tabelle. Abfragen schlagen weiterhin fehl, wenn der angegebene Versionsbereich eine nicht additive Schemaänderung umfasst.

Häufig gestellte Fragen (FAQ)

Wie hoch ist der Mehraufwand für das Aktivieren des Änderungsdatenfeeds?

Dieser hat keine signifikanten Auswirkungen. Die Änderungsdatensätze werden während der Abfrageausführung generiert und sind im Allgemeinen viel kleiner als die Gesamtgröße der neu geschriebenen Dateien.

Welche Aufbewahrungsrichtlinie gilt für Änderungsdatensätze?

Für Änderungsdatensätze gilt dieselbe Aufbewahrungsrichtlinie wie für veraltete Tabellenversionen. Sie werden mit „VACUUM“ bereinigt, wenn sie außerhalb des angegebenen Aufbewahrungszeitraums liegen.

Wann werden neue Datensätze im Änderungsdatenfeed verfügbar?

Änderungsdaten werden zusammen mit der Delta Lake-Transaktion comittet und stehen dann zur Verfügung, wenn die neuen Daten in der Tabelle verfügbar sind.

Notebookbeispiel: Übertragen von Änderungen mit Delta-Änderungsdatenfeed

Dieses Notizbuch zeigt, wie Änderungen an einer silbernen Tabelle mit absoluten Impfzahlen auf eine goldene Tabelle mit Impfquoten übertragen werden.

Notebook des Änderungsdatenfeeds

Notebook abrufen