Sdílet prostřednictvím


Kontrolní body strukturovaného streamování

Kontrolní body a protokoly s předstihem pro zápis spolupracují a poskytují záruky zpracování pro úlohy strukturovaného streamování. Kontrolní bod sleduje informace, které identifikují dotaz, včetně informací o stavu a zpracovaných záznamů. Když odstraníte soubory v adresáři kontrolních bodů nebo změníte umístění kontrolního bodu, další spuštění dotazu začne znovu.

Každý dotaz musí mít jiné umístění kontrolního bodu. Více dotazů by nikdy nemělo sdílet stejné umístění.

Povolení vytváření kontrolních bodů pro dotazy strukturovaného streamování

Před spuštěním streamovacího dotazu musíte zadat checkpointLocation možnost, jak je znázorněno v následujícím příkladu:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Poznámka:

Některé sínky, jako je výstup pro display() v poznámkových blocích a sínka memory, automaticky vygenerují dočasné umístění kontrolního bodu, pokud tuto možnost vynecháte. Tato dočasná umístění kontrolních bodů nezajistí žádnou odolnost proti chybám ani záruky konzistence dat a nemusí se správně vyčistit. Databricks doporučuje vždy zadat umístění kontrolního bodu pro tyto jímky.

Obnovení po změnách v dotazu strukturovaného streamování

Existují omezení toho, jaké změny v dotazu na streamování jsou povolené mezi jednotlivými restartováními ze stejného umístění kontrolního bodu. Tady je několik změn, které buď nejsou povolené, nebo efekt změny není dobře definovaný. Pro všechny:

  • Povolený termín znamená, že zadanou změnu můžete provést, ale to, jestli je sémantika jejího účinku dobře definovaná, závisí na dotazu a změně.
  • Nepovolený termín znamená, že byste neměli provést zadanou změnu, protože restartovaný dotaz pravděpodobně selže s nepředvídatelnými chybami.
  • sdf představuje streamovaný datový rámec nebo datovou sadu vygenerovanou pomocí sparkSession.readStream.

Typy změn v dotazech strukturovaného streamování

  • Změny čísla nebo typu (tj. jiného zdroje) vstupních zdrojů: To není povoleno.

  • Změny parametrů vstupních zdrojů: Zda je to povoleno a zda jsou sémantika změny dobře definovaná, závisí na zdroji a dotazu, včetně ovládacích prvků přístupu, jako maxFilesPerTrigger je nebo maxOffsetsPerTrigger. Tady je pár příkladů.

    • Přidání, odstranění a úprava limitů frekvence jsou povoleny:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      na

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Změny odebíraných článků a souborů se obecně nepovolují, protože výsledky jsou nepředvídatelné: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Změny v intervalu spuštění: Můžete měnit spuštění mezi přírůstkovými dávkami a časovými intervaly. Viz Změna intervalů spuštění mezi běhy.

  • Změny typu výstupní jímky: Změny mezi několika konkrétními kombinacemi jímek jsou povolené. To je potřeba ověřit na základě případu. Tady je pár příkladů.

    • Je povolená jímka souborů do jímky Kafka. Kafka uvidí jenom nová data.
    • Použití Kafka sinku jako výstupu do souborů není povoleno.
    • Je dovoleno změnit snímač Kafka na foreach nebo naopak.
  • Změny parametrů výstupní jímky: Jestli je tato možnost povolená a jestli jsou sémantika změny dobře definovaná, závisí na jímce a dotazu. Tady je pár příkladů.

    • Změny výstupního adresáře jímky souborů nejsou povoleny: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Změny v tématu výstupu jsou povolené: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Změny uživatelem definovaného foreach jímky (tj. ForeachWriter kódu) jsou povolené, ale význam změny závisí na kódu.
  • Změny v projekci / filtru / operacích podobných mapě: Některé případy jsou povoleny. Například:

    • Přidání nebo odstranění filtrů je povoleno: sdf.selectExpr("a") do sdf.where(...).selectExpr("a").filter(...).
    • Změny v projekcích se stejným schématem výstupu jsou povoleny: sdf.selectExpr("stringColumn AS json").writeStream do sdf.select(to_json(...).as("json")).writeStream.
    • Změny v projekcích s různým výstupním schématem jsou podmíněně povoleny: sdf.selectExpr("a").writeStream na sdf.selectExpr("b").writeStream je povoleno pouze v případě, že výstupní modul umožňuje změnu schématu z "a" na "b".
  • Změny ve stavových operacích: Některé operace v dotazech streamování musí udržovat stavová data, aby bylo možné průběžně aktualizovat výsledek. Strukturované streamování automaticky kontroluje stavová data do úložiště odolného proti chybám (například DBFS, Azure Blob Storage) a po restartování je obnoví. Předpokládá se však, že schéma stavových dat zůstává v restartech stejné. To znamená, že mezi restartováními nejsou povolené všechny změny (tj. přidání, odstranění nebo úpravy schématu) stavových operací streamovacího dotazu. Tady je seznam stavových operací, jejichž schéma by se nemělo mezi restartováními měnit, aby se zajistilo obnovení stavu:

    • Agregace streamování: Například sdf.groupBy("a").agg(...). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Deduplikace streamování: Například sdf.dropDuplicates("a"). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Spojení datových proudů: Například sdf1.join(sdf2, ...) (tj. oba vstupy se generují pomocí sparkSession.readStream). Změny ve schématu nebo sloupcích pro ekvijoining nejsou povoleny. Změny typu spojení (vnější nebo vnitřní) nejsou povoleny. Jiné změny v podmínce spojení jsou špatně definované.
    • Libovolná stavová operace: Například sdf.groupByKey(...).mapGroupsWithState(...) nebo sdf.groupByKey(...).flatMapGroupsWithState(...). Jakákoli změna schématu uživatelem definovaného stavu a typ časového limitu není povolený. Všechny změny v rámci uživatelem definované funkce mapování stavu jsou povolené, ale sémantický účinek změny závisí na uživatelsky definované logice. Pokud opravdu chcete podporovat změny schématu stavu, můžete explicitně zakódovat nebo dekódovat složité stavové datové struktury do bajtů pomocí schématu kódování/dekódování, které podporuje migraci schématu. Pokud například uložíte stav jako bajty s kódováním Avro, můžete změnit schéma stavu Avro mezi restartováním dotazu, protože tím se obnoví binární stav.

Důležité

Stavové operátory dropDuplicates() a dropDuplicatesWithinWatermark() mohou selhat při restartování kvůli kontrole kompatibility schématu stavu při přepínání mezi režimy přístupu k výpočetnímu výkonu.

Změna mezi režimy přístupu vyhrazenými a bez izolace je povolená. Změna mezi standardními a bezserverovými režimy přístupu je povolená. Nepokoušejte se změnit mezi jinými kombinacemi režimu přístupu.

Chcete-li se této chybě vyhnout, neměňte režim výpočetního přístupu pro dotazy streamování, které obsahují tyto operátory.