Megosztás a következőn keresztül:


Delta Lake change data feed használata az Azure Databricksben

A változásadatcsatorna lehetővé teszi, hogy az Azure Databricks nyomon kövesse a Delta-tábla verziói közötti sorszintű változásokat. Ha egy Delta-táblában engedélyezve van, a futtatókörnyezeti rekordok megváltoztatják a táblába írt összes adat eseményeit . Ide tartoznak a soradatok, valamint a metaadatok, amelyek jelzik, hogy a megadott sor be lett-e szúrva, törölve vagy frissítve.

Fontos

A változásadatcsatorna a táblaelőzményekkel együtt működik a változásadatok megadásához. Mivel a Delta-tábla klónozása külön előzményt hoz létre, a klónozott táblák változási adatcsatornája nem egyezik meg az eredeti táblával.

Változásadatok növekményes feldolgozása

A Databricks a változásadatcsatorna és a strukturált streamelés együttes használatát javasolja a Delta-táblák változásainak növekményes feldolgozásához. Az Azure Databricks strukturált streameléssel automatikusan nyomon követheti a tábla változásadatcsatornájának verzióit.

Feljegyzés

A Delta Live Tables funkcióval egyszerűen propagálja a változásadatokat, és scd (lassan változó dimenzió) 1- vagy 2-es típusú táblákként tárolja az eredményeket. Lásd: APPLY CHANGES API: A változásadatok rögzítésének egyszerűsítése a Delta Live Tablesben.

Ha a változásadatcsatornát egy táblából szeretné olvasni, engedélyeznie kell a változási adatcsatornát a táblán. Lásd: Változásadatcsatorna engedélyezése.

Állítsa be azt a beállítást readChangeFeed , hogy true amikor streamet konfigurál egy táblához a változásadatcsatorna olvasásához, ahogyan az a következő szintaxisbeli példában látható:

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Alapértelmezés szerint a stream visszaadja a tábla legújabb pillanatképét, amikor a stream először elindul INSERT , és a jövőben változásadatokként változik.

Módosítsa az adat véglegesítéseket a Delta Lake-tranzakció részeként, és az új adat véglegesítésével egyidejűleg elérhetővé válik a táblában.

Igény szerint megadhat egy kezdő verziót. Lásd: Kezdő verzió megadása?.

Az adatcsatorna módosítása támogatja a kötegelt végrehajtást is, amelyhez meg kell adni egy kezdő verziót. Lásd: Változások olvasása kötegelt lekérdezésekben.

Az olyan lehetőségek, mint a sebességkorlátok (maxFilesPerTrigger), maxBytesPerTriggerés excludeRegex a változásadatok olvasásakor is támogatottak.

A sebességkorlátozás a kezdő pillanatkép-verziótól eltérő verziók esetében lehet atomi. Ez azt jelzi, hogy a véglegesítés teljes verziója korlátozott lesz, vagy a teljes véglegesítés vissza lesz adva.

Meg kell adnom egy kezdő verziót?

Ha figyelmen kívül szeretné hagyni az adott verzió előtt történt módosításokat, megadhat egy kezdő verziót. A verziót időbélyeg használatával vagy a Delta tranzakciónaplóban rögzített verzióazonosító-számmal adhatja meg.

Feljegyzés

A kötegolvasásokhoz kezdő verzióra van szükség, és számos kötegminta hasznos lehet az opcionális befejezési verzió beállításában.

Ha strukturált streamelési számítási feladatokat konfigurál, beleértve a változási adatcsatornát, fontos tisztában lenni azzal, hogy a kezdő verzió megadása hogyan befolyásolja a feldolgozást.

Számos streamelési számítási feladat, különösen az új adatfeldolgozási folyamatok kihasználják az alapértelmezett viselkedést. Az alapértelmezett viselkedéssel az első köteg akkor lesz feldolgozva, amikor a stream először rögzíti a tábla összes meglévő rekordját a változásadatcsatorna műveleteiként INSERT .

Ha a céltábla már tartalmazza a megfelelő módosításokkal rendelkező összes rekordot egy adott pontig, adjon meg egy kezdő verziót, hogy elkerülje a forrástábla állapotának eseményekként való INSERT feldolgozását.

Az alábbi példa szintaxisa helyreállítható egy olyan streamelési hibából, amelyben az ellenőrzőpont sérült. Ebben a példában a következő feltételeket feltételezzük:

  1. A módosítási adatcsatorna engedélyezve lett a forrástáblában a tábla létrehozásakor.
  2. A cél alsóbb rétegbeli tábla feldolgozta az összes módosítást a 75-ös verzióig és az azt is beleértve.
  3. A forrástábla verzióelőzményei a 70-es és újabb verziókhoz érhetők el.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Ebben a példában meg kell adnia egy új ellenőrzőpont-helyet is.

