Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Operace datového rámce nebo funkce tabulek SQL můžete použít k dotazování dat a metadat stavu strukturovaného streamování. Pomocí těchto funkcí můžete sledovat stavové informace pro stavové dotazy strukturovaného streamování, které můžou být užitečné pro monitorování a ladění.
Abyste mohli dotazovat stavová data nebo metadata, musíte mít přístup ke čtení cesty ke kontrolnímu bodu dotazu streamování. Funkce popsané v tomto článku poskytují přístup jen pro čtení k datům a metadatům stavu. K dotazování informací o stavu můžete použít pouze sémantiku dávkového čtení.
Poznámka:
Nelze dotazovat informace o stavu pro deklarativní kanály Lakeflow Sparku, streamované tabulky nebo materializovaná zobrazení. Informace o stavu nemůžete dotazovat pomocí výpočetních prostředků bez serveru nebo výpočetních prostředků nakonfigurovaných pomocí standardního režimu přístupu.
Požadavky
- Použijte jednu z následujících konfigurací výpočetních prostředků:
- Databricks Runtime 16.3 a novější na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu
- Databricks Runtime 14.3 LTS a vyšší na výpočetních prostředcích nakonfigurovaných s vyhrazeným režimem přístupu nebo bez režimu izolace.
- Přístup pro čtení k cestě kontrolního bodu, kterou používá streamovací dotaz.
Čtení stavového úložiště strukturovaného streamování
Číst můžete informace o úložišti stavu pro strukturované streamování dotazů spuštěné v jakémkoli podporovaném modulu Databricks Runtime. Použijte následující syntax:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Parametry rozhraní API čtečky stavu
Rozhraní API pro čtení stavu podporuje následující volitelné konfigurace:
| Možnost | Typ | Výchozí hodnota | Popis |
|---|---|---|---|
batchId |
Dlouhý | nejnovější ID dávky | Představuje cílovou dávku, ze které čteme. Tuto možnost zadejte, pokud chcete dotazovat informace o stavu pro dřívější stav dotazu. Dávka musí být komitována, ale ještě není vyčištěna. |
operatorId |
Dlouhý | 0 | Představuje cílový operátor pro čtení. Tato možnost se používá, když dotaz používá více stavových operátorů. |
storeName |
Řetězec | "VÝCHOZÍ" | Představuje název cílového úložiště stavu, z kterého se má číst. Tato možnost se používá, když stavový operátor používá více instancí úložiště stavů. Buď storeName, nebo joinSide musí být zadány pro spojení stream-steam, ale ne oba. |
joinSide |
Řetězec ("vlevo" nebo "vpravo") | Představuje cílovou stranu, ze které se má číst. Tato možnost se používá, když uživatelé chtějí číst stav z připojení stream-stream. | |
stateVarName |
Řetězec | Žádné | Název stavové proměnné, který se má přečíst jako součást tohoto dotazu. Název stavové proměnné je jedinečný název zadaný každé proměnné v rámci init funkce StatefulProcessor používané operátorem transformWithState . Tato možnost je povinná, pokud je použit operátor transformWithState. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších. |
readRegisteredTimers |
logický | Nepravda | Nastavte na true, abyste mohli číst registrované časovače používané v rámci operátoru transformWithState. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších. |
flattenCollectionTypes |
logický | pravda | Pokud true, zploští záznamy vrácené pro stavové proměnné mapy a seznamu. Pokud false, záznamy se vrátí pomocí dotazu Spark SQL Array nebo Map. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších. |
Vrácená data mají následující schéma:
| Sloupec | Typ | Popis |
|---|---|---|
key |
Struktura (další typ odvozený ze stavového klíče) | Klíč pro záznam stavového operátoru v kontrolním bodu stavu. |
value |
Struktura (další typ odvozený z hodnoty stavu) | Hodnota pro záznam stavového operátoru v kontrolním bodě stavu. |
partition_id |
Celé číslo | Část stavového kontrolního bodu, která obsahuje záznam stavového operátoru. |
Viz tabulkovou funkci read_statestore.
Čtení metadat stavu strukturovaného streamování
Důležité
Pokud chcete zaznamenávat metadata stavu, musíte spouštět streamované dotazy na Databricks Runtime 14.2 nebo vyšší. Soubory metadat stavu neporušují zpětnou kompatibilitu. Pokud se rozhodnete spustit streamovací dotaz na Databricks Runtime 14.1 nebo nižší, existující soubory metadat stavu se ignorují a nebudou zapsány žádné nové soubory metadat stavu.
Můžete číst informace o metadatech stavu pro dotazy strukturovaného streamování spuštěné ve službě Databricks Runtime 14.2 nebo vyšší. Použijte následující syntax:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Vrácená data mají následující schéma:
| Sloupec | Typ | Popis |
|---|---|---|
operatorId |
Celé číslo | Celočíselné ID stavového operátoru streamování. |
operatorName |
Celé číslo | Název stavového streamovacího operátoru |
stateStoreName |
Řetězec | Název státního úložiště operátora. |
numPartitions |
Celé číslo | Počet oddílů stavového úložiště |
minBatchId |
Dlouhý | Minimální ID dávky, které je k dispozici pro dotazování stavu. |
maxBatchId |
Dlouhý | Maximální id dávky, které je k dispozici pro dotazování stavu. |
Poznámka:
Hodnoty ID dávky dané minBatchId a maxBatchId odrážejí stav v okamžiku zápisu kontrolního bodu. Staré dávky se automaticky vyčistí pomocí mikrodávkového zpracování, takže poskytnutá hodnota není zaručeno, že bude stále dostupná.
Viz tabulkovou funkci read_state_metadata.
Příklad: Dotazování na jednu stranu spojení stream-stream
K dotazování na levou stranu spojení stream-stream použijte následující syntaxi:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Příklad: Úložiště stavu dotazu pro stream s několika stavovými operátory
V tomto příkladu se pomocí čtečky metadat stavu shromáždí podrobnosti o streamovacím dotazu s více stavovými operátory a výsledky metadat pak použije jako možnosti pro čtenáře stavu.
Čtenář metadat stavu vezme cestu kontrolního bodu jako jedinou možnost, jako v následujícím příkladu syntaxe:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Následující tabulka představuje příklad výstupu metadat úložiště stavů:
| operatorId | jméno operátora | název úložiště stavu | početOddílů | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | výchozí | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | výchozí | 200 | 0 | 13 |
Pokud chcete získat výsledky pro dedupeWithinWatermark operátor, zadejte dotaz na čtenáře stavu s operatorId možností, jako v následujícím příkladu:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);