Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento článek popisuje použití foreachBatch se strukturovaným streamováním k zápisu výstupu dotazu streamování do zdrojů dat, které nemají existující jímku streamování.
Vzor streamingDF.writeStream.foreachBatch(...) kódu umožňuje použít dávkové funkce na výstupní data každého mikrodávkového dotazu streamování. Funkce používané s foreachBatch mají dva parametry:
- Datový rámec, který obsahuje výstupní data mikrodávkové dávky.
- Jedinečné ID mikrodávkové dávky.
Musíte použít foreachBatch pro operace sloučení Delta Lake ve strukturovaném streamování. Zobrazit Upsert ze streamovaných dotazů pomocí foreachBatch.
Použití dalších operací datového rámce
V streamovaných datových rámcích se nepodporuje mnoho operací datových rámců a datových sad, protože Spark v těchto případech nepodporuje generování přírůstkových plánů. Některé z těchto operací můžete použít foreachBatch() na každém výstupu mikrodávkové dávky. Můžete například použít foreachBatch() a operaci SQL MERGE INTO k zápisu výstupu agregací streamování do tabulky Delta v režimu aktualizace. Další podrobnosti najdete v MERGE INTO.
Důležité
-
foreachBatch()poskytuje záruky zápisu pouze jednou. Můžete ale použítbatchIdposkytnuté funkci jako prostředek k odstranění duplicitních dat ve výstupu a získání záruky, že se duplikace stanou jen jednou. V obou případech budete muset zdůvodnění kompletní sémantiky sami. -
foreachBatch()nefunguje s režimem průběžného zpracování, protože se v podstatě spoléhá na mikrodávkové spouštění streamovacího dotazu. Pokud zapisujete data v nepřetržitém režimu, použijteforeach()místo toho. - Při použití
foreachBatchse stavovým operátorem je důležité před dokončením zpracování zcela využívat každou dávku. Zobrazit úplné využití jednotlivých dávkových datových rámců
Prázdný datový rámec lze vyvolat a foreachBatch() uživatelský kód musí být odolný, aby umožňoval správné operace. Příklad najdete tady:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Změny chování v foreachBatch Databricks Runtime 14.0
Ve službě Databricks Runtime 14.0 a novějších na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu platí následující změny chování:
-
print()příkazy zapisuje výstup do protokolů ovladačů. - K dílčímu
dbutils.widgetsmodulu uvnitř funkce nelze získat přístup. - Všechny soubory, moduly nebo objekty odkazované ve funkci musí být serializovatelné a dostupné ve Sparku.
Opakované použití existujících dávkových zdrojů dat
Pomocí foreachBatch(), můžete použít existující dávkové zapisovače dat pro jímky dat, které nemusí mít podporu strukturovaného streamování. Tady je pár příkladů:
Mnoho dalších dávkových zdrojů dat lze použít z foreachBatch(). Viz Připojení ke zdrojům dat a externím službám.
Zápis do více umístění
Pokud potřebujete napsat výstup streamovacího dotazu do více umístění, databricks doporučuje používat více zapisovačů strukturovaného streamování pro zajištění nejlepší paralelizace a propustnosti.
Použití foreachBatch k zápisu do více jímek serializuje provádění zápisů streamování, což může zvýšit latenci pro každou mikrodávku.
Pokud použijete foreachBatch k zápisu do více tabulek Delta, přečtěte si téma Idempotentní zápisy do tabulek v foreachBatch.
Úplné využití každého dávkového datového rámce
Pokud používáte stavové operátory (například pomocí dropDuplicatesWithinWatermark), musí každá dávková iterace využívat celý datový rámec nebo restartovat dotaz. Pokud nevyužijete celý DataFrame, streamovací dotaz selže při zpracování další dávky.
K tomu může dojít v několika případech. Následující příklady ukazují, jak opravit dotazy, které nevyužívají datový rámec správně.
Úmyslné použití podmnožiny sady
Pokud vás zajímá jenom podmnožina dávky, můžete mít kód, například následující.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
V tomto případě batch_df.show(2) jediné zpracovává první dvě položky v dávce, což se očekává, ale pokud existuje více položek, je nutné je spotřebovat. Následující kód využívá celý datový rámec.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
do_nothing Funkce zde tiše ignoruje zbytek datového rámce.
Řešení chyby v dávkovém procesu
Při spuštění foreachBatch procesu může dojít k chybě. Můžete mít následující kód (v tomto případě ukázka záměrně vyvolá chybu, aby se ukázal problém).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Při zpracování chyby (a jejím tichém potlačení) nemusí být zbytek dávky zpracován. Existují dvě možnosti pro zpracování této situace.
Nejprve můžete znovu vyvolat chybu, která ji předá do vrstvy orchestrace, aby se dávka zkusila znovu. Toto by mohlo vyřešit chybu, pokud jde o přechodný problém, nebo ji předat vašemu provoznímu týmu, aby se pokusili chybu opravit ručně. Uděláte to tak, že změníte partial_func kód tak, aby vypadal takto:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Druhou možností, jak zachytit výjimku a ignorovat zbytek dávky, je upravit kód takto.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Tento kód používá funkci do_nothing k tomu, aby tiše ignoroval zbytek dávky.