Set up real-time mode

This page describes the prerequisites and configuration needed to run real-time mode queries in Structured Streaming. For a step-by-step tutorial, see Tutorial: Run a real-time streaming workload. For conceptual information about real-time mode, see Real-time mode in Structured Streaming.

Prerequisites

To use real-time mode, you must configure your compute to meet the following requirements:

  • Use dedicated access mode on classic compute. Standard access mode, Lakeflow Spark Declarative Pipelines, and serverless clusters are not supported.
  • Use Databricks Runtime 16.4 LTS and above.
  • Turn off autoscaling.
  • Turn off Photon.
  • Set spark.databricks.streaming.realTimeMode.enabled to true.
  • Turn off spot instances to avoid interruptions.

For instructions on creating and configuring classic compute, see Compute configuration reference.
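For example, the real-time mode setting from the prerequisites above can be entered as a key-value pair in the cluster's Spark configuration (under Advanced options) when you create or edit the compute:

```
spark.databricks.streaming.realTimeMode.enabled true
```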

Query configuration

To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the checkpoint interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Compute sizing

You can run one real-time job per compute resource if the compute has enough task slots.

To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.

Slot calculation examples

| Pipeline type | Configuration | Required slots |
| --- | --- | --- |
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 | 8 slots |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 | 28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | maxPartitions = 8, two shuffle stages of 20 each | 48 slots (8 + 20 + 20) |

If you don't set maxPartitions, use the number of partitions in the Kafka topic when calculating required slots.
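The sizing rule above can be sketched as a quick calculation. Here, required_slots is a hypothetical helper for illustration, not part of any Spark or Databricks API:

```python
def required_slots(stage_task_counts):
    # Low-latency mode requires at least as many available task slots as the
    # total number of tasks across all stages of the query.
    return sum(stage_task_counts)

# Examples matching the table above:
print(required_slots([8]))          # single-stage stateless: 8
print(required_slots([8, 20]))      # two-stage stateful: 28
print(required_slots([8, 20, 20]))  # three-stage: 48
```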

Additional resources