Sdílet prostřednictvím


Použití datového kanálu změn Delta Lake v Azure Databricks

Kanál změn dat umožňuje Službě Azure Databricks sledovat změny na úrovni řádků mezi verzemi tabulky Delta. Pokud je v tabulce Delta povolená, modul runtime zaznamenává změny událostí pro všechna data zapsaná do tabulky. To zahrnuje data řádků spolu s metadaty označujícími, jestli byl zadaný řádek vložený, odstraněný nebo aktualizovaný.

Důležité

Kanál změn funguje společně s historií tabulek a poskytuje informace o změnách. Klonování tabulky Delta vytváří samostatnou historii, a proto se datový kanál změn u naklonovaných tabulek neshoduje s původní tabulkou.

Přírůstkové zpracování dat změn

Databricks doporučuje používat kanál změn v kombinaci se strukturovaným streamováním a přírůstkově zpracovávat změny z tabulek Delta. Strukturované streamování pro Azure Databricks musíte použít k automatickému sledování verzí pro datový kanál změn vaší tabulky.

Poznámka:

Delta Live Tables poskytuje funkce pro snadné šíření dat změn a ukládání výsledků jako SCD (pomalu se měnící dimenze) typu 1 nebo 2 tabulek. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.

Pokud chcete číst datový kanál změn z tabulky, musíte v této tabulce povolit datový kanál změn. Viz Povolení kanálu změn dat.

Nastavte možnost readChangeFeed při true konfiguraci datového proudu pro tabulku pro čtení datového kanálu změn, jak je znázorněno v následujícím příkladu syntaxe:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Ve výchozím nastavení datový proud vrací nejnovější snímek tabulky, když se stream poprvé spustí jako INSERT a budoucí změny dat změn.

Změňte potvrzení dat jako součást transakce Delta Lake a zpřístupní se současně s novými potvrzeními dat do tabulky.

Volitelně můžete zadat počáteční verzi. Podívejte se, jestli mám zadat počáteční verzi?

Kanál změn dat také podporuje dávkové spouštění, které vyžaduje zadání počáteční verze. Viz Čtení změn v dávkových dotazech.

Možnosti, jako jsou limity rychlosti (maxFilesPerTrigger, maxBytesPerTrigger) a excludeRegex jsou také podporovány při čtení dat o změnách.

Omezení rychlosti může být atomické pro jiné verze než počáteční verze snímku. To znamená, že celá verze potvrzení bude omezená rychlostí nebo se vrátí celé potvrzení.

Mám zadat počáteční verzi?

Pokud chcete ignorovat změny, ke kterým došlo před konkrétní verzí, můžete volitelně zadat počáteční verzi. Verzi můžete zadat pomocí časového razítka nebo čísla ID verze zaznamenaného v transakčním protokolu Delta.

Poznámka:

Pro čtení dávek se vyžaduje počáteční verze a mnoho dávkových vzorů může těžit z nastavení volitelné koncové verze.

Při konfiguraci úloh strukturovaného streamování zahrnujících datový kanál změn je důležité pochopit, jak určení počáteční verze ovlivňuje zpracování.

Řada úloh streamování, zejména nových kanálů zpracování dat, přináší výchozí chování. Při výchozím chování se první dávka zpracuje, když stream nejprve zaznamená všechny existující záznamy v tabulce jako INSERT operace v kanálu změn dat.

Pokud cílová tabulka již obsahuje všechny záznamy s odpovídajícími změnami až do určitého bodu, zadejte počáteční verzi, aby se zabránilo zpracování stavu zdrojové tabulky jako INSERT událostí.

Následující příklad syntaxe zotavení ze selhání streamování, ve kterém byl kontrolní bod poškozen. V tomto příkladu předpokládejme následující podmínky:

  1. U zdrojové tabulky byla při vytváření tabulky povolená změna datového kanálu.
  2. Cílová podřízená tabulka zpracovala všechny změny až do verze 75 a včetně ní.
  3. Historie verzí zdrojové tabulky je k dispozici pro verze 70 a vyšší.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