Fontos

Ha kezdő verziót ad meg, a stream nem indul el egy új ellenőrzőpontról, ha a kezdő verzió már nem szerepel a táblaelőzményekben. A Delta Lake automatikusan törli a korábbi verziókat, ami azt jelenti, hogy az összes megadott kezdőverzió törlődik.

Lásd: Használhatok módosítási adatcsatornát egy tábla teljes előzményeinek lejátszásához?.

Módosítások olvasása kötegelt lekérdezésekben

A batch-lekérdezés szintaxisával beolvashatja az összes módosítást egy adott verziótól kezdve, vagy beolvashatja a módosításokat egy megadott verziótartományon belül.

A verziót egész számként, az időbélyegeket pedig sztringként adja meg.yyyy-MM-dd[ HH:mm:ss[.SSS]]

A lekérdezések tartalmazzák a kezdő és a befejező verziót. Ha egy adott kezdő verzióról a tábla legújabb verziójára szeretné beolvasni a módosításokat, csak a kezdő verziót adja meg.

Ha egy korábbi verziót vagy időbélyeget ad meg, mint amelyik változáseseményeket rögzített – vagyis amikor a változásadatcsatorna engedélyezve volt – hibaüzenet jelenik meg, amely azt jelzi, hogy a változásadatcsatorna nem volt engedélyezve.

Az alábbi szintaxisbeli példák bemutatják, hogy a verzióindítási és befejezési beállításokat kötegolvasásokkal kell használni:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Feljegyzés

Alapértelmezés szerint, ha egy felhasználó egy táblán az utolsó véglegesítést meghaladó verziót vagy időbélyeget ad át, a rendszer timestampGreaterThanLatestCommit hibát jelez. A Databricks Runtime 11.3 LTS és újabb verzióiban az adatcsatorna módosítása képes kezelni a tartományon kívüli verzió esetét, ha a felhasználó a következő konfigurációt trueállítja be:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Ha egy táblában az utolsó véglegesítésnél nagyobb kezdőverziót vagy egy tábla utolsó véglegesítésénél újabb kezdési időbélyeget ad meg, akkor az előző konfiguráció engedélyezésekor a rendszer üres olvasási eredményt ad vissza.

Ha egy táblában az utolsó véglegesítésnél nagyobb végverziót vagy egy tábla utolsó véglegesítésénél újabb befejezési időbélyeget ad meg, akkor amikor az előző konfiguráció engedélyezve van kötegelt olvasási módban, a rendszer visszaadja a kezdő verzió és az utolsó véglegesítés közötti összes módosítást.

Mi a változásadatcsatorna sémája?

Amikor egy tábla változásadatcsatornájából olvas, a rendszer a legújabb táblaverzió sémáját használja.

Feljegyzés

A legtöbb sémamódosítási és evolúciós művelet teljes mértékben támogatott. Az oszlopleképezést engedélyező táblázat nem támogatja az összes használati esetet, és eltérő viselkedést mutat. Lásd: Adatcsatornákra vonatkozó korlátozások módosítása az oszlopleképezést engedélyező táblák esetében.

A Változástábla sémájának adatoszlopai mellett a változásadatcsatorna metaadatoszlopokat is tartalmaz, amelyek azonosítják a változásesemény típusát:

Oszlop neve Típus Értékek
_change_type Sztring insert, update_preimage , update_postimagedelete(1)
_commit_version Hosszú A módosítást tartalmazó Delta-napló vagy táblaverzió.
_commit_timestamp Időbélyegző A véglegesítés létrehozásakor társított időbélyeg.

(1)preimage a frissítés előtti érték, postimage a frissítés utáni érték.

Feljegyzés

Nem engedélyezhető a tábla adatcsatornájának módosítása, ha a séma a hozzáadott oszlopokkal azonos nevű oszlopokat tartalmaz. Nevezze át a tábla oszlopait az ütközés feloldásához, mielőtt engedélyezni szeretné az adatcsatorna módosítását.

Változásadatcsatorna engedélyezése

Csak az engedélyezett táblák változásadatcsatornája olvasható. Explicit módon engedélyeznie kell a változásadatcsatorna-beállítást az alábbi módszerek egyikével:

  • Új tábla: A táblatulajdonság delta.enableChangeDataFeed = true beállítása a CREATE TABLE parancsban.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Meglévő tábla: A táblatulajdonság delta.enableChangeDataFeed = true beállítása a ALTER TABLE parancsban.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Minden új tábla:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Fontos

Csak a változásadatcsatorna engedélyezése után végrehajtott módosítások lesznek rögzítve. A rendszer nem rögzíti a tábla korábbi módosításait.

