Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra l'uso di foreachBatch
con Structured Streaming per scrivere l'output di una query di streaming in origini dati che non dispongono di un sink di streaming esistente.
Il modello streamingDF.writeStream.foreachBatch(...)
di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch
accettano due parametri:
- DataFrame con i dati di output di un micro batch.
- ID univoco del micro batch.
È necessario usare foreachBatch
per le operazioni di merge Delta Lake in Structured Streaming. Consulta Upsert dalle query di streaming utilizzando foreachBatch
.
Applicare operazioni aggiuntive sul dataframe
Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch()
è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch()
e l'operazione SQL MERGE INTO
per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.
Importante
-
foreachBatch()
fornisce solo garanzie di scrittura at-least-once. Tuttavia, è possibile usare ilbatchId
fornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare sulla semantica end-to-end manualmente. -
foreachBatch()
non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usareforeach()
invece . - Quando si usa
foreachBatch
con un operatore con stato, è importante utilizzare completamente ogni batch prima del completamento dell'elaborazione. Vedere Utilizzo completo di ogni dataframe batch
È possibile richiamare un dataframe vuoto con foreachBatch()
e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Modifiche del comportamento per foreachBatch
in Databricks Runtime 14.0
In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso standard si applicano le modifiche di comportamento seguenti:
- I comandi
print()
scrivono l'output nei log del driver. - Non è possibile accedere al modulo secondario
dbutils.widgets
all'interno della funzione. - Tutti i file, i moduli o gli oggetti a cui si fa riferimento nella funzione devono essere serializzabili e disponibili in Spark.
Riutilizzare le origini dati batch esistenti
Usando foreachBatch()
, è possibile usare writer di dati batch esistenti per sink di dati che potrebbero non avere il supporto di Structured Streaming. Ecco alcuni esempi:
Molte altre origini dati batch possono essere usate da foreachBatch()
. Vedere Connettersi a origini dati e servizi esterni.
Scrivere in più posizioni
Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più writer di streaming strutturato per ottimizzare la parallelizzazione e la velocità effettiva.
L'uso foreachBatch
di per scrivere in più sink serializza l'esecuzione di scritture di streaming, che può aumentare la latenza per ogni micro batch.
Se si utilizza foreachBatch
per scrivere su più tabelle Delta, consultare Scritture idempotenti delle tabelle in foreachBatch
.
Consumare completamente ogni batch DataFrame
Quando si usano operatori con stato , ad esempio usando dropDuplicatesWithinWatermark
, ogni iterazione batch deve utilizzare l'intero dataframe o riavviare la query. Se non si utilizza l'intero dataframe, la query di streaming avrà esito negativo con il batch successivo.
Questo può verificarsi in diversi casi. Gli esempi seguenti illustrano come correggere le query che non utilizzano correttamente un dataframe.
Uso intenzionale di un subset del batch
Se si è preoccupati solo di un subset del batch, è possibile avere codice come il seguente.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
In questo caso, gestisce batch_df.show(2)
solo i primi due elementi del batch, che è previsto, ma se sono presenti più elementi, devono essere utilizzati. Il codice seguente usa il dataframe completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
In questo caso, la do_nothing
funzione ignora automaticamente il resto del dataframe.
Gestione di un errore in un batch
Potrebbe verificarsi un errore durante l'esecuzione di un foreachBatch
processo. È possibile avere codice come il seguente (in questo caso, l'esempio genera intenzionalmente un errore per visualizzare il problema).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Gestendo (e ingoiando in modo invisibile all'utente) l'errore, il resto del batch potrebbe non essere utilizzato. Esistono due opzioni per gestire questa situazione.
Prima, è possibile generare nuovamente l'errore, il quale viene passato al livello di orchestrazione per ritentare il batch. Questo potrebbe risolvere l'errore, se si tratta di un problema temporaneo o generarlo per il team operativo per tentare di risolvere manualmente. A tale scopo, modificare il partial_func
codice in modo che sia simile al seguente:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
La seconda opzione, se si vuole intercettare l'eccezione e ignorare il resto del batch, consiste nel modificare il codice in questo modo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Questo codice usa la do_nothing
funzione per ignorare automaticamente il resto del batch.