V tomto příkladu musíte také zadat nové umístění kontrolního bodu.

Důležité

Pokud zadáte počáteční verzi, stream se nepovede spustit z nového kontrolního bodu, pokud už v historii tabulek neexistuje výchozí verze. Delta Lake automaticky vyčistí historické verze, což znamená, že všechny zadané počáteční verze se nakonec odstraní.

Podívejte se, jak můžu změnit datový kanál k přehrání celé historie tabulky?.

Čtení změn v dávkových dotazech

Syntaxi dávkového dotazu můžete použít ke čtení všech změn počínaje konkrétní verzí nebo ke čtení změn v zadaném rozsahu verzí.

Jako celé číslo zadáte verzi a časové razítko jako řetězec ve formátu yyyy-MM-dd[ HH:mm:ss[.SSS]].

Počáteční a koncové verze jsou v dotazech inkluzivní. Pokud chcete přečíst změny z konkrétní počáteční verze na nejnovější verzi tabulky, zadejte pouze počáteční verzi.

Pokud zadáte nižší verzi nebo časové razítko, které zaznamenalo události změn – to znamená, že když byl datový kanál změn povolen – zobrazí se chyba, která značí, že datový kanál změn nebyl povolený.

Následující příklady syntaxe ukazují použití možností počáteční a koncové verze se čtením dávky:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Poznámka:

Ve výchozím nastavení platí, že pokud uživatel předá verzi nebo časové razítko překračující poslední potvrzení v tabulce, vyvolá se chyba timestampGreaterThanLatestCommit . V Databricks Runtime 11.3 LTS a vyšší může změnit datový kanál zpracovat případ verze mimo rozsah, pokud uživatel nastaví následující konfiguraci:true

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Pokud zadáte počáteční verzi větší než poslední potvrzení v tabulce nebo novější časové razítko spuštění než poslední potvrzení v tabulce, vrátí se po povolení předchozí konfigurace prázdný výsledek čtení.

Pokud zadáte koncovou verzi větší než poslední potvrzení v tabulce nebo novější časové razítko ukončení než poslední potvrzení v tabulce, vrátí se při povolení předchozí konfigurace v režimu dávkového čtení všechny změny mezi počáteční verzí a posledním potvrzením.

Jaké je schéma kanálu změn dat?

Při čtení z datového kanálu změn pro tabulku se použije schéma nejnovější verze tabulky.

Poznámka:

Většina operací změny schématu a vývoje je plně podporovaná. Tabulka s povoleným mapováním sloupců nepodporuje všechny případy použití a demonstruje jiné chování. Viz Omezení kanálu změn dat pro tabulky s povoleným mapováním sloupců.

Kromě datových sloupců ze schématu tabulky Delta obsahuje datový kanál změn sloupce metadat, které identifikují typ události změny:

Název sloupce Typ Hodnoty
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version Dlouhé celé číslo Verze protokolu Delta nebo tabulky obsahující změnu.
_commit_timestamp Časové razítko Časové razítko přidružené k vytvoření potvrzení.

(1) preimage je hodnota před aktualizací, postimage je hodnota po aktualizaci.

Poznámka:

Pokud schéma obsahuje sloupce se stejnými názvy jako tyto přidané sloupce, nelze v tabulce povolit datový kanál změn. Před pokusem o povolení kanálu změn dat přejmenujte sloupce v tabulce, abyste tento konflikt vyřešili.

Povolení kanálu změn dat

U povolených tabulek můžete číst jen datový kanál změn. Možnost kanálu změn dat musíte explicitně povolit pomocí jedné z následujících metod:

  • Nová tabulka: Nastavte vlastnost delta.enableChangeDataFeed = true tabulky v CREATE TABLE příkazu.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Existující tabulka: Nastavte vlastnost delta.enableChangeDataFeed = true tabulky v ALTER TABLE příkazu.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Všechny nové tabulky:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Důležité

