Freigeben über


Verwenden Sie den Delta Lake-Änderungs-Datenfeed in Azure Databricks

Änderungs-Datenfeed ermöglicht es Azure Databricks, Änderungen auf Zeilenebene zwischen Versionen einer Delta-Tabelle nachzuverfolgen. Wenn es für eine Delta-Tabelle aktiviert ist, zeichnet die Laufzeit Änderungsereignisse für alle Daten auf, die in die Tabelle geschrieben werden. Dazu gehören die Zeilendaten sowie die Metadaten, die anzeigen, ob die angegebene Zeile eingefügt, gelöscht oder aktualisiert wurde.

Wichtig

Änderungsdatenfeed arbeitet zusammen mit dem Tabellenverlauf, um Änderungsinformationen bereitzustellen. Da durch das Klonen einer Delta-Tabelle ein separater Verlauf erstellt wird, stimmt der Änderungsdatenfeed für geklonte Tabellen nicht mit dem der ursprünglichen Tabelle überein.

Inkrementelle Verarbeitung von Änderungsdaten

Databricks empfiehlt die Verwendung von Änderungsdatenfeeds in Kombination mit strukturiertem Streaming, um Änderungen aus Delta-Tabellen inkrementell zu verarbeiten. Sie müssen strukturiertes Streaming für Azure Databricks verwenden, um Versionen für den Änderungsdatenfeed Ihrer Tabelle automatisch nachzuverfolgen.

Hinweis

Delta Live Tables bietet Funktionen für die einfache Verteilung von Änderungsdaten und das Speichern von Ergebnissen als SCD (langsam ändernde Dimension) Typ 1- oder Typ 2-Tabellen. Weitere Informationen finden Sie unter Die APPLY CHANGES-APIs: Vereinfachen der Änderungsdatenerfassung mit Delta Live Tables.

Zum Lesen des Änderungsdatenfeeds aus einer Tabelle müssen Sie den Datenfeed für diese Tabelle aktivieren. Weitere Informationen finden Sie unter Aktivieren des Änderungsdatenfeeds.

Legen Sie beim Konfigurieren eines Datenstroms für eine Tabelle die Option readChangeFeed auf true fest, um den Änderungsdatenfeed zu lesen, wie im folgenden Syntaxbeispiel gezeigt:

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Standardmäßig gibt der Datenstrom die neueste Momentaufnahme der Tabelle zurück, wenn der Datenstrom zum ersten Mal als INSERT und zukünftige Änderungen als Änderungsdaten gestartet werden.

Ändern Sie Datencommits als Teil der Delta Lake-Transaktion, und sie werden gleichzeitig verfügbar, wenn die neuen Daten in die Tabelle übernommen werden.

Optional können Sie eine Startversion angeben. Weitere Informationen unter Sollte ich eine Startversion angeben?.

Der Datenfeed zum Ändern unterstützt auch die Batchausführung, bei der eine Startversion angegeben werden muss. Weitere Informationen unter Lesen von Änderungen in Batchabfragen.

Optionen wie Datenübertragungslimits (maxFilesPerTrigger, maxBytesPerTrigger) und excludeRegex werden auch beim Lesen von Änderungsdaten unterstützt.

Die Ratenbegrenzung kann für andere Versionen als die Startversion der Momentaufnahme unteilbar sein. Das heißt, dass die gesamte Commitversion einer Ratenbegrenzung unterliegt oder der gesamte Commit zurückgegeben wird.

Sollte ich eine Startversion angeben?

Sie können optional eine Startversion angeben, wenn Sie Änderungen ignorieren möchten, die vor einer bestimmten Version aufgetreten sind. Sie können eine Version mithilfe eines Zeitstempels oder der im Delta-Transaktionsprotokoll aufgezeichneten Versionsnummer angeben.

Hinweis

Für Batchlese ist eine Startversion erforderlich, und viele Batchmuster können von der Festlegung einer optionalen Endversion profitieren.

