Megosztás:


Strukturált streamelési állapotinformációk olvasása

DataFrame-műveletek vagy SQL táblaértékfüggvények használatával lekérdezheti a strukturált streamelési állapot adatait és metaadatait. Ezekkel a függvényekkel megfigyelheti a strukturált streamelési állapotalapú lekérdezések állapotadatait, amelyek a figyeléshez és a hibakereséshez hasznosak lehetnek.

Az állapotadatok vagy metaadatok lekérdezéséhez olvasási hozzáféréssel kell rendelkeznie a streamelési lekérdezés ellenőrzőpont-elérési útjának eléréséhez. A cikkben ismertetett függvények írásvédett hozzáférést biztosítanak az állapotadatokhoz és metaadatokhoz. Az állapotinformációk lekérdezéséhez csak kötegelt olvasási szemantikát szabad használni.

Feljegyzés

A Lakeflow Spark deklaratív folyamatainak, streamtábláinak és materializált nézeteinek állapotadatai nem kérdezhetők le. Az állapotadatok nem kérdezhetők le kiszolgáló nélküli számítással vagy standard hozzáférési móddal konfigurált számítással.

Követelmények

  • Használja az alábbi számítási konfigurációk egyikét:
    • Databricks Runtime 16.3 és újabb verziók standard hozzáférési móddal konfigurált számításon.
    • A Databricks Runtime 14.3 LTS és újabb verziója dedikált vagy elkülönítés nélküli hozzáférési móddal konfigurált számítási egységen.
  • A streamelési lekérdezés által használt ellenőrzőpont elérési útjához való olvasási hozzáférés.

Strukturált streamelési állapottároló olvasása

A támogatott Databricks-futtatókörnyezetekben végrehajtott strukturált streamelési lekérdezések állapottárolási információit olvashatja. Alkalmazza a következő szintaxist:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Állapotolvasó API-paraméterei

Az állapotolvasó API a következő választható konfigurációkat támogatja:

Lehetőség Típus Alapértelmezett érték Leírás
batchId Hosszú legújabb kötegazonosító Azt a célkötetet jelöli, amelyből olvasni kíván. Ezzel a beállítással lekérdezheti a lekérdezés egy korábbi állapotának állapotadatait. A tételt véglegesíteni kell, de még nem szabad megtisztítani.
operatorId Hosszú 0 Azt a céloperátort jelöli, amelyből olvasni szeretne. Ez a beállítás akkor használatos, ha a lekérdezés több állapotalapú operátort használ.
storeName Sztring "ALAPÉRTELMEZETT" A célállapot-tároló nevét jelöli, amelyből olvasni szeretne. Ez a beállítás akkor használható, ha az állapotalapú operátor több állapottároló-példányt használ. A stream-steam illesztéshez storeName vagy joinSide megadása szükséges, de nem mindkettő.
joinSide String ("balra" vagy "jobbra") A céloldalt jelöli, amelyről olvasni szeretne. Ezt a lehetőséget akkor használja a rendszer, ha a felhasználók stream-csatlakozásból szeretnék beolvasni az állapotot.
stateVarName Sztring Egyik sem A lekérdezés részeként beolvasandó állapotváltozó neve. Az állapotváltozó neve az egyes változóknak az init operátor által StatefulProcessor használt függvényen transformWithState belül megadott egyedi név. Ez a beállítás kötelező, ha az transformWithState operátort használja. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el.
readRegisteredTimers logikai hamis Az operátorban használt regisztrált időzítők olvasására true van beállítva transformWithState-ben. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el.
flattenCollectionTypes logikai igaz Ha true, lapítja a leképezési és listázási állapotváltozókhoz visszaadott rekordokat. Ha false, a rekordokat Spark SQL Array vagy Map segítségével adják vissza. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el.

A visszaadott adatok sémája a következő:

Oszlop Típus Leírás
key Struct (az állapotkulcsból származtatott további típus) Az állapotalapú operátorrekord kulcsa az állapot-ellenőrzőpontban.
value Struct (az állapotértékből származtatott további típus) Az állapotalapú operátorrekord értéke az állapot-ellenőrzőpontban.
partition_id Egész szám Az állapotalapú operátorrekordot tartalmazó állapot-ellenőrzőpont partíciója.

Lásd read_statestore táblaértékelt függvény.

Strukturált streamelési állapot metaadatainak olvasása

Fontos

Az állapot metaadatainak rögzítéséhez streamelési lekérdezéseket kell futtatnia a Databricks Runtime 14.2-ben vagy újabb verziójában. Az állapot metaadat-fájljai nem szakítják meg a visszamenőleges kompatibilitást. Ha streamelési lekérdezést futtat a Databricks Runtime 14.1 vagy újabb verzióján, a rendszer figyelmen kívül hagyja a meglévő állapot metaadatfájljait, és nem ír új állapot metaadatfájlokat.

A Databricks Runtime 14.2 vagy újabb verzióján futó strukturált streamelési lekérdezések állapot metaadatait olvashatja. Alkalmazza a következő szintaxist:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

A visszaadott adatok sémája a következő:

Oszlop Típus Leírás
operatorId Egész szám Az állapotalapú streaming operátor egész azonosítója.
operatorName Egész szám Az állapotalapú streamelő operátor neve.
stateStoreName Sztring Az operátor állapottárolójának neve.
numPartitions Egész szám Az állapottároló partícióinak száma.
minBatchId Hosszú Az állapot lekérdezéséhez elérhető minimális kötegazonosító.
maxBatchId Hosszú Az állapot lekérdezéséhez elérhető maximális kötegazonosító.

Feljegyzés

A minBatchId és maxBatchId által megadott kötegazonosító értékek az ellenőrzőpont megírásának idejének állapotát tükrözik. A régi kötegek automatikusan törlődnek a mikroköteg végrehajtásával, így az itt megadott érték nem garantált, hogy továbbra is elérhető marad.

Lásd read_state_metadata táblaértékelt függvény.

Példa: Stream-stream illesztés egyik oldalának lekérdezése

A stream-stream illesztés bal oldalának lekérdezéséhez használja az alábbi szintaxist:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Példa: Adatfolyam állapottárolójának lekérdezése több állapotalapú operátorral

Ez a példa az állapot metaadat-olvasójával gyűjti össze egy streamelési lekérdezés metaadatait több állapotalapú operátorral, majd a metaadat-eredményeket használja az állapotolvasó beállításaiként.

Az állapot metaadat-olvasója az ellenőrzőpont elérési útját használja egyetlen lehetőségként, ahogyan az alábbi szintaxisbeli példában is látható:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Az alábbi táblázat az állapottár metaadatainak példakimenetét mutatja be:

operátorazonosító operátor neve állapotTárolóNév numPartitions minBatchId maxBatchId
0 stateStoreSave alapértelmezett 200 0 13
1 dedupeWithinWatermark alapértelmezett 200 0 13

Az operátor eredményének dedupeWithinWatermark lekéréséhez kérje le az állapotolvasót a operatorId beállítással, ahogyan az alábbi példában is látható:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);