Adattárolás módosítása

A változásadatcsatorna engedélyezése kis mértékben növeli a táblák tárolási költségeit. A változó adatrekordok a lekérdezés futtatásakor jönnek létre, és általában sokkal kisebbek, mint az újraírt fájlok teljes mérete.

Az Azure Databricks-rekordok a táblakönyvtár alatti mappában módosítják a , és a műveletek adataitUPDATEDELETE._change_dataMERGE Egyes műveletek, például a csak beszúrási műveletek és a teljes partíció törlések, nem hoznak létre adatokat a címtárban, mert az _change_data Azure Databricks hatékonyan tudja kiszámítani a változásadatcsatornát közvetlenül a tranzakciónaplóból.

A mappában lévő _change_data adatfájlokra irányuló összes olvasásnak támogatott Delta Lake API-kon kell keresztülmennie.

A mappában lévő _change_data fájlok a tábla adatmegőrzési szabályzatát követik. A parancs futtatásakor VACUUM az adatcsatornák adatainak módosítása törlődik.

Használhatok módosítási adatcsatornát egy tábla teljes előzményeinek lejátszásához?

A változásadatcsatorna nem szolgál a tábla összes módosításának állandó rekordjaként. Az adatcsatorna módosítása csak az engedélyezés után bekövetkező változásokat rögzíti.

Az adatcsatorna módosítása és a Delta Lake lehetővé teszi, hogy mindig rekonstruáljon egy forrástábla teljes pillanatképét, ami azt jelenti, hogy elindíthat egy új streamelést egy táblán, amelyen engedélyezve van a változásadatcsatorna, és rögzítheti a tábla aktuális verzióját, valamint az azt követő összes módosítást.

A változásadat-csatornában lévő rekordokat átmenetiként kell kezelnie, és csak egy megadott adatmegőrzési időszakhoz kell hozzáférnie. A Delta tranzakciónapló rendszeres időközönként eltávolítja a táblaverziókat és azok megfelelő változásadatcsatorna-verzióit. Ha eltávolít egy verziót a tranzakciónaplóból, a továbbiakban nem olvashatja el az adott verzióhoz tartozó változásadatcsatornát.

Ha a használati esethez egy tábla összes módosításának állandó előzményeit kell megőriznie, növekményes logikával rekordokat kell írnia a változásadatcsatornából egy új táblába. Az alábbi példakód bemutatja a használatot trigger.AvailableNow, amely a strukturált streamelés növekményes feldolgozását használja, de a rendelkezésre álló adatokat kötegelt számítási feladatként dolgozza fel. Ezt a számítási feladatot aszinkron módon ütemezheti a fő feldolgozási folyamatokkal, így naplózási célból vagy teljes újrajátszhatóság érdekében biztonsági másolatot készíthet a változási adatcsatornáról.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Adatcsatornákra vonatkozó korlátozások módosítása olyan táblák esetében, amelyeken engedélyezve van az oszlopleképezés

Ha egy Delta-táblában engedélyezve van az oszlopleképezés, a meglévő adatok adatfájljainak újraírása nélkül is elvetheti vagy átnevezheti a tábla oszlopait. Ha az oszlopleképezés engedélyezve van, a változásadatcsatorna korlátozásokkal rendelkezik a nem additív sémamódosítások, például az oszlopok átnevezése vagy elvetése, az adattípus módosítása vagy a nullbilitás módosítása után.

Fontos

  • Nem olvasható be a változásadatcsatorna olyan tranzakcióhoz vagy tartományhoz, amelyben nem additív sémamódosítás történik kötegelt szemantikával.
  • A Databricks Runtime 12.2 LTS-ben és az alábbi verziókban az oszlopleképezést engedélyező táblák, amelyek nem additív sémamódosításokat tapasztaltak, nem támogatják a változásadatcsatorna streamelési olvasásait. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.
  • A Databricks Runtime 11.3 LTS-ben és az alábbi verziókban nem olvashatja az oszlopleképezést engedélyező táblák adatcsatornájának módosítását, amelyeknél az oszlopok átnevezése vagy elvetése történt.

A Databricks Runtime 12.2 LTS-ben és újabb verziókban kötegelt beolvasásokat végezhet a változásadatcsatornán olyan táblák esetében, amelyek oszlopleképezése engedélyezve van, és amelyek nem additív sémamódosításokat tapasztaltak. A tábla legújabb verziójának sémája helyett az olvasási műveletek a lekérdezésben megadott tábla végverziójának sémáját használják. A lekérdezések továbbra is sikertelenek maradnak, ha a megadott verziótartomány nem additív sémamódosításra terjed ki.