Wenn Sie strukturierte Streaming-Workloads mit Änderungsdatenfeed konfigurieren, ist es wichtig zu verstehen, wie sich die Angabe einer Startversion auf die Verarbeitung auswirkt.

Viele Streamingworkloads, insbesondere neue Datenverarbeitungspipelines, profitieren vom Standardverhalten. Mit dem Standardverhalten wird der erste Batch verarbeitet, wenn der Datenstrom zuerst alle vorhandenen Datensätze in der Tabelle als INSERT-Vorgänge im Änderungsdatenfeed erfasst.

Wenn Ihre Zieltabelle bereits alle Datensätze mit entsprechenden Änderungen bis zu einem bestimmten Punkt enthält, geben Sie eine Startversion an, um die Verarbeitung des Quelltabellenzustands als INSERT-Ereignisse zu vermeiden.

In der folgenden Beispielsyntax wird ein Streamingfehler wiederhergestellt, bei dem der Prüfpunkt beschädigt wurde. Gehen Sie in diesem Beispiel von den folgenden Bedingungen aus:

  1. Der Datenfeed wurde für die Quelltabelle bei der Tabellenerstellung aktiviert.
  2. Die nachgelagerte Zieltabelle hat alle Änderungen bis einschließlich Version 75 verarbeitet.
  3. Der Versionsverlauf für die Quelltabelle ist für die Versionen 70 und höher verfügbar.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

In diesem Beispiel müssen Sie auch einen neuen Prüfpunktspeicherort angeben.

Wichtig

Wenn Sie eine Startversion angeben, kann der Datenstrom nicht von einem neuen Prüfpunkt aus gestartet werden, wenn die Startversion nicht mehr im Tabellenverlauf vorhanden ist. Delta Lake bereinigt historische Versionen automatisch, was bedeutet, dass alle angegebenen Startversionen irgendwann gelöscht werden.

Weitere Informationen unter Kann ich den Änderungsdatenfeed verwenden, um den gesamten Verlauf einer Tabelle wiederzugeben?.

Lesen von Änderungen in Batchabfragen

Mit der Batchabfragesyntax können Sie alle Änderungen ab einer bestimmten Version lesen oder Änderungen innerhalb eines bestimmten Versionsbereichs lesen.

Sie geben eine Version als ganze Zahl und einen Zeitstempel als eine Zeichenfolge im Format yyyy-MM-dd[ HH:mm:ss[.SSS]] an.

Die Versionen für Start und Ende sind in den Abfragen enthalten. Geben Sie nur die Startversion an, um die Änderungen von einer bestimmten Startversion zur neuesten Version der Tabelle zu lesen.

Wenn Sie eine niedrigere Version oder einen älteren Zeitstempel angeben als eine, die Änderungsereignisse aufgezeichnet hat – d. h. wenn der Änderungsdaten-Feed aktiviert war – wird ein Fehler ausgegeben, der darauf hinweist, dass der Änderungsdaten-Feed nicht aktiviert war.

Die folgenden Syntaxbeispiele veranschaulichen die Verwendung von Start- und Endversionsoptionen mit Batchlesevorgängen:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Hinweis

Wenn ein Benutzer eine Version oder einen Zeitstempel übergibt, der den letzten Commit für eine Tabelle überschreitet, wird standardmäßig der Fehler timestampGreaterThanLatestCommit ausgegeben. In Databricks Runtime 11.3 LTS und höher kann der Änderungsdatenfeed den Fall einer Version außerhalb des zulässigen Bereichs behandeln, wenn der Benutzer die folgende Konfiguration auf true setzt:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Wenn Sie eine Startversion bereitstellen, die höher ist als die des letzten Commits in einer Tabelle, oder einen Startzeitstempel, der nach dem letzten Commit in einer Tabelle liegt, wird beim Aktivieren der vorherigen Konfiguration ein leeres Leseergebnis zurückgegeben.

