Structured Streaming patterns on Azure Databricks

This contains notebooks and code samples for common patterns for working with Structured Streaming on Azure Databricks.

Getting started with Structured Streaming

These two notebooks show how to use the DataFrame API to build Structured Streaming applications in Python and Scala.

Structured Streaming demo Python notebook

Get notebook

Structured Streaming demo Scala notebook

Get notebook

Write to Cassandra as a sink for Structured Streaming in Python

Apache Cassandra is a distributed, low-latency, scalable, highly-available OLTP database.

Structured Streaming works with Cassandra through the Spark Cassandra Connector. This connector supports both RDD and DataFrame APIs, and it has native support for writing streaming data. Important You must use the corresponding version of the spark-cassandra-connector-assembly.

The following example connects to one or more hosts in a Cassandra database cluster. It also specifies connection configurations such as the checkpoint location and the specific keyspace and table names:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Write to Azure Synapse Analytics using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure Synapse Analytics. See the foreachBatch documentation for details.

To run this example, you need the Azure Synapse Analytics connector. For details on the Azure Synapse Analytics connector, see Query data in Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream joins

These two notebooks show how to use stream-stream joins in Python and Scala.

Stream-Stream joins Python notebook

Get notebook

Stream-Stream joins Scala notebook

Get notebook