Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
DataFrame-műveletek vagy SQL táblaértékfüggvények használatával lekérdezheti a strukturált streamelési állapot adatait és metaadatait. Ezekkel a függvényekkel megfigyelheti a strukturált streamelési állapotalapú lekérdezések állapotadatait, amelyek a figyeléshez és a hibakereséshez hasznosak lehetnek.
Az állapotadatok vagy metaadatok lekérdezéséhez olvasási hozzáféréssel kell rendelkeznie a streamelési lekérdezés ellenőrzőpont-elérési útjának eléréséhez. A cikkben ismertetett függvények írásvédett hozzáférést biztosítanak az állapotadatokhoz és metaadatokhoz. Az állapotinformációk lekérdezéséhez csak kötegelt olvasási szemantikát szabad használni.
Feljegyzés
A Lakeflow Spark deklaratív folyamatainak, streamtábláinak és materializált nézeteinek állapotadatai nem kérdezhetők le. Az állapotadatok nem kérdezhetők le kiszolgáló nélküli számítással vagy standard hozzáférési móddal konfigurált számítással.
Követelmények
- Használja az alábbi számítási konfigurációk egyikét:
- Databricks Runtime 16.3 és újabb verziók standard hozzáférési móddal konfigurált számításon.
- A Databricks Runtime 14.3 LTS és újabb verziója dedikált vagy elkülönítés nélküli hozzáférési móddal konfigurált számítási egységen.
- A streamelési lekérdezés által használt ellenőrzőpont elérési útjához való olvasási hozzáférés.
Strukturált streamelési állapottároló olvasása
A támogatott Databricks-futtatókörnyezetekben végrehajtott strukturált streamelési lekérdezések állapottárolási információit olvashatja. Alkalmazza a következő szintaxist:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Állapotolvasó API-paraméterei
Az állapotolvasó API a következő választható konfigurációkat támogatja:
| Lehetőség | Típus | Alapértelmezett érték | Leírás |
|---|---|---|---|
batchId |
Hosszú | legújabb kötegazonosító | Azt a célkötetet jelöli, amelyből olvasni kíván. Ezzel a beállítással lekérdezheti a lekérdezés egy korábbi állapotának állapotadatait. A tételt véglegesíteni kell, de még nem szabad megtisztítani. |
operatorId |
Hosszú | 0 | Azt a céloperátort jelöli, amelyből olvasni szeretne. Ez a beállítás akkor használatos, ha a lekérdezés több állapotalapú operátort használ. |
storeName |
Sztring | "ALAPÉRTELMEZETT" | A célállapot-tároló nevét jelöli, amelyből olvasni szeretne. Ez a beállítás akkor használható, ha az állapotalapú operátor több állapottároló-példányt használ. A stream-steam illesztéshez storeName vagy joinSide megadása szükséges, de nem mindkettő. |
joinSide |
String ("balra" vagy "jobbra") | A céloldalt jelöli, amelyről olvasni szeretne. Ezt a lehetőséget akkor használja a rendszer, ha a felhasználók stream-csatlakozásból szeretnék beolvasni az állapotot. | |
stateVarName |
Sztring | Egyik sem | A lekérdezés részeként beolvasandó állapotváltozó neve. Az állapotváltozó neve az egyes változóknak az init operátor által StatefulProcessor használt függvényen transformWithState belül megadott egyedi név. Ez a beállítás kötelező, ha az transformWithState operátort használja. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el. |
readRegisteredTimers |
logikai | hamis | Az operátorban használt regisztrált időzítők olvasására true van beállítva transformWithState-ben. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el. |
flattenCollectionTypes |
logikai | igaz | Ha true, lapítja a leképezési és listázási állapotváltozókhoz visszaadott rekordokat. Ha false, a rekordokat Spark SQL Array vagy Map segítségével adják vissza. Ez a beállítás csak az transformWithState operátorra vonatkozik, és más operátorok esetében figyelmen kívül hagyja. A Databricks Runtime 16.2-ben és újabb verziókban érhető el. |
A visszaadott adatok sémája a következő:
| Oszlop | Típus | Leírás |
|---|---|---|
key |
Struct (az állapotkulcsból származtatott további típus) | Az állapotalapú operátorrekord kulcsa az állapot-ellenőrzőpontban. |
value |
Struct (az állapotértékből származtatott további típus) | Az állapotalapú operátorrekord értéke az állapot-ellenőrzőpontban. |
partition_id |
Egész szám | Az állapotalapú operátorrekordot tartalmazó állapot-ellenőrzőpont partíciója. |
Lásd read_statestore táblaértékelt függvény.
Strukturált streamelési állapot metaadatainak olvasása
Fontos
Az állapot metaadatainak rögzítéséhez streamelési lekérdezéseket kell futtatnia a Databricks Runtime 14.2-ben vagy újabb verziójában. Az állapot metaadat-fájljai nem szakítják meg a visszamenőleges kompatibilitást. Ha streamelési lekérdezést futtat a Databricks Runtime 14.1 vagy újabb verzióján, a rendszer figyelmen kívül hagyja a meglévő állapot metaadatfájljait, és nem ír új állapot metaadatfájlokat.
A Databricks Runtime 14.2 vagy újabb verzióján futó strukturált streamelési lekérdezések állapot metaadatait olvashatja. Alkalmazza a következő szintaxist:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
A visszaadott adatok sémája a következő:
| Oszlop | Típus | Leírás |
|---|---|---|
operatorId |
Egész szám | Az állapotalapú streaming operátor egész azonosítója. |
operatorName |
Egész szám | Az állapotalapú streamelő operátor neve. |
stateStoreName |
Sztring | Az operátor állapottárolójának neve. |
numPartitions |
Egész szám | Az állapottároló partícióinak száma. |
minBatchId |
Hosszú | Az állapot lekérdezéséhez elérhető minimális kötegazonosító. |
maxBatchId |
Hosszú | Az állapot lekérdezéséhez elérhető maximális kötegazonosító. |
Feljegyzés
A minBatchId és maxBatchId által megadott kötegazonosító értékek az ellenőrzőpont megírásának idejének állapotát tükrözik. A régi kötegek automatikusan törlődnek a mikroköteg végrehajtásával, így az itt megadott érték nem garantált, hogy továbbra is elérhető marad.
Lásd read_state_metadata táblaértékelt függvény.
Példa: Stream-stream illesztés egyik oldalának lekérdezése
A stream-stream illesztés bal oldalának lekérdezéséhez használja az alábbi szintaxist:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Példa: Adatfolyam állapottárolójának lekérdezése több állapotalapú operátorral
Ez a példa az állapot metaadat-olvasójával gyűjti össze egy streamelési lekérdezés metaadatait több állapotalapú operátorral, majd a metaadat-eredményeket használja az állapotolvasó beállításaiként.
Az állapot metaadat-olvasója az ellenőrzőpont elérési útját használja egyetlen lehetőségként, ahogyan az alábbi szintaxisbeli példában is látható:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Az alábbi táblázat az állapottár metaadatainak példakimenetét mutatja be:
| operátorazonosító | operátor neve | állapotTárolóNév | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | alapértelmezett | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | alapértelmezett | 200 | 0 | 13 |
Az operátor eredményének dedupeWithinWatermark lekéréséhez kérje le az állapotolvasót a operatorId beállítással, ahogyan az alábbi példában is látható:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);