Wenn Sie eine Endversion bereitstellen, die höher ist als die des letzten Commits in einer Tabelle, oder einen Endzeitstempel, der nach dem letzten Commit in einer Tabelle liegt, werden beim Aktivieren der vorherigen Konfiguration im Batchmodus für Lesevorgänge alle Änderungen zurückgegeben, die zwischen der Startversion und dem letzten Commit erfolgt sind.

Was ist das Schema für den Änderungsdatenfeed?

Wenn Sie aus dem Änderungsdatenfeed für eine Tabelle lesen, wird das Schema für die neueste Tabellenversion verwendet.

Hinweis

Die meisten Schemaänderungs- und Evolutionsvorgänge werden vollständig unterstützt. Tabellen mit aktivierter Spaltenzuordnung unterstützen nicht alle Anwendungsfälle und zeigen ein anderes Verhalten. Weitere Informationen finden Sie unter Ändern von Datenfeedeinschränkungen für Tabellen mit aktivierter Spaltenzuordnung.

Zusätzlich zu den Datenspalten aus dem Schema der Delta-Tabelle enthält der Änderungsdatenfeed Metadatenspalten, die den Typ des Änderungsereignisses identifizieren:

Spaltenname type Werte
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version Long Die Version des Delta-Protokolls oder der Delta-Tabelle, die die Änderung enthält
_commit_timestamp Timestamp Der Zeitstempel, der beim Erstellen des Commits zugeordnet wurde

(1) preimage ist der Wert vor dem Update, postimage ist der Wert nach dem Update.

Hinweis

Sie können den Änderungsdatenfeed für eine Tabelle nicht aktivieren, wenn das Schema Spalten mit denselben Namen wie diese hinzugefügten Spalten enthält. Benennen Sie Spalten in der Tabelle um, um diesen Konflikt zu beheben, bevor Sie versuchen, den Änderungsdatenfeed zu aktivieren.

Aktivieren des Änderungsdatenfeeds

Sie können den Änderungsdatenfeed nur für aktivierte Tabellen lesen. Sie müssen den Änderungsdatenfeed explizit mit einer der folgenden Methoden aktivieren:

  • Neue Tabelle: Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im Befehl CREATE TABLE fest.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Vorhandene Tabelle: Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im Befehl ALTER TABLE fest.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alle neuen Tabellen:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Wichtig

Nur Änderungen, die vorgenommen wurden, nachdem Sie den Änderungsdatenfeed aktiviert haben, werden aufgezeichnet. Vergangene Änderungen an einer Tabelle werden nicht erfasst.

Änderungsdatenspeicher

Das Aktivieren von Änderungsdatenfeeds führt zu einer geringen Erhöhung der Speicherkosten für eine Tabelle. Änderungsdatensätze werden generiert, wenn die Abfrage ausgeführt wird, und sind im Allgemeinen wesentlich kleiner als die Gesamtgröße der neu geschriebenen Dateien.

Azure Databricks zeichnet Änderungsdaten für UPDATE-, DELETE- und MERGE-Vorgänge im Ordner _change_data unter dem Tabellenverzeichnis auf. Einige Vorgänge, z. B. reine Einfügevorgänge und vollständige Partitionslöschungen, generieren keine Daten im _change_data-Verzeichnis, da Azure Databricks den Änderungsdatenfeed effizient direkt aus dem Transaktionsprotokoll berechnen kann.

Alle Lesevorgänge für Datendateien im _change_data-Ordner sollten unterstützte Delta Lake-APIs durchlaufen.

Für die Dateien im Ordner _change_data gilt die Aufbewahrungsrichtlinie der Tabelle. Datenfeeddaten werden beim Ausführen des VACUUM-Befehls gelöscht.

Kann ich den Änderungsdatenfeed verwenden, um den gesamten Verlauf einer Tabelle wiederzuverwenden?

