Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka ukazuje, jak použít foreachBatch se strukturovaným streamováním k zápisu výstupu dotazu streamování do zdrojů dat, které nemají existující jímku streamování.
Vzor streamingDF.writeStream.foreachBatch(...) kódu umožňuje použít dávkové funkce na výstupní data každého mikrodávkového streamovacího dotazu. Funkce používané s foreachBatch mají dva parametry:
- Datový rámec, který obsahuje výstupní data mikrodávky.
- Jedinečné ID mikrodávky.
Musíte pro operace sloučení v Delta Lake použít foreachBatch ve strukturovaném streamování. Zobrazit Upsert ze streamovaných dotazů pomocí foreachBatch.
Použití dalších operací datového rámce
V streamovaných datových rámcích se nepodporuje mnoho operací datových rámců a datových sad, protože Spark v těchto případech nepodporuje generování přírůstkových plánů. Pomocí foreachBatch() můžete některé z těchto operací použít na každém výstupu mikrodávky. Můžete například použít foreachBatch() a operaci SQL MERGE INTO k zápisu výstupu agregací streamování do tabulky Delta v režimu aktualizace. Další podrobnosti najdete v MERGE INTO.
Důležité
-
foreachBatch()poskytuje záruky zápisu alespoň jednou. Můžete ale použítbatchIdposkytnuté funkci jako prostředek k odstranění duplicitních dat ve výstupu a získání záruky, že se duplikace stanou jen jednou. V obou případech budete muset sami uvažovat o koncových sémantikách. -
foreachBatch()nefunguje s režimem průběžného zpracování, protože se v podstatě spoléhá na mikrodávkové spouštění streamovacího dotazu. Pokud zapisujete data v nepřetržitém režimu, použijteforeach()místo toho. - Při použití
foreachBatchse stavovým operátorem je důležité před dokončením zpracování zcela využívat každou dávku. Zobrazit úplné využití jednotlivých dávkových datových rámců
Zpracování prázdných datových rámců
foreachBatch() může obdržet prázdný datový rámec a váš kód musí tento scénář zpracovat. Jinak může váš dotaz selhat.
Pokud je například zdrojem streamování Delta Lake, mohou tyto scénáře předat prázdný datový rámec:foreachBatch()
-
OPTIMIZEbez souborů ke zpracování: Pokud operaceOPTIMIZEprobíhá v tabulce Delta Lake, ale není žádný soubor k zpracování, Structured Streaming zapíše do protokolu posunu položku, čímž dojde ke zvýšení verze tabulky. Tím se v jímce vytvoří prázdná mikrodávka, i když se nečtou žádné soubory. - Vyřazení souborů na úrovni fyzického plánu: Pokud zasunutí predikátu nebo odstranění souboru eliminuje všechny záznamy na úrovni fyzického plánu, výsledkem je prázdný záznam do cílového úložiště.
Uživatelský kód musí zpracovávat prázdné datové rámce, aby umožňoval správné operace. Podívejte se na následující příklady:
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Změny chování v foreachBatch Databricks Runtime 14.0
Ve službě Databricks Runtime 14.0 a novějších na výpočetních prostředcích nakonfigurovaných pomocí standardního režimu přístupu platí následující změny chování:
-
print()příkazy zapisují výstup do protokolů ovladačů. - K dílčímu
dbutils.widgetsmodulu uvnitř funkce nelze získat přístup. - Všechny soubory, moduly nebo objekty odkazované ve funkci musí být serializovatelné a dostupné ve Sparku.
Opakované použití existujících dávkových zdrojů dat
Pomocí foreachBatch(), můžete použít existující dávkové zapisovače dat pro jímky dat, které nemusí mít podporu strukturovaného streamování. Tady je pár příkladů:
Mnoho dalších dávkových zdrojů dat lze použít z foreachBatch(). Viz Připojení ke zdrojům dat a externím službám.
Zápis do více lokalit
Pokud potřebujete napsat výstup streamovacího dotazu do více umístění, databricks doporučuje používat více zapisovačů strukturovaného streamování pro zajištění nejlepší paralelizace a propustnosti.
Použití foreachBatch k zápisu do více cílů serializuje provádění streamovacích zápisů, což může zvýšit latenci pro každou mikro dávku.
Pokud se používá foreachBatch k zápisu do více tabulek Delta, přečtěte si téma Použití foreachBatch pro zápisy z idempotentní tabulky.
Úplné využití každého dávkového datového rámce
Pokud používáte stavové operátory (například pomocí dropDuplicatesWithinWatermark), musí každá dávková iterace využívat celý datový rámec nebo restartovat dotaz. Pokud nevyužijete celý DataFrame, streamovací dotaz selže při zpracování další dávky.
K tomu může dojít v několika případech. Následující příklady ukazují, jak opravit dotazy, které nevyužívají datový rámec správně.
Úmyslné použití podmnožiny sady
Pokud vás zajímá jenom podmnožina dávky, můžete mít kód, například následující.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
V tomto případě batch_df.show(2) jediné zpracovává první dvě položky v dávce, což se očekává, ale pokud existuje více položek, je nutné je spotřebovat. Následující kód využívá celý datový rámec.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
do_nothing Funkce zde tiše ignoruje zbytek datového rámce.
Řešení chyby v dávkovém procesu
Pro zpracování chyb v foreachBatch Databricks doporučuje, abyste umožnili rychlé ukončení streamovacího dotazu a místo toho se spoléhali na vrstvu orchestrace, jako jsou Lakeflow úlohy nebo Apache Airflow, ke správě opakované logiky. Je to mnohem bezpečnější než vytváření složitých smyček opakování v kódu, kdy může dojít ke ztrátě dat.
Tady jsou pokyny založené na vašem cílovém textu:
| Target | Příklady | Pokyny |
|---|---|---|
| Operace datového rámce | Tabulky Delta Lake | K zajištění idempotence a ochrany správnosti dat při opakování musíte použít možnosti zápisu txnAppId a txnVersion a svázat txnVersion s batchId. Nezachycujte a nezkoušejte výjimky zpracovat místně. Místo toho Databricks doporučuje, abyste povolili šíření chyb, aby metriky Sparku zůstaly přesné, data se nezdvojovala a orchestrátor mohl čistě opakovat celou dávku. |
| Vlastní kód a externí cíle |
.collect(), databáze OLTP, fronty zpráv, rozhraní API |
Implementujte vlastní idempotenci. Musíte předpokládat, že jakákoli operace může a bude se opakovat napříč dávkami. Pokud batchId zůstane stejný, výsledek vaší operace musí zůstat stejný. Můžete opakovat čistě přechodné chyby, jako jsou krátké vypršení časového limitu připojení, ale v případě, že opakování nakonec selže, dávejte extrémní pozor, abyste se vyhnuli částečným nebo duplicitním zápisům. Nejbezpečnějším přístupem je umožnit šíření chyb a umožnit orchestrátoru opakování celé dávky. |
Tady je několik příkladů typů výjimek a doporučení pro jejich zpracování v foreachBatch:
| Typ výjimky | Příklady | Doporučená akce |
|---|---|---|
| Přechodné chyby jímky |
SQLTransientConnectionException, HTTP 429, časové limity |
Zachycení: opakování nebo odeslání do fronty nedoručených zpráv |
| Porušení omezení duplicity nebo klíčových omezení, pokud je cíl idempotentní. | SQLIntegrityConstraintViolationException |
Zachycení: záznam a potlačení |
| Vlastní chyby s možností opakování | Výjimky obaleného soketu, opakovatelné chyby databáze | Zachytit: zvýšit metriky a umožnit řízené pokračování |
| Chyby logiky nebo schématu |
NullPointerException, AttributeError, neshoda schématu |
Propagace: Spark nechť selže dotaz |
| Chyby jímky, které se nedají opakovat, nebo nezachycené chyby logiky |
ValueError, PermissionError |
Propagace: Spark nechť selže dotaz |
| Kritické chyby |
OutOfMemoryError, poškozený stav, porušení integrity dat |
Propagace: Spark nechť selže dotaz |
Příklady kódu: Zpracování výjimek
Následující příklady záměrně vyvolávají chybu foreach , aby se zobrazily různé přístupy pro zpracování chyby:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Výše uvedený kód zpracovává a bezobslužně potlačí chybu a nemusí spotřebovávat zbytek dávky. Existují dvě možnosti pro zpracování této situace.
Nejprve můžete znovu vyvolat chybu, což ji předá do orchestrační vrstvy k opětovnému zpracování dávky. To může vyřešit chybu, pokud se jedná o přechodný problém, nebo ji předat provoznímu týmu, aby se pokusil o ruční opravu. Uděláte to tak, že změníte partial_func kód tak, aby vypadal takto:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Za druhé, pokud chcete zachytit výjimku a ignorovat zbytek dávky, můžete změnit kód, aby použil funkci do_nothing k tichému ignorování zbytku dávky.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()