Sdílet prostřednictvím


Čtení informací o stavu strukturovaného streamování

Operace datového rámce nebo funkce tabulek SQL můžete použít k dotazování dat a metadat stavu strukturovaného streamování. Pomocí těchto funkcí můžete sledovat stavové informace pro stavové dotazy strukturovaného streamování, které můžou být užitečné pro monitorování a ladění.

Abyste mohli dotazovat stavová data nebo metadata, musíte mít přístup ke čtení cesty ke kontrolnímu bodu dotazu streamování. Funkce popsané v tomto článku poskytují přístup jen pro čtení k datům a metadatům stavu. K dotazování informací o stavu můžete použít pouze sémantiku dávkového čtení.

Poznámka:

Nelze dotazovat informace o stavu pro deklarativní kanály Lakeflow Sparku, streamované tabulky nebo materializovaná zobrazení. Informace o stavu nemůžete dotazovat pomocí výpočetních prostředků bez serveru nebo výpočetních prostředků nakonfigurovaných pomocí standardního režimu přístupu.

Požadavky

  • Použijte jednu z následujících konfigurací výpočetních prostředků:
    • Databricks Runtime 16.3 a novější na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu
    • Databricks Runtime 14.3 LTS a vyšší na výpočetních prostředcích nakonfigurovaných s vyhrazeným režimem přístupu nebo bez režimu izolace.
  • Přístup pro čtení k cestě kontrolního bodu, kterou používá streamovací dotaz.

Čtení stavového úložiště strukturovaného streamování

Číst můžete informace o úložišti stavu pro strukturované streamování dotazů spuštěné v jakémkoli podporovaném modulu Databricks Runtime. Použijte následující syntax:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Parametry rozhraní API čtečky stavu

Rozhraní API pro čtení stavu podporuje následující volitelné konfigurace:

Možnost Typ Výchozí hodnota Popis
batchId Dlouhý nejnovější ID dávky Představuje cílovou dávku, ze které čteme. Tuto možnost zadejte, pokud chcete dotazovat informace o stavu pro dřívější stav dotazu. Dávka musí být komitována, ale ještě není vyčištěna.
operatorId Dlouhý 0 Představuje cílový operátor pro čtení. Tato možnost se používá, když dotaz používá více stavových operátorů.
storeName Řetězec "VÝCHOZÍ" Představuje název cílového úložiště stavu, z kterého se má číst. Tato možnost se používá, když stavový operátor používá více instancí úložiště stavů. Buď storeName, nebo joinSide musí být zadány pro spojení stream-steam, ale ne oba.
joinSide Řetězec ("vlevo" nebo "vpravo") Představuje cílovou stranu, ze které se má číst. Tato možnost se používá, když uživatelé chtějí číst stav z připojení stream-stream.
stateVarName Řetězec Žádné Název stavové proměnné, který se má přečíst jako součást tohoto dotazu. Název stavové proměnné je jedinečný název zadaný každé proměnné v rámci init funkce StatefulProcessor používané operátorem transformWithState . Tato možnost je povinná, pokud je použit operátor transformWithState. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších.
readRegisteredTimers logický Nepravda Nastavte na true, abyste mohli číst registrované časovače používané v rámci operátoru transformWithState. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších.
flattenCollectionTypes logický pravda Pokud true, zploští záznamy vrácené pro stavové proměnné mapy a seznamu. Pokud false, záznamy se vrátí pomocí dotazu Spark SQL Array nebo Map. Tato možnost se vztahuje pouze na transformWithState operátor a je ignorována pro ostatní operátory. K dispozici v Databricks Runtime 16.2 a novějších.

Vrácená data mají následující schéma:

Sloupec Typ Popis
key Struktura (další typ odvozený ze stavového klíče) Klíč pro záznam stavového operátoru v kontrolním bodu stavu.
value Struktura (další typ odvozený z hodnoty stavu) Hodnota pro záznam stavového operátoru v kontrolním bodě stavu.
partition_id Celé číslo Část stavového kontrolního bodu, která obsahuje záznam stavového operátoru.

Viz tabulkovou funkci read_statestore.

Čtení metadat stavu strukturovaného streamování

Důležité

Pokud chcete zaznamenávat metadata stavu, musíte spouštět streamované dotazy na Databricks Runtime 14.2 nebo vyšší. Soubory metadat stavu neporušují zpětnou kompatibilitu. Pokud se rozhodnete spustit streamovací dotaz na Databricks Runtime 14.1 nebo nižší, existující soubory metadat stavu se ignorují a nebudou zapsány žádné nové soubory metadat stavu.

Můžete číst informace o metadatech stavu pro dotazy strukturovaného streamování spuštěné ve službě Databricks Runtime 14.2 nebo vyšší. Použijte následující syntax:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Vrácená data mají následující schéma:

Sloupec Typ Popis
operatorId Celé číslo Celočíselné ID stavového operátoru streamování.
operatorName Celé číslo Název stavového streamovacího operátoru
stateStoreName Řetězec Název státního úložiště operátora.
numPartitions Celé číslo Počet oddílů stavového úložiště
minBatchId Dlouhý Minimální ID dávky, které je k dispozici pro dotazování stavu.
maxBatchId Dlouhý Maximální id dávky, které je k dispozici pro dotazování stavu.

Poznámka:

Hodnoty ID dávky dané minBatchId a maxBatchId odrážejí stav v okamžiku zápisu kontrolního bodu. Staré dávky se automaticky vyčistí pomocí mikrodávkového zpracování, takže poskytnutá hodnota není zaručeno, že bude stále dostupná.

Viz tabulkovou funkci read_state_metadata.

Příklad: Dotazování na jednu stranu spojení stream-stream

K dotazování na levou stranu spojení stream-stream použijte následující syntaxi:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Příklad: Úložiště stavu dotazu pro stream s několika stavovými operátory

V tomto příkladu se pomocí čtečky metadat stavu shromáždí podrobnosti o streamovacím dotazu s více stavovými operátory a výsledky metadat pak použije jako možnosti pro čtenáře stavu.

Čtenář metadat stavu vezme cestu kontrolního bodu jako jedinou možnost, jako v následujícím příkladu syntaxe:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Následující tabulka představuje příklad výstupu metadat úložiště stavů:

operatorId jméno operátora název úložiště stavu početOddílů minBatchId maxBatchId
0 stateStoreSave výchozí 200 0 13
1 dedupeWithinWatermark výchozí 200 0 13

Pokud chcete získat výsledky pro dedupeWithinWatermark operátor, zadejte dotaz na čtenáře stavu s operatorId možností, jako v následujícím příkladu:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);