Der Datenfeed soll nicht als permanenter Datensatz aller Änderungen an einer Tabelle dienen. Beim Ändern des Datenfeeds werden nur Änderungen aufgezeichnet, die nach der Aktivierung auftreten.

Mit dem Ändern von Datenfeeds und Delta Lake können Sie immer eine vollständige Momentaufnahme einer Quelltabelle rekonstruieren, was bedeutet, dass Sie mit einem neuen Streaming-Lesevorgang für eine Tabelle mit aktiviertem Änderungsdatenfeed beginnen können, und die aktuelle Version dieser Tabelle und alle Änderungen, die nachher auftreten, erfassen können.

Sie müssen Datensätze im Änderungsdatenfeed als vorübergehend und nur für ein bestimmtes Aufbewahrungsfenster zugänglich behandeln. Das Delta-Transaktionsprotokoll entfernt Tabellenversionen und die entsprechenden Änderungsdatenfeedversionen in regelmäßigen Abständen. Wenn eine Version aus dem Transaktionsprotokoll entfernt wird, können Sie den Änderungsdatenfeed für diese Version nicht mehr lesen.

Wenn für Ihren Anwendungsfall ein permanenter Verlauf aller Änderungen an einer Tabelle erforderlich ist, sollten Sie inkrementelle Logik verwenden, um Datensätze aus dem Änderungsdatenfeed in eine neue Tabelle zu schreiben. Im folgenden Codebeispiel wird die Verwendung von trigger.AvailableNow veranschaulicht, welche die inkrementelle Verarbeitung von strukturiertem Streaming nutzt, aber verfügbare Daten als Batchworkload verarbeitet. Sie können diese Workload asynchron mit Ihren Hauptverarbeitungspipelines planen, um eine Sicherung des Änderungsdatenfeeds für Überwachungszwecke oder vollständige Möglichkeit der Wiedergabe zu erstellen.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Ändern von Datenfeedeinschränkungen für Tabellen mit aktivierter Spaltenzuordnung

Wenn die Spaltenzuordnung für eine Delta-Tabelle aktiviert ist, können Sie Spalten in der Tabelle löschen oder umbenennen, ohne Datendateien für vorhandene Daten umzuschreiben. Wenn die Spaltenzuordnung aktiviert ist, weist der Änderungsdatenfeed Einschränkungen auf, nachdem nicht additive Schemaänderungen durchgeführt wurden, z. B. das Umbenennen oder Löschen einer Spalte, das Ändern des Datentyps oder Änderungen an der NULL-Zulässigkeit.

Wichtig

  • Sie können keinen Änderungsdatenfeed für eine Transaktion oder einen Bereich lesen, in dem eine nicht additive Schemaänderung unter Verwendung von Batchsemantiken auftritt.
  • In Databricks Runtime 12.2 LTS und früher unterstützen Tabellen mit aktivierter Spaltenzuordnung, bei denen Änderungen an nicht additiven Schemas vorgenommen wurden, keine Streaminglesevorgänge in Änderungsdatenfeeds. Weitere Informationen finden Sie unter Streaming mit Spaltenzuordnung und Schemaänderungen.
  • In Databricks Runtime 11.3 LTS und früher können Sie nicht den Änderungsdatenfeed für Tabellen mit aktivierter Spaltenzuordnung lesen, in denen Spalten umbenannt oder gelöscht wurden.

In Databricks Runtime 12.2 LTS und höher können Sie Batchlesevorgänge in Änderungsdatenfeeds für Tabellen mit aktivierter Spaltenzuordnung durchführen, in denen nicht additive Schemaänderungen vorgenommen wurden. Statt das Schema der neuesten Version der Tabelle zu verwenden, nutzen Lesevorgänge das Schema der in der Abfrage angegebenen Endversion der Tabelle. Abfragen schlagen weiterhin fehl, wenn der angegebene Versionsbereich eine nicht additive Schemaänderung umfasst.