Condividi tramite


Usare foreachBatch per scrivere in sink di dati arbitrari

Questo articolo illustra l'uso di foreachBatch con Structured Streaming per scrivere l'output di una query di streaming in origini dati che non dispongono di un sink di streaming esistente.

Il modello streamingDF.writeStream.foreachBatch(...) di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch accettano due parametri:

  • DataFrame con i dati di output di un micro batch.
  • ID univoco del micro batch.

È necessario usare foreachBatch per le operazioni di merge Delta Lake in Structured Streaming. Consulta Upsert dalle query di streaming utilizzando foreachBatch.

Applicare operazioni aggiuntive sul dataframe

Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch() è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch() e l'operazione SQL MERGE INTO per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.

Importante

  • foreachBatch() fornisce solo garanzie di scrittura at-least-once. Tuttavia, è possibile usare il batchId fornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare sulla semantica end-to-end manualmente.
  • foreachBatch() non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usare foreach() invece .
  • Quando si usa foreachBatch con un operatore con stato, è importante utilizzare completamente ogni batch prima del completamento dell'elaborazione. Vedere Utilizzo completo di ogni dataframe batch

È possibile richiamare un dataframe vuoto con foreachBatch() e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Modifiche del comportamento per foreachBatch in Databricks Runtime 14.0

In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso standard si applicano le modifiche di comportamento seguenti:

  • I comandi print() scrivono l'output nei log del driver.
  • Non è possibile accedere al modulo secondario dbutils.widgets all'interno della funzione.
  • Tutti i file, i moduli o gli oggetti a cui si fa riferimento nella funzione devono essere serializzabili e disponibili in Spark.

Riutilizzare le origini dati batch esistenti

Usando foreachBatch(), è possibile usare writer di dati batch esistenti per sink di dati che potrebbero non avere il supporto di Structured Streaming. Ecco alcuni esempi:

Molte altre origini dati batch possono essere usate da foreachBatch(). Vedere Connettersi a origini dati e servizi esterni.

Scrivere in più posizioni

Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più writer di streaming strutturato per ottimizzare la parallelizzazione e la velocità effettiva.

L'uso foreachBatch di per scrivere in più sink serializza l'esecuzione di scritture di streaming, che può aumentare la latenza per ogni micro batch.

Se si utilizza foreachBatch per scrivere su più tabelle Delta, consultare Scritture idempotenti delle tabelle in foreachBatch.

Consumare completamente ogni batch DataFrame

Quando si usano operatori con stato , ad esempio usando dropDuplicatesWithinWatermark, ogni iterazione batch deve utilizzare l'intero dataframe o riavviare la query. Se non si utilizza l'intero dataframe, la query di streaming avrà esito negativo con il batch successivo.

Questo può verificarsi in diversi casi. Gli esempi seguenti illustrano come correggere le query che non utilizzano correttamente un dataframe.

Uso intenzionale di un subset del batch

Se si è preoccupati solo di un subset del batch, è possibile avere codice come il seguente.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

In questo caso, gestisce batch_df.show(2) solo i primi due elementi del batch, che è previsto, ma se sono presenti più elementi, devono essere utilizzati. Il codice seguente usa il dataframe completo.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

In questo caso, la do_nothing funzione ignora automaticamente il resto del dataframe.

Gestione di un errore in un batch

Potrebbe verificarsi un errore durante l'esecuzione di un foreachBatch processo. È possibile avere codice come il seguente (in questo caso, l'esempio genera intenzionalmente un errore per visualizzare il problema).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Gestendo (e ingoiando in modo invisibile all'utente) l'errore, il resto del batch potrebbe non essere utilizzato. Esistono due opzioni per gestire questa situazione.

Prima, è possibile generare nuovamente l'errore, il quale viene passato al livello di orchestrazione per ritentare il batch. Questo potrebbe risolvere l'errore, se si tratta di un problema temporaneo o generarlo per il team operativo per tentare di risolvere manualmente. A tale scopo, modificare il partial_func codice in modo che sia simile al seguente:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

La seconda opzione, se si vuole intercettare l'eccezione e ignorare il resto del batch, consiste nel modificare il codice in questo modo.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Questo codice usa la do_nothing funzione per ignorare automaticamente il resto del batch.