Optimize stateful Structured Streaming queries

Managing the intermediate state information of stateful Structured Streaming queries can help prevent unexpected latency and production problems.

Prevent slowdowns from garbage collection (GC) pauses in stateful streaming

If your streaming query has stateful operations (such as streaming aggregation) and you want to maintain millions of keys in the state, you might encounter long JVM garbage collection (GC) pauses. These pauses cause high variation in micro-batch processing times. This happens because, by default, state data is kept in JVM memory. A large number of state objects puts pressure on JVM memory, which leads to long GC pauses.

In such cases, you can use a more optimized state management solution based on RocksDB, which is available in Databricks Runtime. Rather than keeping state in JVM memory, this solution uses RocksDB to manage state efficiently in native memory and on the local SSD. Structured Streaming also automatically saves any changes to this state to the checkpoint location you provide, giving the same full fault-tolerance guarantees as default state management. For instructions on configuring RocksDB as the state store, see Configure RocksDB state store on Azure Databricks.
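As a rough illustration of what selecting the RocksDB state store involves, the sketch below builds the session configuration and applies it. The configuration key and provider class name are given as commonly used in Databricks Runtime; treat both as assumptions and confirm them against the linked configuration instructions for your runtime version. The helper names rocksdb_state_store_conf and apply_conf are hypothetical, and spark is assumed to be an existing SparkSession.

```python
# Assumption: key and provider class as used in Databricks Runtime.
# Verify both against the RocksDB configuration instructions before use.
STATE_STORE_PROVIDER_KEY = "spark.sql.streaming.stateStore.providerClass"
ROCKSDB_PROVIDER_CLASS = (
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)


def rocksdb_state_store_conf():
    """Return the session settings that select the RocksDB state store."""
    return {STATE_STORE_PROVIDER_KEY: ROCKSDB_PROVIDER_CLASS}


def apply_conf(spark, conf):
    """Apply each setting to a session that exposes conf.set(key, value).

    Call this before starting the streaming query, because the state
    management scheme cannot be switched for an existing checkpoint.
    """
    for key, value in conf.items():
        spark.conf.set(key, value)
```

With an existing SparkSession, this could be used as apply_conf(spark, rocksdb_state_store_conf()) before the query is started.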

Databricks recommends:

  • Use compute-optimized instances as workers. For example, Azure Standard_F16s instances.
  • Set the number of shuffle partitions to 1-2 times the number of cores in the cluster.
  • Set the spark.sql.streaming.noDataMicroBatches.enabled configuration to false in the SparkSession. This prevents the streaming micro-batch engine from processing micro-batches that contain no data. Note that with this setting, stateful operations that rely on watermarks or processing-time timeouts might not emit output immediately, and instead wait until new data arrives.
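The last two recommendations can be sketched as a small helper that derives the settings from the cluster's total core count. This is an illustration only: the function names and the multiplier parameter are hypothetical, and spark.sql.shuffle.partitions is assumed to be the setting meant by "number of shuffle partitions".

```python
def recommended_shuffle_partitions(total_cores):
    """Return the (low, high) range of 1-2 times the cluster's core count."""
    if total_cores < 1:
        raise ValueError("total_cores must be a positive integer")
    return total_cores, 2 * total_cores


def recommended_streaming_conf(total_cores, multiplier=2):
    """Build the session settings suggested by the recommendations above.

    multiplier picks a point in the 1-2 times range; values outside
    that range are rejected.
    """
    if not 1 <= multiplier <= 2:
        raise ValueError("multiplier must be between 1 and 2")
    low, high = recommended_shuffle_partitions(total_cores)
    # Clamp to the range in case a fractional multiplier rounds down.
    partitions = max(low, min(high, int(total_cores * multiplier)))
    return {
        "spark.sql.shuffle.partitions": str(partitions),
        # Skip micro-batches that contain no data.
        "spark.sql.streaming.noDataMicroBatches.enabled": "false",
    }
```

For example, a cluster with 4 workers of 16 cores each has 64 cores, so recommended_streaming_conf(64) yields 128 shuffle partitions. With an existing SparkSession named spark, each entry could then be applied with spark.conf.set(key, value) before the query starts.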

Regarding performance, RocksDB-based state management can maintain up to 100 times more state keys than the default. For example, in a Spark cluster with Azure Standard_F16s instances as workers, the default state management can maintain up to 1-2 million state keys per executor, after which JVM GC starts to affect performance significantly. In contrast, RocksDB-based state management can easily maintain 100 million state keys per executor without any GC issues.
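To make these figures concrete, the following back-of-the-envelope sketch estimates the number of state keys per executor and compares it with the thresholds quoted above. It assumes keys are spread evenly across executors, which real workloads may not satisfy, and it conservatively uses the lower end (1 million) of the 1-2 million range. The function and constant names are hypothetical, and the figures apply to the instance type in the example, so treat the result as a rule of thumb rather than a guarantee.

```python
# Thresholds taken from the figures quoted in the text above.
DEFAULT_STORE_MAX_KEYS = 1_000_000      # lower end of "1-2 million"
ROCKSDB_STORE_MAX_KEYS = 100_000_000    # "100 million"


def keys_per_executor(total_state_keys, num_executors):
    """Estimate keys per executor, assuming an even spread (rounded up)."""
    if num_executors < 1:
        raise ValueError("num_executors must be a positive integer")
    return -(-total_state_keys // num_executors)  # ceiling division


def suggest_state_store(total_state_keys, num_executors):
    """Return "default" or "rocksdb" based on the estimated key count."""
    per_executor = keys_per_executor(total_state_keys, num_executors)
    if per_executor <= DEFAULT_STORE_MAX_KEYS:
        return "default"
    return "rocksdb"
```

For instance, 50 million keys across 8 executors is roughly 6.25 million keys per executor, which is above the range where the default state management starts to suffer from GC pauses, so the sketch suggests RocksDB.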

Note

The state management scheme cannot be changed between query restarts. That is, if a query was started with the default state management, it cannot be changed without starting the query from scratch with a new checkpoint location.