Delta Lake-wijzigingenfeed gebruiken in Azure Databricks
Met de gegevensfeed voor wijzigingen kan Azure Databricks wijzigingen op rijniveau bijhouden tussen versies van een Delta-tabel. Wanneer deze optie is ingeschakeld voor een Delta-tabel, worden in de runtime gebeurtenissen vastgelegd voor alle gegevens die in de tabel zijn geschreven. Dit omvat de rijgegevens, samen met metagegevens die aangeven of de opgegeven rij is ingevoegd, verwijderd of bijgewerkt.
Belangrijk
Wijzigingen in de gegevensfeed werken samen met de tabelgeschiedenis om wijzigingsinformatie te bieden. Omdat het klonen van een Delta-tabel een afzonderlijke geschiedenis maakt, komt de wijzigingsgegevensfeed op gekloonde tabellen niet overeen met die van de oorspronkelijke tabel.
Wijzigingsgegevens incrementeel verwerken
Databricks raadt aan wijzigingenfeeds te gebruiken in combinatie met Structured Streaming om wijzigingen uit Delta-tabellen incrementeel te verwerken. U moet Structured Streaming voor Azure Databricks gebruiken om automatisch versies bij te houden voor de wijzigingengegevensfeed van uw tabel.
Notitie
Delta Live Tables biedt functionaliteit voor het eenvoudig doorgeven van wijzigingsgegevens en het opslaan van resultaten als SCD-type 1 of type 2-tabellen. Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.
Als u de wijzigingengegevensfeed uit een tabel wilt lezen, moet u de gegevensfeed voor die tabel inschakelen. Zie Wijzigingenfeed inschakelen.
Stel de optie readChangeFeed
in voor true
het configureren van een stream voor een tabel om de wijzigingengegevensfeed te lezen, zoals wordt weergegeven in het volgende syntaxisvoorbeeld:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Standaard retourneert de stream de meest recente momentopname van de tabel wanneer de stream voor het eerst wordt gestart als een INSERT
en toekomstige wijzigingen als wijzigingsgegevens.
Wijzig gegevensdoorvoeringen als onderdeel van de Delta Lake-transactie en wordt tegelijkertijd beschikbaar voor de nieuwe gegevensdoorvoeringen in de tabel.
U kunt desgewenst een beginversie opgeven. Zie Moet ik een beginversie opgeven?.
Wijzigingenfeed biedt ook ondersteuning voor batchuitvoering. Hiervoor moet een beginversie worden opgegeven. Zie Leeswijzigingen in batchquery's.
Opties zoals frequentielimieten (maxFilesPerTrigger
, maxBytesPerTrigger
) en excludeRegex
worden ook ondersteund bij het lezen van wijzigingsgegevens.
Snelheidsbeperking kan atomisch zijn voor andere versies dan de versie van de beginmomentopname. Dat wil gezegd, de volledige doorvoerversie is beperkt of de volledige doorvoering wordt geretourneerd.
Moet ik een beginversie opgeven?
U kunt desgewenst een beginversie opgeven als u wijzigingen wilt negeren die zijn opgetreden vóór een bepaalde versie. U kunt een versie opgeven met behulp van een tijdstempel of het versie-id-nummer dat is vastgelegd in het Delta-transactielogboek.
Notitie
Een beginversie is vereist voor batchleesbewerkingen en veel batchpatronen kunnen profiteren van het instellen van een optionele eindversie.
Wanneer u Structured Streaming-workloads configureert met betrekking tot wijzigingengegevensfeed, is het belangrijk om te begrijpen hoe het opgeven van een beginversie van invloed is op de verwerking.
Veel streamingworkloads, met name nieuwe pijplijnen voor gegevensverwerking, profiteren van het standaardgedrag. Met het standaardgedrag wordt de eerste batch verwerkt wanneer de stream eerst alle bestaande records in de tabel registreert als INSERT
bewerkingen in de wijzigingengegevensfeed.
Als uw doeltabel al alle records met de juiste wijzigingen tot een bepaald punt bevat, geeft u een beginversie op om te voorkomen dat de status van de brontabel als INSERT
gebeurtenissen wordt verwerkt.
De volgende voorbeeldsyntaxis die wordt hersteld na een streamingfout waarbij het controlepunt is beschadigd. In dit voorbeeld wordt uitgegaan van de volgende voorwaarden:
- Wijzigingsgegevensfeed is ingeschakeld voor de brontabel bij het maken van de tabel.
- De downstream-doeltabel heeft alle wijzigingen tot en met versie 75 verwerkt.
- Versiegeschiedenis voor de brontabel is beschikbaar voor versies 70 en hoger.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
In dit voorbeeld moet u ook een nieuwe controlepuntlocatie opgeven.
Belangrijk
Als u een beginversie opgeeft, kan de stream niet worden gestart vanaf een nieuw controlepunt als de beginversie niet meer aanwezig is in de tabelgeschiedenis. Delta Lake schoont historische versies automatisch op, wat betekent dat alle opgegeven beginversies uiteindelijk worden verwijderd.
Zie Kan ik de gegevensfeed wijzigen gebruiken om de hele geschiedenis van een tabel opnieuw af te spelen?.
Wijzigingen in batchquery's lezen
U kunt de syntaxis van batchquery's gebruiken om alle wijzigingen te lezen die beginnen met een bepaalde versie of om wijzigingen binnen een opgegeven reeks versies te lezen.
U geeft een versie op als een geheel getal en een tijdstempel als een tekenreeks in de notatie yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
De begin- en eindversies zijn inclusief in de query's. Als u de wijzigingen van een bepaalde beginversie naar de nieuwste versie van de tabel wilt lezen, geeft u alleen de beginversie op.
Als u een lagere versie of tijdstempel opgeeft die ouder is dan een versie die wijzigingengebeurtenissen heeft geregistreerd( dat wil zeggen wanneer de wijzigingenfeed is ingeschakeld), wordt er een fout gegenereerd die aangeeft dat de wijzigingenfeed niet is ingeschakeld.
In de volgende syntaxisvoorbeelden ziet u hoe u opties voor de begin- en eindversie gebruikt met batchleesbewerkingen:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Notitie
Als een gebruiker standaard een versie of tijdstempel doorgeeft die de laatste doorvoering in een tabel overschrijdt, wordt de fout timestampGreaterThanLatestCommit
gegenereerd. In Databricks Runtime 11.3 LTS en hoger kan de gegevensfeed de case van de buiten bereikversie verwerken als de gebruiker de volgende configuratie instelt op true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Als u een beginversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere begintijdstempel dan de laatste doorvoering in een tabel, wordt er een leeg leesresultaat geretourneerd wanneer de voorgaande configuratie is ingeschakeld.
Als u een eindversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere eindtijdstempel dan de laatste doorvoering in een tabel, worden alle wijzigingen tussen de beginversie en de laatste doorvoering geretourneerd wanneer de voorgaande configuratie is ingeschakeld in de batchleesmodus.
Wat is het schema voor de wijzigingengegevensfeed?
Wanneer u uit de wijzigingengegevensfeed voor een tabel leest, wordt het schema voor de nieuwste tabelversie gebruikt.
Notitie
De meeste schemawijzigings- en evolutiebewerkingen worden volledig ondersteund. Tabel waarvoor kolomtoewijzing is ingeschakeld, bieden geen ondersteuning voor alle use cases en demonstreren verschillend gedrag. Zie Beperkingen voor gegevensfeeds wijzigen voor tabellen waarvoor kolomtoewijzing is ingeschakeld.
Naast de gegevenskolommen uit het schema van de Delta-tabel bevat wijzigingenfeed metagegevenskolommen waarmee het type wijzigingsevenement wordt geïdentificeerd:
Kolomnaam | Type | Waarden |
---|---|---|
_change_type |
String | insert , , update_postimage , delete update_preimage (1) |
_commit_version |
Lang | Het Delta-logboek of de tabelversie met de wijziging. |
_commit_timestamp |
Tijdstempel | Het tijdstempel dat is gekoppeld aan het moment dat de doorvoering is gemaakt. |
(1) preimage
is de waarde vóór de update, postimage
is de waarde na de update.
Notitie
U kunt de gegevensfeed voor een tabel niet inschakelen als het schema kolommen bevat met dezelfde namen als deze toegevoegde kolommen. Wijzig de naam van kolommen in de tabel om dit conflict op te lossen voordat u de wijzigingenfeed probeert in te schakelen.
Wijzigingenfeed inschakelen
U kunt de wijzigingengegevensfeed alleen lezen voor ingeschakelde tabellen. U moet de optie voor de wijzigingengegevensfeed expliciet inschakelen met behulp van een van de volgende methoden:
Nieuwe tabel: Stel de tabeleigenschap
delta.enableChangeDataFeed = true
in deCREATE TABLE
opdracht in.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Bestaande tabel: Stel de tabeleigenschap
delta.enableChangeDataFeed = true
in deALTER TABLE
opdracht in.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Alle nieuwe tabellen:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Belangrijk
Alleen wijzigingen die zijn aangebracht nadat u de wijzigingenfeed hebt ingeschakeld, worden vastgelegd. Eerdere wijzigingen in een tabel worden niet vastgelegd.
Gegevensopslag wijzigen
Het inschakelen van wijzigingengegevensfeed veroorzaakt een kleine toename van de opslagkosten voor een tabel. Wijzigingsgegevensrecords worden gegenereerd wanneer de query wordt uitgevoerd en zijn over het algemeen veel kleiner dan de totale grootte van herschreven bestanden.
Azure Databricks-records wijzigen gegevens voor UPDATE
en DELETE
MERGE
bewerkingen in de _change_data
map onder de tabelmap. Sommige bewerkingen, zoals bewerkingen met alleen-invoegen en verwijderingen van volledige partities, genereren geen gegevens in de _change_data
map, omdat Azure Databricks de wijzigingenfeed efficiënt kan berekenen vanuit het transactielogboek.
Alle leesbewerkingen voor gegevensbestanden in de _change_data
map moeten ondersteunde Delta Lake-API's doorlopen.
De bestanden in de _change_data
map volgen het bewaarbeleid van de tabel. Wijzigingen in gegevensfeedgegevens worden verwijderd wanneer de VACUUM
opdracht wordt uitgevoerd.
Kan ik de gegevensfeed wijzigen om de hele geschiedenis van een tabel opnieuw af te spelen?
Wijzigingenfeed is niet bedoeld als een permanent record van alle wijzigingen in een tabel. Wijzigingen in de gegevensfeed worden alleen vastgelegd nadat deze zijn ingeschakeld.
Met wijzigingen in de gegevensfeed en Delta Lake kunt u altijd een volledige momentopname van een brontabel reconstrueren. Dit betekent dat u een nieuwe streamingbewerking kunt starten op basis van een tabel waarvoor wijzigingen in de gegevensfeed zijn ingeschakeld en de huidige versie van die tabel en alle wijzigingen die zich daarna voordoen, kunt vastleggen.
U moet records in de wijzigingenfeed behandelen als tijdelijk en alleen toegankelijk voor een opgegeven bewaarvenster. Het Delta-transactielogboek verwijdert tabelversies en de bijbehorende wijzigingenfeedversies met regelmatige tussenpozen. Wanneer een versie uit het transactielogboek wordt verwijderd, kunt u de wijzigingengegevensfeed voor die versie niet meer lezen.
Als voor uw use-case een permanente geschiedenis van alle wijzigingen in een tabel moet worden bijgehouden, moet u incrementele logica gebruiken om records van de wijzigingengegevensfeed naar een nieuwe tabel te schrijven. In het volgende codevoorbeeld wordt het gebruik gedemonstreerd. trigger.AvailableNow
Hierbij wordt gebruikgemaakt van de incrementele verwerking van Structured Streaming, maar worden beschikbare gegevens verwerkt als een batchworkload. U kunt deze werkbelasting asynchroon plannen met uw belangrijkste verwerkingspijplijnen om een back-up van wijzigingsgegevensfeed te maken voor controledoeleinden of volledige afspeelbaarheid.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Beperkingen voor gegevensfeeds wijzigen voor tabellen waarvoor kolomtoewijzing is ingeschakeld
Als kolomtoewijzing is ingeschakeld voor een Delta-tabel, kunt u kolommen in de tabel verwijderen of de naam ervan wijzigen zonder dat u gegevensbestanden voor bestaande gegevens hoeft te herschrijven. Als kolomtoewijzing is ingeschakeld, heeft wijzigingsgegevensfeed beperkingen na het uitvoeren van niet-additieve schemawijzigingen, zoals het wijzigen van de naam of het verwijderen van een kolom, het wijzigen van het gegevenstype of wijzigingen in null-functionaliteit.
Belangrijk
- U kunt de wijzigingenfeed niet lezen voor een transactie of bereik waarin een niet-additieve schemawijziging plaatsvindt met behulp van batch-semantiek.
- In Databricks Runtime 12.2 LTS en lager bieden tabellen met kolomtoewijzing ingeschakeld waarvoor niet-additieve schemawijzigingen zijn ingeschakeld, geen ondersteuning voor streaming-leesbewerkingen voor wijzigingengegevensfeeds. Zie Streaming met kolomtoewijzing en schemawijzigingen.
- In Databricks Runtime 11.3 LTS en hieronder kunt u de gegevensfeed voor tabellen waarvoor kolomtoewijzing is ingeschakeld, niet lezen waarvoor de naam van kolommen is gewijzigd of verwijderd.
In Databricks Runtime 12.2 LTS en hoger kunt u batchleesbewerkingen uitvoeren op wijzigingengegevensfeeds voor tabellen waarvoor kolomtoewijzing is ingeschakeld waarvoor niet-additieve schemawijzigingen zijn aangebracht. In plaats van het schema van de nieuwste versie van de tabel te gebruiken, gebruiken leesbewerkingen het schema van de eindversie van de tabel die is opgegeven in de query. Query's mislukken nog steeds als het opgegeven versiebereik een niet-additieve schemawijziging omvat.