Real-time mode in Structured Streaming

This page describes how to use real-time mode with Structured Streaming, including what it is and how it works.

For step-by-step setup instructions, see Get started with real-time mode. For code examples, see Real-time mode examples. For supported sources, sinks, operators, and limitations, see Real-time mode reference.

What is real-time mode?

Real-time mode is a trigger type for Structured Streaming that enables ultra-low latency data processing with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection, real-time personalization, and instant decision-making systems.

Operational vs. analytical workloads

Streaming workloads can be broadly divided into operational workloads and analytical workloads:

  • Operational workloads consume real-time data, apply business logic, and trigger downstream actions or decisions.
  • Analytical workloads involve data ingestion and transformation, typically following the medallion architecture (for example, ingesting data into bronze, silver, and gold tables).

Some examples of operational workloads are:

  • Blocking or flagging a credit card transaction in real time if a fraud score exceeds a threshold, based on factors like unusual location, large transaction size, or rapid spending patterns.
  • Delivering a promotional message when clickstream data shows a user has been browsing for jeans for five minutes, offering a 25% discount if they purchase in the next 15 minutes.

In general, operational workloads are characterized by the need for sub-second end-to-end latency. This can be achieved with real-time mode in Apache Spark Structured Streaming.
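
The fraud-detection example above reduces to per-event decision logic. The following is an illustrative sketch only: the field name fraud_score, the thresholds, and the block/flag/allow outcomes are assumptions for demonstration, not part of any Databricks or Spark API. In a real pipeline, this logic would run inside the streaming query's business-logic step.

```python
def decide(transaction: dict, threshold: float = 0.8) -> str:
    """Illustrative per-event decision: block, flag, or allow a transaction
    based on a precomputed fraud score. Upstream scoring would account for
    factors like unusual location, large size, or rapid spending patterns."""
    score = transaction["fraud_score"]
    if score >= threshold:
        return "block"
    if score >= threshold * 0.75:
        return "flag"
    return "allow"

print(decide({"fraud_score": 0.92}))  # block
print(decide({"fraud_score": 0.65}))  # flag
print(decide({"fraud_score": 0.10}))  # allow
```

The value of real-time mode is that this decision fires within milliseconds of the transaction event arriving, rather than at the end of a periodic micro-batch.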

How real-time mode achieves low latency

Real-time mode improves the execution architecture by:

  • Executing long-running batches (the default is five minutes), in which the system processes data as it becomes available in the source.
  • Scheduling all stages of the query simultaneously. This requires the number of available task slots to be equal to or greater than the number of tasks of all the stages in a batch.
  • Passing data between stages as soon as it is produced using a streaming shuffle.

At the end of processing a batch, and before the next batch starts, Structured Streaming checkpoints progress and publishes metrics. The batch duration affects checkpointing frequency:

  • Longer batches: Less frequent checkpointing, which means longer replays on failure and delayed metrics availability.
  • Shorter batches: More frequent checkpointing, which adds per-batch overhead that can increase latency.

Databricks recommends benchmarking real-time mode against your target workload to find the appropriate trigger interval.
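
The tradeoff above is simple arithmetic: checkpoints happen once per batch, so the trigger interval directly sets both the checkpointing frequency and the worst-case replay window after a failure. A rough back-of-the-envelope helper (the function and its outputs are illustrative, not a Databricks API):

```python
def checkpoint_tradeoff(trigger_interval_s: float, run_hours: float = 1.0) -> dict:
    """Rough view of the checkpoint-frequency tradeoff for a trigger interval:
    one checkpoint per batch, and up to one batch of data replayed on failure."""
    checkpoints = run_hours * 3600 / trigger_interval_s
    return {
        "checkpoints_per_hour": checkpoints,
        "worst_case_replay_seconds": trigger_interval_s,
    }

# Default 5-minute batches: 12 checkpoints/hour, up to 5 minutes of replay.
print(checkpoint_tradeoff(300))
# 30-second batches: 120 checkpoints/hour, but overhead is paid more often.
print(checkpoint_tradeoff(30))
```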

When to use real-time mode

Choose real-time mode when your use case requires:

  • Sub-second latency: Applications that need to respond to data within milliseconds, such as fraud detection systems that must block transactions in real time.
  • Operational decision-making: Systems that trigger immediate actions based on incoming data, like real-time offers, alerts, or notifications.
  • Continuous processing: Workloads where data must be processed as soon as it arrives, rather than in periodic batches.

Use micro-batch mode (the default Structured Streaming trigger) when:

  • Analytical processing: ETL pipelines, data transformations, and medallion architecture implementations where latency requirements are measured in seconds or minutes.
  • Cost optimization: Workloads where sub-second latency is not required, as real-time mode requires dedicated compute resources.
  • Checkpoint frequency: Applications that benefit from more frequent checkpointing for faster recovery.

Requirements and configuration

Real-time mode has specific requirements for compute setup and query configuration. This section describes the prerequisites and configuration steps needed to use real-time mode.

Prerequisites

To use real-time mode, you must configure your compute to meet the following requirements:

  • Databricks Runtime 16.4 LTS or above: Real-time mode is only available in DBR 16.4 LTS and later versions.
  • Dedicated compute: You must use a dedicated (formerly single user) compute. Standard (formerly shared), Lakeflow Spark Declarative Pipelines, and serverless clusters are not supported.
  • Turn off autoscaling: Autoscaling is not supported.
  • Turn off Photon: Photon acceleration is not supported.
  • Spark configuration: You must set spark.databricks.streaming.realTimeMode.enabled to true.
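
The required setting uses the standard key-value form of a cluster's Spark config (the setting name comes from this page; the exact place to apply it is covered in the setup guide):

```
spark.databricks.streaming.realTimeMode.enabled true
```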

For step-by-step instructions on creating and configuring compute for real-time mode, see Get started with real-time mode.

Query configuration

To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Compute sizing

You can run one real-time job per compute resource if the compute has enough task slots.

To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.

Slot calculation examples

  • Single-stage stateless (Kafka source + sink): maxPartitions = 8 → 8 slots required.
  • Two-stage stateful (Kafka source + shuffle): maxPartitions = 8, shuffle partitions = 20 → 28 slots required (8 + 20).
  • Three-stage (Kafka source + shuffle + repartition): maxPartitions = 8, two shuffle stages of 20 each → 48 slots required (8 + 20 + 20).

If you don't set maxPartitions, use the number of partitions in the Kafka topic for the calculation.
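
The required-slot arithmetic in the examples above is just the sum of tasks across all stages: source tasks plus the tasks of each shuffle stage. A minimal helper (names are illustrative, not a Databricks API):

```python
def required_slots(source_partitions: int,
                   shuffle_stage_partitions: tuple[int, ...] = ()) -> int:
    """Task slots needed to schedule every stage of a real-time query at once:
    one slot per source task plus one per task in each shuffle stage."""
    return source_partitions + sum(shuffle_stage_partitions)

print(required_slots(8))            # single-stage stateless: 8
print(required_slots(8, (20,)))     # two-stage stateful: 28
print(required_slots(8, (20, 20)))  # three-stage: 48
```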

Performance

For compute tuning guidance, latency optimization techniques, and query monitoring, see Optimize and monitor real-time mode query performance.

Feature support and limitations

For a complete list of supported environments, languages, compute types, sources, sinks, operators, and known limitations, see Real-time mode reference.

Next steps

Now that you understand what real-time mode is and how to configure it, explore these resources to start implementing real-time streaming applications:

  • Get started with real-time mode: Follow step-by-step instructions to configure compute and run your first real-time streaming query.
  • Real-time mode code examples: Explore working examples including Kafka sources and sinks, stateful queries, aggregations, and custom sinks.
  • Optimize and monitor real-time mode query performance: Tune your compute, reduce latency with asynchronous optimization techniques, and measure performance with built-in metrics.
  • Real-time mode reference: Review supported environments, languages, sources, sinks, operators, and known limitations.
  • Structured Streaming concepts: Learn the foundational concepts of Structured Streaming on Databricks.