This page provides reference information for real-time mode in Structured Streaming, including supported environments, languages, sources, sinks, operators, and known limitations.
Supported environments, languages, and modes
Supported languages: Real-time mode supports Scala, Java, and Python.
Supported compute types:
| Compute type | Supported |
|---|---|
| Dedicated (formerly: single user) | ✓ |
| Standard (formerly: shared) | ✓ (only Python) |
| Lakeflow Spark Declarative Pipelines Classic | Not supported |
| Lakeflow Spark Declarative Pipelines Serverless | Not supported |
| Serverless | Not supported |
Supported execution modes:
| Execution mode | Supported |
|---|---|
| Update mode | ✓ |
| Append mode | Not supported |
| Complete mode | Not supported |
Source and sink support
| Source or sink | As source | As sink |
|---|---|---|
| Apache Kafka | ✓ | ✓ |
| Event Hubs (using Kafka connector) | ✓ | ✓ |
| Kinesis | ✓ (only EFO mode) | Not supported |
| AWS MSK | ✓ | Not supported |
| Delta | Not supported | Not supported |
| Google Pub/Sub | Not supported | Not supported |
| Apache Pulsar | Not supported | Not supported |
| Arbitrary sinks (using forEachWriter) | Not applicable | ✓ |
Supported operators
| Operators | Supported |
|---|---|
| Stateless Operations | |
| Selection | ✓ |
| Projection | ✓ |
| UDFs | |
| Scala UDF | ✓ (with some limitations) |
| Python UDF | ✓ (with some limitations) |
| Aggregation | |
| sum | ✓ |
| count | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| Aggregation functions | ✓ |
| Windowing | |
| Tumbling | ✓ |
| Sliding | ✓ |
| Session | Not supported |
| Deduplication | |
| dropDuplicates | ✓ (the state is unbounded) |
| dropDuplicatesWithinWatermark | Not supported |
| Stream - Table Join | |
| Broadcast table (should be small) | ✓ |
| Stream - Stream Join | Not supported |
| (flat)MapGroupsWithState | Not supported |
| transformWithState | ✓ (with some differences) |
| union | ✓ (with some limitations) |
| forEach | ✓ |
| forEachBatch | Not supported |
| mapPartitions | Not supported (see limitation) |
Special considerations
Some operators and features have specific considerations or differences when used in real-time mode.
transformWithState in real-time mode
For building custom stateful applications, Databricks supports transformWithState, an API in Apache Spark Structured Streaming. See Build a custom stateful application for more information about the API and code snippets.
However, the API behaves differently in real-time mode than in traditional streaming queries that use the micro-batch architecture:
- Real-time mode calls the handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) method for each row.
  - The inputRows iterator returns a single value. Micro-batch mode calls it once for each key, and the inputRows iterator returns all values for a key in the micro-batch.
  - Account for this difference when writing your code.
- Event time timers are not supported in real-time mode.
- In real-time mode, timers are delayed in firing depending on data arrival:
  - If a timer is scheduled for 10:00:00 but no data arrives, the timer doesn't fire immediately.
  - If data arrives at 10:00:10, the timer fires with a 10-second delay.
  - If no data arrives and the long-running batch is terminating, the timer fires before the batch terminates.
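The differences above can be illustrated with a minimal pure-Python sketch that runs without Spark. It is not the transformWithState API: `handle_input_rows`, `run_micro_batch`, `run_real_time`, and `timer_fire_time` are hypothetical names, a plain dict stands in for the state store, and the `timerValues` argument is omitted. The handler keeps its running total in state, so it produces the same final state whether it receives one row per call or all rows for a key. The timer helper only encodes the three timer scenarios listed above, assuming the timer is scheduled before the batch ends.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

Row = Tuple[str, int]  # (key, value); hypothetical row shape for this sketch


def handle_input_rows(key: str, input_rows: Iterator[int],
                      state: Dict[str, int]) -> Iterator[Row]:
    """Add every value in the iterator to the key's running total, then emit it.

    The total lives in `state`, not in a local variable, so the result is the
    same whether the iterator holds one row or all rows for the key.
    """
    for value in input_rows:
        state[key] = state.get(key, 0) + value
    yield key, state[key]


def run_micro_batch(rows: List[Row]) -> Tuple[List[Row], Dict[str, int]]:
    """Micro-batch style: one handler call per key with all of that key's values."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for key, value in rows:
        grouped[key].append(value)
    state: Dict[str, int] = {}
    out: List[Row] = []
    for key, values in grouped.items():
        out.extend(handle_input_rows(key, iter(values), state))
    return out, state


def run_real_time(rows: List[Row]) -> Tuple[List[Row], Dict[str, int]]:
    """Real-time style: one handler call per row with a single-value iterator."""
    state: Dict[str, int] = {}
    out: List[Row] = []
    for key, value in rows:
        out.extend(handle_input_rows(key, iter([value]), state))
    return out, state


def timer_fire_time(scheduled: float, arrivals: List[float],
                    batch_end: float) -> float:
    """Model of the timer bullets: fire on the first data arrival at or after
    the scheduled time, otherwise when the long-running batch terminates.
    Assumes scheduled <= batch_end.
    """
    for arrival in sorted(arrivals):
        if scheduled <= arrival <= batch_end:
            return arrival
    return batch_end
```

In this sketch the two runners emit a different number of output rows (one per key versus one per input row) but finish with identical state, which is the property to aim for when a handler must work in both modes.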
Python UDFs in real-time mode
Databricks supports the majority of Python user-defined functions (UDFs) in real-time mode:
| Category | UDF type | Supported |
|---|---|---|
| Stateless | Python scalar UDF (User-defined scalar functions - Python) | ✓ |
| Stateless | Arrow scalar UDF | ✓ |
| Stateless | Pandas scalar UDF (pandas user-defined functions) | ✓ |
| Stateless | Arrow function (mapInArrow) | ✓ |
| Stateless | Pandas function (Map) | ✓ |
| Stateful grouping (UDAF) | transformWithState (Row interface only) | ✓ |
| Stateful grouping (UDAF) | applyInPandasWithState | Not supported |
| Non-stateful grouping (UDAF) | apply | Not supported |
| Non-stateful grouping (UDAF) | applyInArrow | Not supported |
| Non-stateful grouping (UDAF) | applyInPandas | Not supported |
| Table function | UDTF (Python user-defined table functions (UDTFs)) | Not supported |
| Table function | UC UDF | Not supported |
There are several points to consider when using Python UDFs in real-time mode:
- To minimize latency, configure the Arrow batch size (spark.sql.execution.arrow.maxRecordsPerBatch) to 1.
  - Trade-off: This configuration optimizes for latency at the expense of throughput. For most workloads, this setting is recommended.
  - Increase the batch size only if a higher throughput is required to accommodate input volume, accepting the potential increase in latency.
- Pandas UDFs and functions do not perform well with an Arrow batch size of 1.
  - If you use pandas UDFs or functions, set the Arrow batch size to a higher value (for example, 100 or higher).
  - This implies higher latency. Databricks recommends using an Arrow UDF or function if possible.
- Due to the performance issue with pandas, transformWithState is only supported with the Row interface.
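The batch-size guidance above could be applied with a small helper along the following lines. This is a sketch under assumptions: `arrow_batch_size` and `configure_arrow_batch_size` are hypothetical names, the default of 100 for pandas workloads is taken from the example value above and is not a tuned recommendation, and `spark` is assumed to be an existing SparkSession. The sketch also assumes you set the value before starting the streaming query.

```python
ARROW_BATCH_CONF = "spark.sql.execution.arrow.maxRecordsPerBatch"


def arrow_batch_size(uses_pandas: bool, pandas_batch_size: int = 100) -> int:
    """Pick an Arrow batch size: 1 for lowest latency, larger for pandas UDFs."""
    if uses_pandas and pandas_batch_size < 1:
        raise ValueError("pandas_batch_size must be a positive integer")
    return pandas_batch_size if uses_pandas else 1


def configure_arrow_batch_size(spark, uses_pandas: bool,
                               pandas_batch_size: int = 100) -> int:
    """Set the Arrow batch size on the session and return the value used.

    `spark` is assumed to be a SparkSession; only spark.conf.set is called.
    """
    size = arrow_batch_size(uses_pandas, pandas_batch_size)
    spark.conf.set(ARROW_BATCH_CONF, str(size))
    return size
```

For example, `configure_arrow_batch_size(spark, uses_pandas=False)` sets the batch size to 1 for Python scalar or Arrow UDFs, while `configure_arrow_batch_size(spark, uses_pandas=True)` sets it to 100 and accepts the higher latency.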
Limitations
Source limitations
For Kinesis, real-time mode doesn't support polling mode. Moreover, frequent repartitions might negatively impact latency.
Union limitations
The Union operator has some limitations:
- Real-time mode doesn't support self-union:
  - Kafka: You can't take a single source DataFrame object and union DataFrames derived from it. Workaround: Use different DataFrames that read from the same source.
  - Kinesis: You can't union DataFrames derived from the same Kinesis source with the same configuration. Workaround: Besides using different DataFrames, you can assign a different 'consumerName' option to each DataFrame.
- Real-time mode doesn't support stateful operators (for example, aggregate, deduplicate, transformWithState) defined before the Union.
- Real-time mode doesn't support union with batch sources.
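The Kafka self-union workaround above might look like the following sketch, which creates two separate DataFrames that read the same topic instead of deriving both sides of the union from one DataFrame object. The function names, the broker address, the topic, and the filter predicates are all hypothetical, and `spark` is assumed to be an existing SparkSession with the Kafka connector available.

```python
def read_kafka(spark, bootstrap_servers: str, topic: str):
    """Create a new streaming DataFrame for the topic on every call."""
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )


def union_from_separate_reads(spark, bootstrap_servers: str, topic: str):
    """Union two filtered views of one topic without a self-union.

    Each side comes from its own read, so the union does not combine
    DataFrames derived from the same source DataFrame object.
    """
    first = read_kafka(spark, bootstrap_servers, topic)
    second = read_kafka(spark, bootstrap_servers, topic)
    # Hypothetical predicates; replace with the logic your pipeline needs.
    errors = first.filter("CAST(value AS STRING) LIKE '%ERROR%'")
    warnings = second.filter("CAST(value AS STRING) LIKE '%WARN%'")
    return errors.union(warnings)
```

A call such as `union_from_separate_reads(spark, "broker-1:9092", "events")` returns the unioned streaming DataFrame. Reading the topic twice means each side consumes the topic independently, which is the cost of avoiding the self-union.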
MapPartitions limitation
mapPartitions in Scala and similar Python APIs (mapInPandas, mapInArrow) take an iterator over the entire input partition and produce an iterator over the entire output, with an arbitrary mapping between input and output. In real-time mode, these APIs can block the entire output, which increases latency. Their semantics also don't support watermark propagation well.
Instead, use scalar UDFs combined with the approaches described in Transform complex data types, or with filter, to achieve similar functionality.