Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Änderungs-Datenfeed ermöglicht es Azure Databricks, Änderungen auf Zeilenebene zwischen Versionen einer Delta-Tabelle nachzuverfolgen. Wenn es für eine Delta-Tabelle aktiviert ist, zeichnet die Laufzeit Änderungsereignisse für alle Daten auf, die in die Tabelle geschrieben werden. Dazu gehören die Zeilendaten sowie die Metadaten, die anzeigen, ob die angegebene Zeile eingefügt, gelöscht oder aktualisiert wurde.
Sie können den Änderungsdatenfluss verwenden, um allgemeine Datenanwendungsfälle zu unterstützen, einschließlich:
- ETL-Pipelines: Verarbeiten Sie inkrementell nur die Zeilen, die seit der letzten Pipelineausführung geändert wurden.
- Überwachungspfade: Nachverfolgen von Datenänderungen für Compliance- und Governanceanforderungen.
- Datenreplikation: Synchronisieren von Änderungen an nachgelagerten Tabellen, Caches oder externen Systemen.
Wichtig
Änderungsdatenfeed arbeitet zusammen mit dem Tabellenverlauf, um Änderungsinformationen bereitzustellen. Da das Klonen einer Delta-Tabelle einen separaten Verlauf erstellt, stimmt der Änderungsdatenfeed in geklonten Tabellen nicht mit der der ursprünglichen Tabelle überein.
Aktivieren des Änderungsdatenfeeds
Der Datenfeed muss für die Tabellen, aus denen Sie lesen möchten, explizit aktiviert sein. Verwenden Sie eine der folgenden Methoden.
Neue Tabelle
Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im CREATE TABLE Befehl fest.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Vorhandene Tabelle
Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im ALTER TABLE Befehl fest.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Alle neuen Tabellen in einer Sitzung
Legen Sie eine Spark-Konfiguration fest, um den Änderungsdatenfeed für alle neuen Tabellen zu aktivieren, die in einer Sitzung erstellt wurden.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Wichtig
Nur Änderungen, die vorgenommen wurden, nachdem Sie den Änderungsdatenfeed aktiviert haben, werden aufgezeichnet. Vergangene Änderungen an einer Tabelle werden nicht erfasst.
Ändern des Datenfeedschemas
Wenn Sie aus dem Änderungsdatenfeed für eine Tabelle lesen, wird das Schema für die neueste Tabellenversion verwendet. Azure Databricks unterstützt die meisten Schemaänderungs- und Evolutionsvorgänge vollständig, tabellen mit aktivierter Spaltenzuordnung weisen jedoch Einschränkungen auf. Siehe Ändern von Datenfeedeinschränkungen für Tabellen mit Spaltenzuordnung.
Zusätzlich zu den Datenspalten aus dem Schema der Delta-Tabelle enthält der Änderungsdatenfeed Metadatenspalten, die den Typ des Änderungsereignisses identifizieren:
| Spaltenname | type | Werte |
|---|---|---|
_change_type |
Schnur |
insert, update_preimage, update_postimage, delete(1) |
_commit_version |
Lang | Die Version des Delta-Protokolls oder der Delta-Tabelle, die die Änderung enthält |
_commit_timestamp |
Zeitstempel | Der Zeitstempel, der beim Erstellen des Commits zugeordnet wurde |
(1)preimage ist der Wert vor dem Update, postimage ist der Wert nach dem Update.
Sie können den Datenfeed für eine Tabelle nicht aktivieren, wenn das Schema Spalten mit denselben Namen wie diese Metadatenspalten enthält. Benennen Sie Spalten in Der Tabelle um, um diesen Konflikt zu beheben, bevor Sie den Änderungsdatenfeed aktivieren.
Inkrementelle Verarbeitung von Änderungsdaten
Databricks empfiehlt die Verwendung von Änderungsdatenfeeds in Kombination mit strukturiertem Streaming, um Änderungen aus Delta-Tabellen inkrementell zu verarbeiten. Sie müssen strukturiertes Streaming für Azure Databricks verwenden, um Versionen für den Änderungsdatenfeed Ihrer Tabelle automatisch nachzuverfolgen. Informationen zur CDC-Verarbeitung mit SCD-Typ 1- oder Typ 2-Tabellen finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.
Legen Sie beim Konfigurieren eines Datenstroms für eine Tabelle die Option readChangeFeed auf true fest, um den Änderungsdatenfeed zu lesen, wie im folgenden Syntaxbeispiel gezeigt:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Standardverhalten
Wenn der Datenstrom erstmals gestartet wird, liefert er die aktuellste Momentaufnahme der Tabelle als INSERT-Datensätze und gibt dann zukünftige Änderungen als Änderungsdaten zurück. Die Änderungsdaten werden als Teil der Delta Lake-Transaktion übernommen und gleichzeitig mit den neuen Daten in die Tabelle übernommen.
Zusätzliche Optionen
Sie können optional eine Startversion angeben (siehe Angeben einer Startversion) oder batchausführung verwenden (siehe Lesen von Änderungen in Batchabfragen). Azure Databricks unterstützt auch Ratelimits (maxFilesPerTrigger, maxBytesPerTrigger) und excludeRegex beim Lesen von Änderungsdaten.
Bei anderen Versionen als der Startmomentaufnahme gilt die Begrenzung der Rate atomisch auf gesamte Commits – entweder der gesamte Commit ist im aktuellen Batch enthalten oder wird auf den nächsten Batch zurückgestellt.
Angeben einer Startversion
Wenn Sie Änderungen von einem bestimmten Punkt lesen möchten, geben Sie eine Startversion entweder mit einem Zeitstempel oder einer Versionsnummer an. Startversionen sind für Batchlesevorgänge erforderlich. Optional können Sie eine Endversion angeben, um den Bereich einzuschränken. Weitere Informationen zur Tabellengeschichte des Delta Lake finden Sie unter Was ist Delta Lake Zeitreise?.
Wenn Sie strukturierte Streaming-Workloads mit Änderungsdatenfeed konfigurieren, verstehen Sie, wie sich die Angabe einer Startversion auf die Verarbeitung auswirkt:
- Neue Datenverarbeitungspipelines profitieren in der Regel vom Standardverhalten, das alle vorhandenen Datensätze in der Tabelle als
INSERTVorgänge beim ersten Start des Datenstroms aufzeichnet. - Wenn Ihre Zieltabelle bereits alle Datensätze mit entsprechenden Änderungen bis zu einem bestimmten Punkt enthält, geben Sie eine Startversion an, um die Verarbeitung des Quelltabellenzustands als
INSERT-Ereignisse zu vermeiden.
Das folgende Beispiel zeigt die Syntax zum Wiederherstellen eines Streamingfehlers, bei dem der Prüfpunkt beschädigt wurde. Gehen Sie in diesem Beispiel von den folgenden Bedingungen aus:
- Der Datenfeed wurde für die Quelltabelle bei der Tabellenerstellung aktiviert.
- Die nachgelagerte Zieltabelle hat alle Änderungen bis einschließlich Version 75 verarbeitet.
- Der Versionsverlauf für die Quelltabelle ist für die Versionen 70 und höher verfügbar.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
In diesem Beispiel müssen Sie auch einen neuen Prüfpunktspeicherort angeben.
Wichtig
Wenn Sie eine Startversion angeben, kann der Datenstrom nicht von einem neuen Prüfpunkt aus gestartet werden, wenn die Startversion nicht mehr im Tabellenverlauf vorhanden ist. Delta Lake bereinigt historische Versionen automatisch, was bedeutet, dass alle angegebenen Startversionen irgendwann gelöscht werden.
Siehe Verlauf der Wiederholungstabelle.
Lesen von Änderungen in Batchabfragen
Mit der Batchabfragesyntax können Sie alle Änderungen ab einer bestimmten Version lesen oder Änderungen innerhalb eines bestimmten Versionsbereichs lesen.
- Geben Sie Versionen als ganze Zahlen und Zeitstempel als Zeichenfolgen im Format
yyyy-MM-dd[ HH:mm:ss[.SSS]]an. - Die Start- und Endversionen sind inbegriffen.
- Wenn Sie von einer Startversion bis zur neuesten Version lesen möchten, geben Sie nur die Startversion an.
- Wenn Sie eine Version angeben, bevor der Datenfeed geändert wurde, wird ein Fehler ausgelöst.
Die folgenden Syntaxbeispiele veranschaulichen die Verwendung von Start- und Endversionsoptionen mit Batchlesevorgängen:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Behandlung von Versionen außerhalb des zulässigen Bereichs
Standardmäßig löst das Angeben einer Version oder eines Zeitstempels, der den letzten Commit überschreitet, den Fehler timestampGreaterThanLatestCommitaus. In Databricks Runtime 11.3 LTS und höher können Sie die Toleranz für Versionen außerhalb des akzeptierten Bereichs einschalten.
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Mit dieser Einstellung ist Folgendes aktiviert:
- Startversion/Zeitstempel nach dem letzten Commit: Gibt ein leeres Ergebnis zurück.
- Endversion/Zeitstempel über den letzten Commit hinaus: Gibt alle Änderungen vom Anfang bis zum letzten Commit zurück.
Datenänderungen aufzeichnen
Delta Lake zeichnet Datenänderungen effizient auf und verwendet möglicherweise andere Delta Lake-Features, um die Speicherdarstellung zu optimieren.
Überlegungen zur Speicherung
- Speicherkosten: Das Aktivieren des Änderungsdatenfeeds kann zu einer geringen Erhöhung der Speicherkosten führen, da Änderungen in separaten Dateien aufgezeichnet werden können.
- Vorgänge ohne Änderungsdateien: Einige Vorgänge (nur einfügen, Löschungen der vollständigen Partition) generieren keine Änderungsdatendateien – Azure Databricks berechnet den Änderungsdatenfeed direkt aus dem Transaktionsprotokoll.
-
Aufbewahrung: Ändern von Datendateien folgen der Aufbewahrungsrichtlinie der Tabelle. Der
VACUUMBefehl löscht sie und Änderungen aus dem Transaktionsprotokoll folgen der Aufbewahrung des Prüfpunkts.
Versuchen Sie nicht, den Änderungsdatenfeed zu rekonstruieren, indem Sie Änderungsdatendateien direkt abfragen. Verwenden Sie immer Delta Lake-APIs.
Verlauf der Wiedergabetabelle
Der Datenfeed soll nicht als permanenter Datensatz aller Änderungen an einer Tabelle dienen. Es zeichnet nur Änderungen auf, die nach der Aktivierung auftreten, und Sie können mit einem neuen Streaminglesevorgang beginnen, um die aktuelle Version und alle nachfolgenden Änderungen zu erfassen.
Datensätze im Änderungsdatenfeed sind vorübergehend und nur für ein bestimmtes Aufbewahrungsfenster zugänglich. Das Delta Lake-Transaktionsprotokoll entfernt Tabellenversionen und die entsprechenden Änderungsdatenfeedversionen in regelmäßigen Abständen. Wenn eine Version entfernt wird, können Sie den Änderungsdatenfeed für diese Version nicht mehr lesen.
Archiv-Änderungsdaten für den permanenten Verlauf
Wenn Für Ihren Anwendungsfall ein permanenter Verlauf aller Änderungen an einer Tabelle erforderlich ist, verwenden Sie inkrementelle Logik, um Datensätze aus dem Änderungsdatenfeed in eine neue Tabelle zu schreiben. Im folgenden Beispiel wird die Verwendung von trigger.AvailableNow zur Verarbeitung verfügbarer Daten als Batchworkload für Auditing oder für volles Wiederabspielen veranschaulicht.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Ändern von Datenfeedbeschränkungen für Tabellen mit Spaltenzuordnung
Wenn die Spaltenzuordnung in einer Delta-Tabelle aktiviert ist, können Sie Spalten ablegen oder umbenennen, ohne Datendateien neu zu schreiben. Änderungsdatenfeeds haben jedoch Einschränkungen nach nicht-additiven Schemaänderungen, wie z. B. das Umbenennen oder Löschen von Spalten, das Ändern von Datentypen oder Änderungen der Nullierbarkeit:
- Batchsemantik: Sie können den Änderungsdatenstrom für eine Transaktion oder einen Bereich nicht lesen, in dem eine nicht-additive Schemaänderung auftritt.
- Databricks Runtime 12.2 LTS und ältere Versionen: Tabellen mit aktivierter Spaltenzuordnung, die nicht-additive Schemaänderungen durchlaufen haben, unterstützen das Streaming von Lesevorgängen für Änderungsdatenfeeds nicht. Weitere Informationen finden Sie unter Streaming mit Spaltenzuordnung und Schemaänderungen.
- Databricks Runtime 11.3 LTS und älter: Sie können den Änderungen-Datenfeed für Tabellen mit aktivierter Spaltenzuordnung nicht abrufen, bei denen Spalten umbenannt oder entfernt wurden.
In Databricks Runtime 12.2 LTS und höher können Sie Batchlesevorgänge in Änderungsdatenfeeds für Tabellen mit aktivierter Spaltenzuordnung durchführen, in denen nicht additive Schemaänderungen vorgenommen wurden. Lesevorgänge verwenden das Schema der in der Abfrage angegebenen Endversion anstelle der neuesten Tabellenversion. Abfragen schlagen weiterhin fehl, wenn sich der Versionsbereich über eine nicht additive Schemaänderung erstreckt.