This article discusses using foreachBatch with Structured Streaming to write the output of a streaming query to data sources that do not have an existing streaming sink.
The code pattern streamingDF.writeStream.foreachBatch(...) allows you to apply batch functions to the output data of every micro-batch of the streaming query. Functions used with foreachBatch take two parameters:
- A DataFrame that has the output data of a micro-batch.
- The unique ID of the micro-batch.
You must use foreachBatch for Delta Lake merge operations in Structured Streaming. See Upsert from streaming queries using foreachBatch.
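The following is a minimal sketch of this pattern in PySpark. The rate source, the target table name example_target, and the checkpoint path are assumptions for illustration only.

# Minimal sketch of the foreachBatch pattern; the source, target table name,
# and checkpoint path are illustrative assumptions.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

def write_micro_batch(batch_df, batch_id):
    # batch_df: the output data of this micro-batch
    # batch_id: the unique ID of this micro-batch
    batch_df.write.mode("append").saveAsTable("example_target")  # assumed table name

query = (
    streaming_df.writeStream
    .foreachBatch(write_micro_batch)
    .option("checkpointLocation", "/tmp/checkpoints/example")  # assumed path
    .start()
)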
Apply additional DataFrame operations
Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch(), you can apply some of these operations on each micro-batch output. For example, you can use foreachBatch() and the SQL MERGE INTO operation to write the output of streaming aggregations into a Delta table in update mode. See more details in MERGE INTO.
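The following is a sketch of that pattern. The aggregation, the Delta table name target_table (assumed to already exist), and the checkpoint path are illustrative assumptions.

# Sketch: upsert a streaming aggregation into a Delta table with MERGE INTO.
# The rate source, target_table, and the join key are illustrative assumptions.
agg_df = (
    spark.readStream.format("rate").option("rowsPerSecond", "10").load()
    .groupBy("value")
    .count()
)

def upsert_to_delta(batch_df, batch_id):
    # Register the micro-batch output so SQL can reference it.
    batch_df.createOrReplaceTempView("updates")
    # Use the SparkSession attached to the micro-batch DataFrame.
    batch_df.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates s
        ON t.value = s.value
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

query = (
    agg_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "/tmp/checkpoints/merge_example")  # assumed path
    .start()
)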
Important
- foreachBatch() provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee (see the sketch after this list). In either case, you will have to reason about the end-to-end semantics yourself.
- foreachBatch() does not work with the continuous processing mode because it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach() instead.
- When using foreachBatch with a stateful operator, it is important to completely consume each batch before processing is complete. See Completely consume each batch DataFrame.
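The following is one sketch of using the batchId for deduplication when the sink is a Delta table, by passing it through Delta's idempotent write options. The application ID and target table name are assumptions for illustration.

# Sketch: use the micro-batch ID with Delta's idempotent write options
# (txnAppId and txnVersion) so a retried batch is not written twice.
app_id = "example-streaming-app"  # assumed: a stable, unique ID for this query

def write_idempotent(batch_df, batch_id):
    (batch_df.write
        .format("delta")
        .mode("append")
        .option("txnAppId", app_id)
        .option("txnVersion", batch_id)  # Delta skips versions it has already committed
        .saveAsTable("example_target"))  # assumed table name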
foreachBatch() can be invoked with an empty DataFrame, and user code needs to be resilient to this case to allow for proper operation. An example is shown here:
.foreachBatch(
  (outputDf: DataFrame, bid: Long) => {
    // Process valid data frames only
    if (!outputDf.isEmpty) {
      // business logic
    }
  }
).start()
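A similar check in Python might look like the following sketch, using DataFrame.isEmpty():

def process_batch(batch_df, batch_id):
    # Process non-empty micro-batches only
    if not batch_df.isEmpty():
        # business logic
        pass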
Behavior changes for foreachBatch in Databricks Runtime 14.0
In Databricks Runtime 14.0 and above on compute configured with standard access mode, the following behavior changes apply:
- print() commands write output to the driver logs.
- You cannot access the dbutils.widgets submodule inside the function.
- Any files, modules, or objects referenced in the function must be serializable and available on Spark.
Reuse existing batch data sources
Using foreachBatch(), you can use existing batch data writers for data sinks that might not have Structured Streaming support. Many batch data sources can be used from foreachBatch(). See Connect to data sources and external services.
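As one sketch, a batch JDBC writer can be reused inside foreachBatch for a sink without a native streaming connector. The connection URL, table name, credentials, and checkpoint path are placeholders.

# Sketch: reuse the batch JDBC writer inside foreachBatch.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

def write_to_jdbc(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://<host>:5432/<database>")  # placeholder
        .option("dbtable", "public.example_sink")                   # placeholder
        .option("user", "<username>")
        .option("password", "<password>")
        .mode("append")
        .save())

query = (
    streaming_df.writeStream
    .foreachBatch(write_to_jdbc)
    .option("checkpointLocation", "/tmp/checkpoints/jdbc_example")  # assumed path
    .start()
)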
Write to multiple locations
If you need to write the output of a streaming query to multiple locations, Databricks recommends using multiple Structured Streaming writers for best parallelization and throughput, as in the sketch below.
Using foreachBatch to write to multiple sinks serializes the execution of streaming writes, which can increase latency for each micro-batch.
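The following sketch shows the recommended pattern of multiple independent streaming writers on the same source, each with its own checkpoint. The table names and checkpoint paths are assumptions for illustration.

# Sketch: one writer per sink, each with its own checkpoint, running in parallel.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

q1 = (
    streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/sink_one")
    .toTable("sink_one")
)

q2 = (
    streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/sink_two")
    .toTable("sink_two")
)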
If you do use foreachBatch to write to multiple Delta tables, see Idempotent table writes in foreachBatch.
Completely consume each batch DataFrame
When you are using stateful operators (for example, using dropDuplicatesWithinWatermark), each batch iteration must consume the entire DataFrame or restart the query. If you do not consume the entire DataFrame, the streaming query will fail with the next batch.
This can happen in several cases. The following examples show how to fix queries that do not correctly consume a DataFrame.
Intentionally using a subset of the batch
If you only care about a subset of the batch, you could have code such as the following.
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()

# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
    batch_df.show(2)

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()
In this case, the batch_df.show(2) only handles the first two items in the batch, which is expected, but if there are more items, they must be consumed. The following code consumes the full DataFrame.
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()

streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
    batch_df.show(2)
    batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()
Here, the do_nothing function silently ignores the rest of the DataFrame.
Handling an error in a batch
An error might occur while running a foreachBatch process. You can have code such as the following (in this case, the sample intentionally raises an error to show the issue).
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()

# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
    # handle the row, but in this case, for the sample, will just raise an error:
    raise Exception('error')

def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()
By handling (and silently swallowing) the error, the rest of the batch might not be consumed. There are two options for handling this situation.
First, you could re-raise the error, which passes it up to your orchestration layer to retry the batch. This can resolve the error if it is a transient issue, or surface it for your operations team to fix manually. To do this, change the partial_func code to look like this:
def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have
        raise e # re-raise the issue
The second option, if you want to catch the exception and ignore the rest of the batch, is to change the code to this.
from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()

# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
    # handle the row, but in this case, for the sample, will just raise an error:
    raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
    try:
        batch_df.foreach(foreach_func)
    except Exception as e:
        print(e) # or whatever error handling you want to have
        batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
    .foreachBatch(partial_func) \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='2 seconds') \
    .start()
This code uses the do_nothing function to silently ignore the rest of the batch.