Použití příkazu foreachBatch k zápisu do libovolných datových jímek

Tato stránka ukazuje, jak použít foreachBatch se strukturovaným streamováním k zápisu výstupu dotazu streamování do zdrojů dat, které nemají existující jímku streamování.

Vzor streamingDF.writeStream.foreachBatch(...) kódu umožňuje použít dávkové funkce na výstupní data každého mikrodávkového streamovacího dotazu. Funkce používané s foreachBatch mají dva parametry:

  • Datový rámec, který obsahuje výstupní data mikrodávky.
  • Jedinečné ID mikrodávky.

Musíte pro operace sloučení v Delta Lake použít foreachBatch ve strukturovaném streamování. Zobrazit Upsert ze streamovaných dotazů pomocí foreachBatch.

Použití dalších operací datového rámce

V streamovaných datových rámcích se nepodporuje mnoho operací datových rámců a datových sad, protože Spark v těchto případech nepodporuje generování přírůstkových plánů. Pomocí foreachBatch() můžete některé z těchto operací použít na každém výstupu mikrodávky. Můžete například použít foreachBatch() a operaci SQL MERGE INTO k zápisu výstupu agregací streamování do tabulky Delta v režimu aktualizace. Další podrobnosti najdete v MERGE INTO.

Důležité

  • foreachBatch() poskytuje záruky zápisu alespoň jednou. Můžete ale použít batchId poskytnuté funkci jako prostředek k odstranění duplicitních dat ve výstupu a získání záruky, že se duplikace stanou jen jednou. V obou případech budete muset sami uvažovat o koncových sémantikách.
  • foreachBatch()nefunguje s režimem průběžného zpracování, protože se v podstatě spoléhá na mikrodávkové spouštění streamovacího dotazu. Pokud zapisujete data v nepřetržitém režimu, použijte foreach() místo toho.
  • Při použití foreachBatch se stavovým operátorem je důležité před dokončením zpracování zcela využívat každou dávku. Zobrazit úplné využití jednotlivých dávkových datových rámců

Zpracování prázdných datových rámců

foreachBatch() může obdržet prázdný datový rámec a váš kód musí tento scénář zpracovat. Jinak může váš dotaz selhat.

Pokud je například zdrojem streamování Delta Lake, mohou tyto scénáře předat prázdný datový rámec:foreachBatch()

  • OPTIMIZE bez souborů ke zpracování: Pokud operace OPTIMIZE probíhá v tabulce Delta Lake, ale není žádný soubor k zpracování, Structured Streaming zapíše do protokolu posunu položku, čímž dojde ke zvýšení verze tabulky. Tím se v jímce vytvoří prázdná mikrodávka, i když se nečtou žádné soubory.
  • Vyřazení souborů na úrovni fyzického plánu: Pokud zasunutí predikátu nebo odstranění souboru eliminuje všechny záznamy na úrovni fyzického plánu, výsledkem je prázdný záznam do cílového úložiště.

Uživatelský kód musí zpracovávat prázdné datové rámce, aby umožňoval správné operace. Podívejte se na následující příklady:

Python

def process_batch(output_df, batch_id):
  # Process valid DataFrames only
  if not output_df.isEmpty():
    # business logic
    pass

streamingDF.writeStream.foreachBatch(process_batch).start()

Scala

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid DataFrames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Změny chování v foreachBatch Databricks Runtime 14.0

Ve službě Databricks Runtime 14.0 a novějších na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu platí následující změny chování:

  • print() příkazy zapisují výstup do protokolů ovladačů.
  • K dílčímu dbutils.widgets modulu uvnitř funkce nelze získat přístup.
  • Všechny soubory, moduly nebo objekty odkazované ve funkci musí být serializovatelné a dostupné ve Sparku.

Opakované použití existujících dávkových zdrojů dat

Pomocí foreachBatch(), můžete použít existující dávkové zapisovače dat pro jímky dat, které nemusí mít podporu strukturovaného streamování. Tady je pár příkladů:

Mnoho dalších dávkových zdrojů dat lze použít z foreachBatch(). Viz Připojení ke zdrojům dat a externím službám.

Zápis do více lokalit

Pokud potřebujete napsat výstup streamovacího dotazu do více umístění, databricks doporučuje používat více zapisovačů strukturovaného streamování pro zajištění nejlepší paralelizace a propustnosti.

Použití foreachBatch k zápisu do více cílů serializuje provádění streamovacích zápisů, což může zvýšit latenci pro každou mikro dávku.

