Real-time mode reference

This page provides reference information for real-time mode in Structured Streaming, including supported environments, languages, sources, sinks, operators, and known limitations.

Supported environments, languages, and modes

Supported languages: Real-time mode supports Scala, Java, and Python.

Supported compute types:

| Compute type | Supported |
| --- | --- |
| Dedicated (formerly: single user) | ✓ |
| Standard (formerly: shared) | ✓ (Python only) |
| Lakeflow Spark Declarative Pipelines Classic | Not supported |
| Lakeflow Spark Declarative Pipelines Serverless | Not supported |
| Serverless | Not supported |

Supported execution modes:

| Execution mode | Supported |
| --- | --- |
| Update mode | ✓ |
| Append mode | Not supported |
| Complete mode | Not supported |

Source and sink support

| Source or sink | As source | As sink |
| --- | --- | --- |
| Apache Kafka | ✓ | ✓ |
| Event Hubs (using the Kafka connector) | ✓ | ✓ |
| Kinesis | ✓ (EFO mode only) | Not supported |
| AWS MSK | ✓ | Not supported |
| Delta | Not supported | Not supported |
| Google Pub/Sub | Not supported | Not supported |
| Apache Pulsar | Not supported | Not supported |
| Arbitrary sinks (using forEachWriter) | Not applicable | ✓ |

Supported operators

| Operator | Supported |
| --- | --- |
| **Stateless operations** | |
| Selection | ✓ |
| Projection | ✓ |
| **UDFs** | |
| Scala UDF | ✓ (with some limitations) |
| Python UDF | ✓ (with some limitations) |
| **Aggregation** | |
| sum | ✓ |
| count | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| Aggregation functions | ✓ |
| **Windowing** | |
| Tumbling | ✓ |
| Sliding | ✓ |
| Session | Not supported |
| **Deduplication** | |
| dropDuplicates | ✓ (the state is unbounded) |
| dropDuplicatesWithinWatermark | Not supported |
| **Stream-table join** | |
| Broadcast table (should be small) | ✓ |
| Stream-stream join | Not supported |
| (flat)MapGroupsWithState | Not supported |
| transformWithState | ✓ (with some differences) |
| union | ✓ (with some limitations) |
| forEach | ✓ |
| forEachBatch | Not supported |
| mapPartitions | Not supported (see limitation) |

Special considerations

Some operators and features have specific considerations or differences when used in real-time mode.

transformWithState in real-time mode

For building custom stateful applications, Databricks supports transformWithState, an API in Apache Spark Structured Streaming. See Build a custom stateful application for more information about the API and code snippets.

However, the API behaves differently in real-time mode than in traditional streaming queries that use the micro-batch architecture.

  • Real-time mode calls the handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) method once for each row.
    • The inputRows iterator therefore returns a single value. Micro-batch mode calls the method once for each key, and the inputRows iterator returns all values for that key in the micro-batch.
    • Account for this difference when writing your code.
  • Event time timers are not supported in real-time mode.
  • In real-time mode, timer firing is delayed depending on data arrival:
    • If a timer is scheduled for 10:00:00 but no data arrives, the timer doesn't fire immediately.
    • If data arrives at 10:00:10, the timer fires with a 10-second delay.
    • If no data arrives and the long-running batch is terminating, the timer fires before the batch terminates.
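The per-row invocation difference can be handled by folding rows into state incrementally, so the same handler logic works whether the iterator yields one row per call (real-time mode) or all rows for a key (micro-batch mode). A minimal pure-Python sketch; the CountProcessor class and its dict-backed state are illustrative stand-ins, not the actual StatefulProcessor API:

```python
# Sketch: a per-key counter whose result is identical whether
# handleInputRows is called once per row (real-time mode) or once per
# key with all rows for that key (micro-batch mode).

class CountProcessor:
    def __init__(self):
        self.state = {}  # key -> running count (stand-in for real state store)

    def handle_input_rows(self, key, input_rows):
        # Fold rows into state incrementally; never assume the iterator
        # contains every row for the key.
        count = self.state.get(key, 0)
        for _ in input_rows:
            count += 1
        self.state[key] = count

# Real-time mode: one call per row, iterator holds a single value.
rt = CountProcessor()
for row in ["a", "a", "b"]:
    rt.handle_input_rows(row, iter([row]))

# Micro-batch mode: one call per key, iterator holds all values for the key.
mb = CountProcessor()
mb.handle_input_rows("a", iter(["a", "a"]))
mb.handle_input_rows("b", iter(["b"]))

assert rt.state == mb.state == {"a": 2, "b": 1}
```

Because the handler only ever advances state by what the iterator actually yields, it is correct under both invocation patterns.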

Python UDFs in real-time mode

Databricks supports the majority of Python user-defined functions (UDFs) in real-time mode:

| Category | UDF type | Supported |
| --- | --- | --- |
| Stateless | Python scalar UDF (user-defined scalar functions - Python) | ✓ |
| Stateless | Arrow scalar UDF | ✓ |
| Stateless | Pandas scalar UDF (pandas user-defined functions) | ✓ |
| Stateless | Arrow function (mapInArrow) | ✓ |
| Stateless | Pandas function (mapInPandas) | ✓ |
| Stateful grouping (UDAF) | transformWithState (Row interface only) | ✓ |
| Stateful grouping (UDAF) | applyInPandasWithState | Not supported |
| Non-stateful grouping (UDAF) | apply | Not supported |
| Non-stateful grouping (UDAF) | applyInArrow | Not supported |
| Non-stateful grouping (UDAF) | applyInPandas | Not supported |
| Table function | UDTF (Python user-defined table functions) | Not supported |
| Table function | UC UDF | Not supported |

There are several points to consider when using Python UDFs in real-time mode:

  • To minimize latency, set the Arrow batch size (spark.sql.execution.arrow.maxRecordsPerBatch) to 1.
    • Trade-off: This configuration optimizes for latency at the expense of throughput. For most workloads, this setting is recommended.
    • Increase the batch size only if a higher throughput is required to accommodate input volume, accepting the potential increase in latency.
  • Pandas UDFs and functions do not perform well with an Arrow batch size of 1.
    • If you use pandas UDFs or functions, set the Arrow batch size to a higher value (for example, 100 or higher).
    • This implies higher latency. Databricks recommends using an Arrow UDF or function if possible.
  • Due to the performance issue with pandas, transformWithState is only supported with the Row interface.
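The batch-size trade-off above comes down to a single Spark configuration. A minimal sketch, assuming an active session named spark:

```python
# Latency-optimized setting recommended above for real-time mode:
# each Arrow batch carries a single record.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1")

# If you must use pandas UDFs or functions, trade latency for throughput
# with a larger batch size (for example, 100 or higher):
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")
```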

Limitations

Source limitations

For Kinesis, real-time mode doesn't support polling mode. Moreover, frequent repartitions might negatively impact latency.
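As a sketch, a Kinesis reader restricted to enhanced fan-out might look as follows. The consumerMode option name and value are assumptions about the Databricks Kinesis connector; verify them against your connector version, and the stream name is a placeholder:

```python
# Assumed option names (verify in your environment); real-time mode
# supports enhanced fan-out (EFO) only, not polling mode.
df = (spark.readStream
      .format("kinesis")
      .option("streamName", "events")   # placeholder stream name
      .option("consumerMode", "efo")    # assumption: polling is not supported
      .load())
```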

Union limitations

The Union operator has some limitations:

  • Real-time mode doesn't support self-union:
    • Kafka: You can't union DataFrames derived from the same source DataFrame object. Workaround: create separate DataFrames that read from the same source.
    • Kinesis: You can't union DataFrames derived from the same Kinesis source with the same configuration. Workaround: in addition to using separate DataFrames, assign a different 'consumerName' option to each one.
  • Real-time mode doesn't support stateful operators (for example, aggregate, deduplicate, transformWithState) defined before the Union.
  • Real-time mode doesn't support union with batch sources.
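The self-union workarounds above can be sketched as follows. The broker address, topic, stream name, and consumerName values are placeholders, and spark is an active session:

```python
# Kafka: instead of unioning a DataFrame with itself, create two
# independent readers against the same topic.
left = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("subscribe", "events")                     # placeholder topic
        .load())
right = (spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("subscribe", "events")
         .load())
combined = left.union(right)

# Kinesis: additionally give each DataFrame its own consumerName.
k1 = (spark.readStream.format("kinesis")
      .option("streamName", "events")        # placeholder stream name
      .option("consumerName", "consumer-1")  # distinct per DataFrame
      .load())
k2 = (spark.readStream.format("kinesis")
      .option("streamName", "events")
      .option("consumerName", "consumer-2")
      .load())
k_combined = k1.union(k2)
```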

MapPartitions limitation

mapPartitions in Scala and the similar Python APIs (mapInPandas, mapInArrow) take an iterator over the entire input partition and produce an iterator over the entire output, with arbitrary mapping between input and output. These APIs can cause performance issues in real-time mode because they block the entire output, which increases latency. Their semantics also don't support watermark propagation well.

To achieve similar functionality, use scalar UDFs combined with functions that transform complex data types (see Transform complex data types) or filter instead.
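The latency effect can be illustrated outside Spark: a partition-wide transform that buffers its input (as mapPartitions-style code often does) cannot emit anything until the whole partition is consumed, while a per-row scalar function emits each result immediately. A minimal pure-Python sketch; the function names are illustrative:

```python
def buffering_map_partitions(rows):
    # mapPartitions-style: materializes the whole partition before emitting,
    # so the first output is blocked until the input is exhausted.
    buffered = [r * 2 for r in rows]
    yield from buffered

def scalar_udf(row):
    # Scalar-UDF-style: one input row -> one output row, emitted immediately.
    return row * 2

def tracked_source(rows, arrivals):
    # Yields rows while recording which ones have been consumed so far.
    for r in rows:
        arrivals.append(r)
        yield r

rows = [1, 2, 3]

# Partition-wide transform: every row is consumed before the first output.
arrivals = []
out = buffering_map_partitions(tracked_source(rows, arrivals))
first = next(out)
assert first == 2 and arrivals == [1, 2, 3]

# Per-row transform: the first output is available after the first row.
arrivals = []
src = tracked_source(rows, arrivals)
first = scalar_udf(next(src))
assert first == 2 and arrivals == [1]
```

This is why per-row operators keep end-to-end latency low in real-time mode, while partition-buffering APIs do not.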