Use foreachBatch to write to arbitrary data sinks
This article discusses using foreachBatch with Structured Streaming to write the output of a streaming query to data sources that do not have an existing streaming sink.
The code pattern streamingDF.writeStream.foreachBatch(...) allows you to apply batch functions to the output data of every micro-batch of the streaming query. Functions used with foreachBatch take two parameters:
- A DataFrame that has the output data of a micro-batch.
- The unique ID of the micro-batch.
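As a minimal sketch of the two-parameter shape described above, the following Python handler logs the batch ID and row count of each micro-batch. The name process_batch is hypothetical, and the commented-out wiring assumes a streaming DataFrame named streamingDF already exists.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame


def process_batch(batch_df: "DataFrame", batch_id: int) -> None:
    """Hypothetical handler: report the size of each micro-batch."""
    # Inside the handler, batch_df behaves like an ordinary batch DataFrame.
    row_count = batch_df.count()
    print(f"micro-batch {batch_id}: {row_count} rows")


# Attach the handler to a streaming query (streamingDF is assumed to exist):
# query = streamingDF.writeStream.foreachBatch(process_batch).start()
```

The return value of the handler is not used, so any result you need must be written to a sink from inside the function.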
You must use foreachBatch for Delta Lake merge operations in Structured Streaming. See Upsert from streaming queries using foreachBatch.
Apply additional DataFrame operations
Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch(), you can apply some of these operations on each micro-batch output. For example, you can use foreachBatch() and the SQL MERGE INTO operation to write the output of streaming aggregations into a Delta table in update mode. See more details in MERGE INTO.
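The MERGE INTO pattern described above might look like the following Python sketch. The target table aggregates, the temporary view updates, and the join column key are hypothetical names chosen for illustration. The query runs through the session attached to the micro-batch DataFrame so that the temporary view is visible to it.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame


def upsert_to_delta(micro_batch_df: "DataFrame", batch_id: int) -> None:
    """Hypothetical handler: merge one micro-batch into a Delta table."""
    # Expose the micro-batch to SQL under a temporary view name.
    micro_batch_df.createOrReplaceTempView("updates")

    # Run the merge with the session that owns the micro-batch DataFrame.
    micro_batch_df.sparkSession.sql(
        """
        MERGE INTO aggregates t
        USING updates s
        ON s.key = t.key
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
    )


# Wiring, assuming streamingAggregatesDF is a streaming aggregation:
# (streamingAggregatesDF.writeStream
#     .foreachBatch(upsert_to_delta)
#     .outputMode("update")
#     .start())
```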
Important
- foreachBatch() provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. In either case, you must reason about the end-to-end semantics yourself.
- foreachBatch() does not work with the continuous processing mode because it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach() instead.
foreachBatch() can be invoked with an empty DataFrame, so user code must handle that case to operate properly. An example is shown here:
import org.apache.spark.sql.DataFrame

streamingDF.writeStream
  .foreachBatch(
    (outputDf: DataFrame, bid: Long) => {
      // Process non-empty micro-batches only
      if (!outputDf.isEmpty) {
        // business logic
      }
    }
  ).start()
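A Python sketch that combines the two points above: it skips empty micro-batches and uses the batch ID to make a Delta table write idempotent, so a retried micro-batch is not written twice. It assumes PySpark 3.3 or above (for DataFrame.isEmpty()) and a Delta Lake sink, and relies on the Delta writer options txnAppId and txnVersion. The application ID example_stream and the table name events_sink are hypothetical.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame


def write_batch_once(batch_df: "DataFrame", batch_id: int) -> None:
    """Hypothetical handler: idempotent append that ignores empty batches."""
    # foreachBatch can be called with an empty DataFrame; do nothing then.
    if batch_df.isEmpty():
        return

    (
        batch_df.write.format("delta")
        # Delta skips a write whose (txnAppId, txnVersion) pair was already
        # committed to the table, which deduplicates retried micro-batches.
        .option("txnAppId", "example_stream")  # hypothetical, unique per query
        .option("txnVersion", batch_id)
        .mode("append")
        .saveAsTable("events_sink")  # hypothetical table name
    )


# query = streamingDF.writeStream.foreachBatch(write_batch_once).start()
```

For sinks other than Delta Lake, the same idea requires your own deduplication logic keyed on the batch ID.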
Behavior changes for foreachBatch in Databricks Runtime 14.0
In Databricks Runtime 14.0 and above on compute configured with shared access mode, foreachBatch runs in a separate isolated Python process on Apache Spark, rather than in the REPL environment. The function is serialized and pushed to Spark and does not have access to global spark objects for the duration of the session.
In all other compute configurations, foreachBatch runs in the same Python REPL that runs the rest of your code. As a result, the function is not serialized.
When you use Databricks Runtime 14.0 and above on compute configured with shared access mode, you must use the sparkSession variable scoped to the local DataFrame when using foreachBatch in Python, as in the following code example:
def example_function(df, batch_id):
    df.sparkSession.sql("<query>")
The following behavior changes apply:
- You cannot access any global Python variables from within your function.
- print() commands write output to the driver logs.
- Any files, modules, or objects referenced in the function must be serializable and available on Spark.
Reuse existing batch data sources
Using foreachBatch(), you can use existing batch data writers for data sinks that might not have Structured Streaming support. Many batch data sources can be used from foreachBatch(). See Connect to data sources.
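As one illustration of reusing a batch writer, the following Python sketch appends each micro-batch to a relational table through the Spark JDBC batch data source. The factory name make_jdbc_batch_writer, the connection URL, the table name, and the contents of the properties dictionary are all hypothetical; the matching JDBC driver is assumed to be available on the cluster.

```python
from typing import TYPE_CHECKING, Callable, Dict

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame


def make_jdbc_batch_writer(
    url: str, table: str, properties: Dict[str, str]
) -> Callable[["DataFrame", int], None]:
    """Hypothetical factory: build a foreachBatch handler for one JDBC table."""

    def write_batch(batch_df: "DataFrame", batch_id: int) -> None:
        # Plain batch write; the streaming engine calls this per micro-batch.
        (
            batch_df.write.format("jdbc")
            .option("url", url)
            .option("dbtable", table)
            .options(**properties)  # for example user, password, driver
            .mode("append")
            .save()
        )

    return write_batch


# Wiring with placeholder values (all hypothetical):
# handler = make_jdbc_batch_writer(
#     "jdbc:postgresql://<host>:5432/<database>", "events", {"user": "<user>"}
# )
# query = streamingDF.writeStream.foreachBatch(handler).start()
```

Because foreachBatch() provides only at-least-once guarantees, a retried micro-batch is appended again with this sketch unless the target table deduplicates rows.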
Write to multiple locations
If you need to write the output of a streaming query to multiple locations, Databricks recommends using multiple Structured Streaming writers for best parallelization and throughput.
Using foreachBatch to write to multiple sinks serializes the execution of streaming writes, which can increase latency for each micro-batch.
If you do use foreachBatch to write to multiple Delta tables, see Idempotent table writes in foreachBatch.
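A minimal Python sketch of writing one micro-batch to two Delta tables from a single handler. The writes run one after the other, which is the serialization cost described above. The table names sink_a and sink_b and the application ID multi_sink_example are hypothetical. The sketch caches the micro-batch so that it is not recomputed for the second write, and uses the Delta writer options txnAppId and txnVersion so that a retry after a partial failure does not duplicate the write that already succeeded.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame


def write_to_two_tables(batch_df: "DataFrame", batch_id: int) -> None:
    """Hypothetical handler: sequential idempotent appends to two tables."""
    # Cache the micro-batch so both writes reuse the same computed data.
    batch_df.persist()
    try:
        for table in ("sink_a", "sink_b"):  # hypothetical table names
            (
                batch_df.write.format("delta")
                .option("txnAppId", "multi_sink_example")  # hypothetical ID
                .option("txnVersion", batch_id)
                .mode("append")
                .saveAsTable(table)
            )
    finally:
        # Release the cache even if one of the writes fails.
        batch_df.unpersist()


# query = streamingDF.writeStream.foreachBatch(write_to_two_tables).start()
```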