Dela via


Använda foreachBatch för att skriva till godtyckliga datamottagare

Den här artikeln beskriver hur du använder foreachBatch structured streaming för att skriva utdata från en strömmande fråga till datakällor som inte har någon befintlig direktuppspelningsmottagare.

Med kodmönstret streamingDF.writeStream.foreachBatch(...) kan du använda batchfunktioner för utdata för varje mikrobatch i strömningsfrågan. Funktioner som används med foreachBatch ta två parametrar:

  • En dataram som har utdata från en mikrobatch.
  • Mikrobatchens unika ID.

Du måste använda foreachBatch för Delta Lake-sammanslagningsåtgärder i Strukturerad direktuppspelning. Se Upsert från strömmande frågor med hjälp av foreachBatch.

Tillämpa ytterligare DataFrame-åtgärder

Många DataFrame- och Dataset-åtgärder stöds inte i strömmande DataFrames eftersom Spark inte stöder generering av inkrementella planer i dessa fall. Med hjälp foreachBatch() av kan du tillämpa några av dessa åtgärder på varje mikrobatchutdata. Du kan till exempel använda foreachBatch() och ÅTGÄRDEN SQL MERGE INTO för att skriva utdata från strömmande aggregeringar till en Delta-tabell i uppdateringsläge. Mer information finns i MERGE INTO.

Viktigt!

  • foreachBatch() ger endast skrivgarantier minst en gång. Du kan dock använda batchId som tillhandahålls till funktionen som ett sätt att deduplicera utdata och få en exakt en gång-garanti. I båda fallen måste du resonera om semantiken från slutpunkt till slutpunkt själv.
  • foreachBatch()fungerar inte med läget för kontinuerlig bearbetning eftersom det i grunden förlitar sig på mikrobatchkörningen av en strömmande fråga. Om du skriver data i kontinuerligt läge använder du foreach() i stället.
  • När du använder foreachBatch med en tillståndskänslig operator är det viktigt att använda varje batch helt innan bearbetningen är klar. Se Förbruka varje batch DataFrame helt

En tom dataram kan anropas med foreachBatch() och användarkoden måste vara elastisk för att möjliggöra korrekt åtgärd. Ett exempel på detta visas här:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Beteendeändringar för foreachBatch i Databricks Runtime 14.0

I Databricks Runtime 14.0 och senare vid beräkning som konfigurerats med standardåtkomstläge gäller följande beteendeändringar:

  • print() kommandon skriver utdata till drivrutinsloggarna.
  • Du kan inte komma åt undermodulen dbutils.widgets i funktionen.
  • Alla filer, moduler eller objekt som refereras till i funktionen måste vara serialiserbara och tillgängliga på Spark.

Återanvända befintliga batchdatakällor

Med kan foreachBatch()du använda befintliga batchdataskrivare för datamottagare som kanske inte har stöd för structured streaming. Några exempel:

Många andra batchdatakällor kan användas från foreachBatch(). Se Ansluta till datakällor och externa tjänster.

Skriva till flera platser

Om du behöver skriva utdata från en strömmande fråga till flera platser rekommenderar Databricks att du använder flera strukturerade strömningsskrivare för bästa parallellisering och dataflöde.

Om du använder foreachBatch för att skriva till flera mottagare serialiseras körningen av strömmande skrivningar, vilket kan öka svarstiden för varje mikrobatch.

Om du använder foreachBatch för att skriva till flera Delta-tabeller, se Idempotenta tabellskrivningar i foreachBatch.

Förbruka varje batchdataram helt

När du använder tillståndskänsliga operatorer (till exempel använder dropDuplicatesWithinWatermark) måste varje batch-iteration använda hela DataFrame eller starta om frågan. Om du inte använder hela DataFrame misslyckas strömningsfrågan med nästa batch.

Detta kan inträffa i flera fall. I följande exempel visas hur du åtgärdar frågor som inte använder en dataram korrekt.

Avsiktligt använda en delmängd av batchen

Om du bara bryr dig om en delmängd av batchen kan du ha kod som följande.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

I det här fallet hanterar den batch_df.show(2) enda de två första objekten i batchen, vilket är förväntat, men om det finns fler objekt måste de förbrukas. Följande kod använder hela DataFrame.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

do_nothing Här ignorerar funktionen tyst resten av DataFrame.

Hantera ett fel i en sats

Det kan uppstå ett fel när en foreachBatch process körs. Du kan ha kod som följande (i det här fallet skapar exemplet avsiktligt ett fel för att visa problemet).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Genom att hantera (och tyst svälja) felet kan resten av batchen inte förbrukas. Det finns två alternativ för att hantera den här situationen.

Först kan du återskapa felet, vilket skickar det upp till orkestreringsskiktet för att försöka batchen igen. Detta kan lösa felet, om det är ett tillfälligt problem, eller så kan det eskaleras till ditt driftteam för att försöka åtgärda det manuellt. Det gör du genom partial_func att ändra koden så här:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Det andra alternativet, om du vill fånga undantaget och ignorera resten av batchen, är att ändra koden till detta.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Den här koden använder do_nothing funktionen för att ignorera resten av batchen tyst.