Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan visar hur du använder foreachBatch structured streaming för att skriva utdata från en strömmande fråga till datakällor som inte har någon befintlig direktuppspelningsmottagare.
Med kodmönstret streamingDF.writeStream.foreachBatch(...) kan du använda batchfunktioner för utdata för varje mikrobatch i strömningsfrågan. Funktioner som används med foreachBatch ta två parametrar:
- En dataram som har utdata från en mikrobatch.
- Mikrobatchens unika ID.
Du måste använda foreachBatch för Delta Lake-sammanslagningsåtgärder i Strukturerad direktuppspelning. Se Upsert från strömmande frågor med hjälp av foreachBatch.
Tillämpa ytterligare DataFrame-åtgärder
Många DataFrame- och Dataset-åtgärder stöds inte i strömmande DataFrames eftersom Spark inte stöder generering av inkrementella planer i dessa fall. Med hjälp av foreachBatch() kan du tillämpa vissa av dessa åtgärder på utdata för varje mikrobatch. Du kan till exempel använda foreachBatch() och ÅTGÄRDEN SQL MERGE INTO för att skriva utdata från strömmande aggregeringar till en Delta-tabell i uppdateringsläge. Mer information finns i MERGE INTO.
Viktigt!
-
foreachBatch()ger endast skrivgarantier för minst en gång. Du kan dock användabatchIdsom tillhandahålls till funktionen som ett sätt att deduplicera utdata och få en exakt en gång-garanti. I båda fallen måste du resonera om semantiken från slutpunkt till slutpunkt själv. -
foreachBatch()fungerar inte med kontinuerligt bearbetningsläge eftersom det i grunden förlitar sig på utförandet av mikrobatchar i en strömmande fråga. Om du skriver data i kontinuerligt läge använder duforeach()i stället. - När du använder
foreachBatchmed en tillståndskänslig operator är det viktigt att använda varje batch helt innan bearbetningen är klar. Se Förbruka varje batch DataFrame helt
Hantera tomma dataramar
foreachBatch() kan få en tom DataFrame och koden måste hantera det här scenariot. Annars kan frågan misslyckas.
När Delta Lake till exempel är strömningskällan kan dessa scenarier skicka en tom DataFrame till foreachBatch():
-
OPTIMIZEutan filer att bearbeta: När enOPTIMIZEåtgärd körs på Delta Lake-källtabellen men inga filer finns att bearbeta, skriver Structured Streaming en offset-loggpost för att öka tabellversionen. Detta skapar en tom mikrobatch på dataavlastaren trots att inga filer läses. - Filrensning på den fysiska plannivån: Om predikatsänkningsoptimering eller filrensning eliminerar alla poster på den fysiska plannivån, blir resultatet ett tomt commit till mottagaren.
Användarkoden måste hantera tomma DataFrames för att möjliggöra korrekt åtgärd. Se exemplen nedan:
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Beteendeändringar för foreachBatch i Databricks Runtime 14.0
I Databricks Runtime 14.0 och senare vid beräkning som konfigurerats med standardåtkomstläge gäller följande beteendeändringar:
-
print()kommandon skriver utdata till drivrutinsloggarna. - Du kan inte komma åt undermodulen
dbutils.widgetsi funktionen. - Alla filer, moduler eller objekt som refereras till i funktionen måste vara serialiserbara och tillgängliga på Spark.
Återanvända befintliga batchdatakällor
Med foreachBatch() kan du använda befintliga batchdataskrivare för datamottagare som kanske inte har stöd för Structured Streaming. Några exempel:
Många andra batchdatakällor kan användas från foreachBatch(). Se Ansluta till datakällor och externa tjänster.
Skriva till flera platser
Om du behöver skriva utdata från en strömmande fråga till flera platser rekommenderar Databricks att du använder flera strukturerade strömningsskrivare för bästa parallellisering och dataflöde.
Om du använder foreachBatch för att skriva till flera mottagare serialiseras körningen av strömmande skrivningar, vilket kan öka svarstiden för varje mikrobatch.
Om du använder foreachBatch för att skriva till flera Delta-tabeller, se Använd foreachBatch för idempotenta tabellskrivningar.
Förbruka varje batchdataram helt
När du använder tillståndskänsliga operatorer (till exempel använder dropDuplicatesWithinWatermark) måste varje batch-iteration använda hela DataFrame eller starta om frågan. Om du inte använder hela DataFrame misslyckas strömningsfrågan med nästa batch.
Detta kan inträffa i flera fall. I följande exempel visas hur du åtgärdar frågor som inte använder en dataram korrekt.
Avsiktligt använda en delmängd av batchen
Om du bara bryr dig om en delmängd av batchen kan du ha kod som följande.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
I det här fallet hanterar den batch_df.show(2) enda de två första objekten i batchen, vilket är förväntat, men om det finns fler objekt måste de förbrukas. Följande kod använder hela DataFrame.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
do_nothing Här ignorerar funktionen tyst resten av DataFrame.
Hantera ett fel i en sats
För felhantering i foreachBatchrekommenderar Databricks att du tillåter att strömningsfrågan misslyckas snabbt och i stället förlitar dig på orkestreringslagret, till exempel Lakeflow-jobb eller Apache Airflow, för att hantera logiken för återförsök. Det här är mycket säkrare än att skapa komplexa återförsöksloopar i koden, där dataförlust kan inträffa.
Här följer riktlinjer som baseras på ditt skrivmål:
| Mål | Exempel | Riktlinjer |
|---|---|---|
| DataFrame-åtgärder | Delta Lake-tabeller | Du måste använda skriv-alternativen txnAppId och txnVersion, binda txnVersion till batchId, för att garantera idempotens och skydda datakorrekthet vid återförsök. Fånga inte och hantera inte undantag lokalt. I stället rekommenderar Databricks att du tillåter att fel sprids så att Spark-måtten förblir korrekta, att data inte dupliceras och att orkestratorn kan göra ett nytt försök med hela batchen. |
| Anpassad kod och externa destinationer |
.collect(), OLTP-databaser, meddelandeköer, API:er |
Implementera din egen idempotens. Du måste anta att alla åtgärder kan och kommer att göras om mellan batchar. Om den batchId förblir densamma måste resultatet av åtgärden förbli detsamma. Du kan försöka igen med rent tillfälliga fel, till exempel korta tidsgränser för anslutning, men var mycket noga med att undvika partiella eller duplicerade skrivningar om omförsöket misslyckas i slutändan. Den säkraste metoden är att låta fel spridas och låta orkestratorn försöka igen i hela batchen. |
Här är några exempel på undantagstyper och rekommendationer för hur du hanterar dem i foreachBatch:
| Undantagstyp | Exempel | Rekommenderad åtgärd |
|---|---|---|
| Tillfälliga sinkfel |
SQLTransientConnectionException, HTTP 429, tidsgränser |
Catch: försök igen eller skicka till en dödbrevkö |
| Dubblett- eller nyckelbegränsningsöverträdelser när mottagaren är idempotent | SQLIntegrityConstraintViolationException |
Fånga: logga och undertrycka |
| Anpassade återförsöksbara fel | Undantag för omsluten socket, databasfel som kan göras om | Catch: öka mått och tillåta kontrollerad fortsättning |
| Logik- eller schemafel |
NullPointerException, AttributeError, schemamisstag |
Propagera: Låt Spark misslyckas med frågan |
| Icke-återförsöksbara hållfelfel eller oupptäckta logikbuggar |
ValueError, PermissionError |
Propagera: Låt Spark misslyckas med frågan |
| Kritiska fel |
OutOfMemoryError, skadat tillstånd, dataintegritetsöverträdelser |
Propagera: Låt Spark misslyckas med frågan |
Kodexempel: undantagshantering
Följande exempel skapar avsiktligt ett fel i foreach för att visa olika metoder för att hantera felet:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Koden ovan hanterar och undertrycker felet tyst och förbrukar kanske inte resten av batchen. Det finns två alternativ för att hantera den här situationen.
Först kan du återskapa felet, vilket skickar det upp till orkestreringsskiktet för att försöka batchen igen. Detta kan lösa felet om det är ett tillfälligt problem, eller eskalera det till ditt driftsteam för att försöka åtgärda det manuellt. Det gör du genom partial_func att ändra koden så här:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
För det andra, om du vill fånga undantaget och ignorera resten av batchen, kan du ändra koden så att funktionen do_nothing används för att ignorera resten av batchen tyst.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()