This page describes the prerequisites and configuration needed to run real-time mode queries in Structured Streaming. For a step-by-step tutorial, see Tutorial: Run a real-time streaming workload. For conceptual information about real-time mode, see Real-time mode in Structured Streaming.
Prerequisites
To use real-time mode, you must configure your compute to meet the following requirements:
- Use dedicated access mode on classic compute. Standard access mode, Lakeflow Spark Declarative Pipelines, and serverless clusters are not supported.
- Use Databricks Runtime 16.4 LTS and above.
- Turn off autoscaling.
- Turn off Photon.
- Set spark.databricks.streaming.realTimeMode.enabled to true.
- Turn off spot instances to avoid interruptions.
For instructions on creating and configuring classic compute, see Compute configuration reference.
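The real-time mode flag from the list above is set at the compute level. As a sketch, the Spark config section of a classic cluster would include an entry like the following (key-value pair in Spark config syntax):

```
spark.databricks.streaming.realTimeMode.enabled true
```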
Query configuration
To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val query = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Compute sizing
You can run one real-time job per compute resource, provided the compute has enough task slots.
To run in low-latency mode, the total number of available task slots must be greater than or equal to the total number of tasks across all stages of the query.
Slot calculation examples
| Pipeline type | Configuration | Required slots |
|---|---|---|
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 | 8 slots |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 | 28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | maxPartitions = 8, two shuffle stages of 20 each | 48 slots (8 + 20 + 20) |
If you don't set maxPartitions, the number of partitions in the Kafka topic is used instead, so use that partition count in the calculation.
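The sizing rule above can be sketched as a small helper that sums tasks across all query stages. This is a minimal illustration, not a Databricks API; the function name and signature are hypothetical:

```python
def required_slots(source_partitions: int, shuffle_stage_partitions: list[int]) -> int:
    """Total task slots needed to run a real-time query in low-latency mode.

    Hypothetical helper: adds the source-stage task count (maxPartitions,
    or the Kafka topic's partition count if maxPartitions is unset) to the
    task count of each shuffle stage, per the sizing rule above.
    """
    return source_partitions + sum(shuffle_stage_partitions)

# The examples from the table above:
print(required_slots(8, []))        # single-stage stateless -> 8
print(required_slots(8, [20]))      # two-stage stateful -> 28
print(required_slots(8, [20, 20]))  # three-stage -> 48
```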