Pokud se používá foreachBatch k zápisu do více tabulek Delta, přečtěte si téma Použití foreachBatch pro zápisy z idempotentní tabulky.

Úplné využití každého dávkového datového rámce

Pokud používáte stavové operátory (například pomocí dropDuplicatesWithinWatermark), musí každá dávková iterace využívat celý datový rámec nebo restartovat dotaz. Pokud nevyužijete celý DataFrame, streamovací dotaz selže při zpracování další dávky.

K tomu může dojít v několika případech. Následující příklady ukazují, jak opravit dotazy, které nevyužívají datový rámec správně.

Úmyslné použití podmnožiny sady

Pokud vás zajímá jenom podmnožina dávky, můžete mít kód, například následující.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

V tomto případě batch_df.show(2) jediné zpracovává první dvě položky v dávce, což se očekává, ale pokud existuje více položek, je nutné je spotřebovat. Následující kód využívá celý datový rámec.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

do_nothing Funkce zde tiše ignoruje zbytek datového rámce.

Řešení chyby v dávkovém procesu

Pro zpracování chyb v foreachBatch Databricks doporučuje, abyste umožnili rychlé ukončení streamovacího dotazu a místo toho se spoléhali na vrstvu orchestrace, jako jsou Lakeflow úlohy nebo Apache Airflow, ke správě opakované logiky. Je to mnohem bezpečnější než vytváření složitých smyček opakování v kódu, kdy může dojít ke ztrátě dat.

Tady jsou pokyny založené na vašem cílovém textu:

Target Příklady Pokyny
Operace datového rámce Tabulky Delta Lake K zajištění idempotence a ochrany správnosti dat při opakování musíte použít možnosti zápisu txnAppId a txnVersion a svázat txnVersion s batchId. Nezachycujte a nezkoušejte výjimky zpracovat místně. Místo toho Databricks doporučuje, abyste povolili šíření chyb, aby metriky Sparku zůstaly přesné, data se nezdvojovala a orchestrátor mohl čistě opakovat celou dávku.
Vlastní kód a externí cíle .collect(), databáze OLTP, fronty zpráv, rozhraní API Implementujte vlastní idempotenci. Musíte předpokládat, že jakákoli operace může a bude se opakovat napříč dávkami. Pokud batchId zůstane stejný, výsledek vaší operace musí zůstat stejný. Můžete opakovat čistě přechodné chyby, jako jsou krátké vypršení časového limitu připojení, ale v případě, že opakování nakonec selže, dávejte extrémní pozor, abyste se vyhnuli částečným nebo duplicitním zápisům. Nejbezpečnějším přístupem je umožnit šíření chyb a umožnit orchestrátoru opakování celé dávky.

Tady je několik příkladů typů výjimek a doporučení pro jejich zpracování v foreachBatch:

Typ výjimky Příklady Doporučená akce
Přechodné chyby jímky SQLTransientConnectionException, HTTP 429, časové limity Zachycení: opakování nebo odeslání do fronty nedoručených zpráv
Porušení omezení duplicity nebo klíčových omezení, pokud je cíl idempotentní. SQLIntegrityConstraintViolationException Zachycení: záznam a potlačení
Vlastní chyby s možností opakování Výjimky obaleného soketu, opakovatelné chyby databáze Zachytit: zvýšit metriky a umožnit řízené pokračování
Chyby logiky nebo schématu NullPointerException, AttributeError, neshoda schématu Propagace: Spark nechť selže dotaz
Chyby jímky, které se nedají opakovat, nebo nezachycené chyby logiky ValueError, PermissionError Propagace: Spark nechť selže dotaz
Kritické chyby OutOfMemoryError, poškozený stav, porušení integrity dat Propagace: Spark nechť selže dotaz

Příklady kódu: Zpracování výjimek

Následující příklady záměrně vyvolávají chybu foreach , aby se zobrazily různé přístupy pro zpracování chyby:

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Výše uvedený kód zpracovává a bezobslužně potlačí chybu a nemusí spotřebovávat zbytek dávky. Existují dvě možnosti pro zpracování této situace.

Nejprve můžete znovu vyvolat chybu, což ji předá do orchestrační vrstvy k opětovnému zpracování dávky. To může vyřešit chybu, pokud se jedná o přechodný problém, nebo ji předat provoznímu týmu, aby se pokusil o ruční opravu. Uděláte to tak, že změníte partial_func kód tak, aby vypadal takto:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Za druhé, pokud chcete zachytit výjimku a ignorovat zbytek dávky, můžete změnit kód, aby použil funkci do_nothing k tichému ignorování zbytku dávky.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()