Production considerations for Structured Streaming
This article contains recommendations for scheduling Structured Streaming workloads using jobs on Azure Databricks.
Databricks recommends always doing the following:
- Remove unnecessary code from notebooks that would return results, such as display and count.
- Do not run Structured Streaming workloads using all-purpose compute. Always schedule streams as jobs using jobs compute.
- Schedule jobs using Continuous mode.
- Do not enable auto-scaling for compute for Structured Streaming jobs.
Some workloads benefit from the following:
- Configure RocksDB state store on Azure Databricks
- Asynchronous state checkpointing for stateful queries
- What is asynchronous progress tracking?
Azure Databricks has introduced Delta Live Tables to reduce the complexities of managing production infrastructure for Structured Streaming workloads. Databricks recommends using Delta Live Tables for new Structured Streaming pipelines. See What is Delta Live Tables?.
Note
Compute auto-scaling has limitations scaling down cluster size for Structured Streaming workloads. Databricks recommends using Delta Live Tables with enhanced autoscaling for streaming workloads. See Optimize the cluster utilization of Delta Live Tables pipelines with enhanced autoscaling.
Design streaming workloads to expect failure
Databricks recommends always configuring streaming jobs to automatically restart on failure. Some functionality, including schema evolution, assumes that Structured Streaming workloads are configured to retry automatically. See Configure Structured Streaming jobs to restart streaming queries on failure.
Some operations, like foreachBatch, provide at-least-once rather than exactly-once guarantees. For these operations, you should make sure that your processing pipeline is idempotent. See Use foreachBatch to write to arbitrary data sinks.
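With at-least-once delivery, the same micro-batch (with the same batch ID) can reach your sink a second time after a restart. One common way to make the write idempotent is to remember the last committed batch ID and skip replays. The following plain-Python sketch models that idea only; IdempotentSink and its in-memory storage are hypothetical stand-ins for a real sink, and a real implementation would need to persist the rows and the batch ID together atomically.

```python
class IdempotentSink:
    """Hypothetical in-memory sink that ignores replayed micro-batches."""

    def __init__(self):
        self.rows = []
        self.last_committed_batch_id = -1

    def write_batch(self, batch_rows, batch_id):
        """Mirror the (batch, batch_id) shape of a foreachBatch function.

        Returns True if the batch was written, False if it was a replay.
        """
        if batch_id <= self.last_committed_batch_id:
            # This batch was already committed in a previous run: skip it.
            return False
        # Assumption: in a real sink these two steps happen in one transaction.
        self.rows.extend(batch_rows)
        self.last_committed_batch_id = batch_id
        return True


sink = IdempotentSink()
sink.write_batch([{"id": 1}, {"id": 2}], batch_id=0)
sink.write_batch([{"id": 1}, {"id": 2}], batch_id=0)  # replay after a restart
sink.write_batch([{"id": 3}], batch_id=1)
print(len(sink.rows))  # 3: the replayed batch was not written twice
```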
Note
When a query restarts, it first processes the micro-batch that was planned during the previous run. If your job failed due to an out-of-memory error or you manually canceled a job due to an oversized micro-batch, you might need to scale up the compute to successfully process the micro-batch.
If you change configurations between runs, these configurations apply to the first new batch planned. See Recover after changes in a Structured Streaming query.
When does a job retry?
You can schedule multiple tasks as part of an Azure Databricks job. When you configure a job using the continuous trigger, you cannot set dependencies between tasks.
You might choose to schedule multiple streams in a single job using one of the following approaches:
- Multiple tasks: Define a job with multiple tasks that run streaming workloads using the continuous trigger.
- Multiple queries: Define multiple streaming queries in the source code for a single task.
You can also combine these strategies. The following table compares these approaches.
| | Multiple tasks | Multiple queries |
|---|---|---|
| How is compute shared? | Databricks recommends deploying jobs compute appropriately sized to each streaming task. You can optionally share compute across tasks. | All queries share the same compute. You can optionally assign queries to scheduler pools. |
| How are retries handled? | All tasks must fail before the job retries. | The task retries if any query fails. |
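The difference in retry handling can be easy to misread, so here is a small plain-Python model of the two rules in the table. It is only an illustration of the stated semantics, not how the jobs service is implemented, and both function names are hypothetical.

```python
def job_retries_with_multiple_tasks(task_failed):
    """Multiple tasks: the job retries only once every task has failed."""
    return bool(task_failed) and all(task_failed)


def task_retries_with_multiple_queries(query_failed):
    """Multiple queries in one task: the task retries if any query fails."""
    return any(query_failed)


# One of three streams has failed; the other two are still running.
failures = [True, False, False]
print(job_retries_with_multiple_tasks(failures))     # False
print(task_retries_with_multiple_queries(failures))  # True
```

In this model, a single failed stream in the multiple-tasks layout stays down until the remaining tasks also fail, while in the multiple-queries layout it triggers a retry of the whole task, which also restarts the healthy queries.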
Configure Structured Streaming jobs to restart streaming queries on failure
Databricks recommends configuring all streaming workloads using the continuous trigger. See Run jobs continuously.
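As a rough illustration, the following sketch builds a job definition that combines the recommendations in this article: a continuous trigger, jobs compute, and a fixed worker count instead of autoscaling. The field names reflect our understanding of the Jobs API 2.1 job settings and should be checked against the API reference. The helper name, job name, notebook path, runtime version, and node type are all placeholder assumptions.

```python
import json


def continuous_stream_job(name, notebook_path, num_workers):
    """Build a job-settings dictionary for one continuously running stream."""
    return {
        "name": name,
        # Continuous trigger: a new run starts when the previous run ends or fails.
        "continuous": {"pause_status": "UNPAUSED"},
        "tasks": [
            {
                "task_key": "stream",
                "notebook_task": {"notebook_path": notebook_path},
                # Jobs compute, created for the run. A fixed num_workers is used
                # and no "autoscale" block is set.
                "new_cluster": {
                    "spark_version": "15.4.x-scala2.12",  # placeholder value
                    "node_type_id": "Standard_DS3_v2",    # placeholder value
                    "num_workers": num_workers,
                },
            }
        ],
    }


payload = continuous_stream_job("orders-stream", "/Workflows/orders_stream", 4)
print(json.dumps(payload, indent=2))
```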
The continuous trigger provides the following behavior by default:
- Prevents more than one concurrent run of the job.
- Starts a new run when a previous run fails.
- Uses exponential backoff for retries.
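To give a feel for what exponential backoff means for restart timing, the sketch below computes a capped sequence of delays. The base delay, growth factor, and cap are illustrative assumptions only; this article does not state the values that the continuous trigger actually uses.

```python
def backoff_delays(base_seconds, factor, cap_seconds, attempts):
    """Return the wait before each retry, growing by `factor` up to a cap."""
    delays = []
    delay = base_seconds
    for _ in range(attempts):
        delays.append(min(delay, cap_seconds))
        delay *= factor
    return delays


# Hypothetical parameters: start at 1 second, double each time, cap at 60 seconds.
print(backoff_delays(1, 2, 60, 8))  # [1, 2, 4, 8, 16, 32, 60, 60]
```

The practical consequence is that a stream which fails repeatedly waits longer between restarts, so a persistent failure (for example, a bad configuration) does not cause a tight restart loop.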
Databricks recommends always using jobs compute instead of all-purpose compute when scheduling workflows. On job failure and retry, new compute resources deploy.
Note
You do not need to use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination(). Jobs automatically prevent a run from completing when a streaming query is active.
Use scheduler pools for multiple streaming queries
You can configure scheduler pools to assign compute capacity to queries when running multiple streaming queries from the same source code.
By default, all queries started in a notebook run in the same fair scheduling pool. Apache Spark jobs generated by triggers from all of the streaming queries in a notebook run one after another in “first in, first out” (FIFO) order. This can cause unnecessary delays in the queries, because they are not efficiently sharing the cluster resources.
Scheduler pools allow you to declare which Structured Streaming queries share compute resources.
The following example assigns query1 to a dedicated pool, while query2 and query3 share a scheduler pool.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Note
The local property configuration must be in the same notebook cell where you start your streaming query.
See Apache fair scheduler documentation for more details.
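If you start many queries, one option is to wrap the pattern from the example above in a small helper so that the pool is always set immediately before the query starts. This is only a sketch: start_in_pool, start_all, and POOL_ASSIGNMENTS are hypothetical names, and spark and df are assumed to be the notebook's SparkSession and a streaming DataFrame. It uses only the calls shown in the example above.

```python
# (query name, scheduler pool, target table), matching the example above.
POOL_ASSIGNMENTS = [
    ("query1", "pool1", "table1"),
    ("query2", "pool2", "table2"),
    ("query3", "pool2", "table3"),
]


def start_in_pool(spark, df, pool, query_name, table):
    """Set the scheduler pool, then start the streaming query right away."""
    # The local property applies to queries started afterwards from this thread,
    # so it is set directly before the query starts.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
    return df.writeStream.queryName(query_name).toTable(table)


def start_all(spark, df, assignments=POOL_ASSIGNMENTS):
    """Start one streaming query per (name, pool, table) assignment."""
    return [
        start_in_pool(spark, df, pool, name, table)
        for name, pool, table in assignments
    ]
```

In a notebook, you would call start_all(spark, df) in a single cell, which keeps the property setting and the query start together as the note above requires.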