Zaznamenávají se pouze změny provedené po povolení datového kanálu změn. Předchozí změny v tabulce se nezachytí.

Změna úložiště dat

Povolení datového kanálu změn způsobí malé zvýšení nákladů na úložiště pro tabulku. Při spuštění dotazu se vygenerují záznamy změn dat a jsou obecně mnohem menší než celková velikost přepsaných souborů.

Záznamy Azure Databricks mění data pro UPDATE, DELETEa MERGE operace ve složce pod _change_data adresářem tabulky. Některé operace, jako jsou operace jen pro vložení a odstranění celého oddílu, negenerují data v _change_data adresáři, protože Azure Databricks dokáže efektivně vypočítat datový kanál změn přímo z transakčního protokolu.

Všechna čtení datových souborů ve _change_data složce by měla projít podporovanými rozhraními Delta Lake API.

Soubory ve _change_data složce se řídí zásadami uchovávání informací v tabulce. Změna dat datového kanálu se odstraní při VACUUM spuštění příkazu.

Můžu pomocí kanálu změn dat přehrát celou historii tabulky?

Datový kanál změn není určen jako trvalý záznam všech změn v tabulce. Změnit datový kanál pouze záznamy, ke kterým dochází po povolení.

Změna datového kanálu a Delta Lake umožňují vždy rekonstruovat úplný snímek zdrojové tabulky, což znamená, že můžete spustit nové streamování čtení proti tabulce s povoleným datovým kanálem změn a zachytit aktuální verzi této tabulky a všechny změny, ke kterým dochází po.

Záznamy v datovém kanálu změn musíte považovat za přechodné a přístupné pouze pro zadané okno uchovávání informací. Protokol transakcí Delta v pravidelných intervalech odebere verze tabulek a jejich odpovídající verze datového kanálu změn. Při odebrání verze z transakčního protokolu už nemůžete číst datový kanál změn pro danou verzi.

Pokud váš případ použití vyžaduje zachování trvalé historie všech změn v tabulce, měli byste k zápisu záznamů z datového kanálu změn do nové tabulky použít přírůstkovou logiku. Následující příklad kódu ukazuje použití trigger.AvailableNow, který využívá přírůstkové zpracování strukturovaného streamování, ale zpracovává dostupná data jako dávkové úlohy. Tuto úlohu můžete naplánovat asynchronně s hlavními kanály zpracování a vytvořit zálohu datového kanálu změn pro účely auditování nebo úplné přehrání.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Změna omezení datového kanálu pro tabulky s povoleným mapováním sloupců

Pokud je u tabulky Delta povolené mapování sloupců, můžete v tabulce přetáhnout nebo přejmenovat sloupce bez přepsání datových souborů pro existující data. Při povoleném mapování sloupců má datový kanál změn omezení po provedení nedatných změn schématu, jako je přejmenování nebo vyřazení sloupce, změna datového typu nebo změny s možnou hodnotou null.

Důležité

  • Nelze číst datový kanál změn pro transakci nebo oblast, ve které dochází ke změně schématu bez sčítání pomocí sémantiky dávky.
  • V Databricks Runtime 12.2 LTS a níže jsou tabulky s mapováním sloupců povolené, u kterých došlo ke změnám nepřidatného schématu, nepodporují streamovaná čtení v datovém kanálu změn. Viz Streamování s mapováním sloupců a změnami schématu.
  • V Databricks Runtime 11.3 LTS a níže nemůžete číst datový kanál změn pro tabulky s povoleným mapováním sloupců, u kterých došlo k přejmenování nebo vyřazení sloupců.

Ve službě Databricks Runtime 12.2 LTS a vyšší můžete provádět dávkové čtení datového kanálu změn pro tabulky s povoleným mapováním sloupců, u kterých došlo ke změnám schématu bez sčítání. Místo použití schématu nejnovější verze tabulky operace čtení používají schéma koncové verze tabulky zadané v dotazu. Dotazy stále selžou, pokud zadaný rozsah verzí zahrnuje změnu schématu bez sčítání.