Structured Streaming writes to Azure Synapse

The Azure Synapse connector offers efficient and scalable Structured Streaming write support for Azure Synapse. It provides a user experience consistent with batch writes and uses COPY for large data transfers between an Azure Databricks cluster and an Azure Synapse instance.

Structured Streaming support between Azure Databricks and Synapse provides simple semantics for configuring incremental ETL jobs. The model used to load data from Azure Databricks to Synapse introduces latency that might not meet SLA requirements for near real-time workloads. See Query data in Azure Synapse Analytics.

Supported output modes for streaming writes to Synapse

The Azure Synapse connector supports the Append and Complete output modes for record appends and aggregations. For more details on output modes and the compatibility matrix, see the Structured Streaming guide.
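
As a rough illustration of the two supported modes, the following Python sketch wraps the writer setup in a helper that accepts only append or complete. The helper name synapse_stream_writer and the SUPPORTED_OUTPUT_MODES set are hypothetical and not part of the connector. The sketch assumes that Update, which is not listed above, should be rejected up front, and it relies only on the standard DataStreamWriter methods format, outputMode, and options.

```python
# Hypothetical helper, not part of the Azure Synapse connector.
# Assumption: only the two modes named above are accepted.
SUPPORTED_OUTPUT_MODES = {"append", "complete"}


def synapse_stream_writer(df, output_mode, options):
    """Return a configured (not yet started) stream writer for `df`.

    `df` is expected to be a streaming DataFrame; `options` is a dict of
    connector options such as url, tempDir, dbTable, and checkpointLocation.
    """
    mode = output_mode.lower()
    if mode not in SUPPORTED_OUTPUT_MODES:
        raise ValueError(
            f"Unsupported output mode {output_mode!r}; "
            f"expected one of {sorted(SUPPORTED_OUTPUT_MODES)}"
        )
    return (
        df.writeStream
        .format("com.databricks.spark.sqldw")
        .outputMode(mode)
        .options(**options)
    )
```

Typically, a plain record stream would use append, while a streaming aggregation would use complete. Call start() on the returned writer to begin the query.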

Synapse fault tolerance semantics

By default, Azure Synapse Streaming offers an end-to-end exactly-once guarantee for writing data into an Azure Synapse table. It reliably tracks the progress of the query using a combination of a checkpoint location in DBFS, a checkpoint table in Azure Synapse, and a locking mechanism, so that streaming can handle any type of failure, retry, or query restart.

Optionally, you can select less restrictive at-least-once semantics for Azure Synapse Streaming by setting the spark.databricks.sqldw.streaming.exactlyOnce.enabled option to false. In that case, data duplication can occur in the event of intermittent connection failures to Azure Synapse or unexpected query termination.
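
To see why the two settings behave differently when a micro-batch is retried, consider the toy model below. It is not the connector's implementation: ToySynapseSink, its committed set (standing in for the checkpoint table), and the integer batch IDs are all assumptions made for illustration, and locking is left out entirely.

```python
class ToySynapseSink:
    """Toy stand-in for a target table plus a checkpoint table.

    Assumption: each micro-batch has an integer ID, and a batch counts as
    committed once its ID is recorded. The real connector is more involved.
    """

    def __init__(self, exactly_once=True):
        self.exactly_once = exactly_once
        self.rows = []          # stands in for the Azure Synapse table
        self.committed = set()  # stands in for the checkpoint table

    def write_batch(self, batch_id, rows):
        """Write one micro-batch; return False if it was skipped as a replay."""
        if self.exactly_once and batch_id in self.committed:
            return False
        self.rows.extend(rows)
        self.committed.add(batch_id)
        return True


def replay_demo(exactly_once):
    """Write batch 0, then retry it as a restarted query might."""
    sink = ToySynapseSink(exactly_once=exactly_once)
    sink.write_batch(0, ["a", "b"])
    sink.write_batch(0, ["a", "b"])  # retry after a simulated failure
    return sink.rows
```

Under these assumptions, replay_demo(True) leaves two rows in the toy table, while replay_demo(False) leaves four because the retried batch is appended again.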

Structured Streaming syntax for writing to Azure Synapse

The following code examples demonstrate streaming writes to Synapse using Structured Streaming in Scala and Python:

Scala

import org.apache.spark.sql.DataFrame

// Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data, then use the
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data, then use the
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

For a full list of configurations, see Query data in Azure Synapse Analytics.
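
Because the Scala and Python examples pass the same options, it can help to assemble them in one place. The sketch below is one possible way to do that in Python. The function synapse_stream_options and its parameter names are hypothetical; the option keys and the abfss URI layout are taken from the examples above.

```python
def synapse_stream_options(jdbc_url, container, storage_account,
                           directory, table, checkpoint_location):
    """Build the option dict used by the streaming write examples.

    Hypothetical helper: only the option keys come from the examples above.
    """
    if not checkpoint_location:
        # checkpointLocation has no default, so fail early if it is missing.
        raise ValueError("checkpoint_location is required for streaming writes")
    temp_dir = (
        f"abfss://{container}@{storage_account}"
        f".dfs.core.windows.net/{directory}"
    )
    return {
        "url": jdbc_url,
        "tempDir": temp_dir,
        "forwardSparkAzureStorageCredentials": "true",
        "dbTable": table,
        "checkpointLocation": checkpoint_location,
    }
```

The resulting dict can be passed to the writer in a single call, for example df.writeStream.format("com.databricks.spark.sqldw").options(**opts).start().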

Synapse streaming checkpoint table management

The Azure Synapse connector does not delete the streaming checkpoint table that is created when a new streaming query is started. This behavior is consistent with the checkpointLocation that you normally specify on object storage. Databricks recommends that you periodically delete checkpoint tables for queries that will not be run in the future.

By default, all checkpoint tables have the name <prefix>_<query-id>, where <prefix> is a configurable prefix with the default value databricks_streaming_checkpoint and <query-id> is a streaming query ID with _ characters removed.

To find all checkpoint tables for stale or deleted streaming queries, run the query:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

You can configure the prefix with the Spark SQL configuration option spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.
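
The lookup above can be turned into a small cleanup routine. The following Python sketch only builds SQL strings. The function names are hypothetical, and the sketch assumes that the tables live in the default schema and that you review the list and run the resulting statements yourself against Azure Synapse with the SQL client you already use. Exclude tables that belong to queries you still plan to run.

```python
DEFAULT_PREFIX = "databricks_streaming_checkpoint"


def checkpoint_lookup_sql(prefix=DEFAULT_PREFIX):
    """Return the sys.tables lookup for checkpoint tables with `prefix`."""
    escaped = prefix.replace("'", "''")  # escape single quotes in the literal
    return f"SELECT * FROM sys.tables WHERE name LIKE '{escaped}%'"


def drop_statements(table_names, prefix=DEFAULT_PREFIX):
    """Return one DROP TABLE statement per name that starts with `prefix`.

    Names without the prefix are ignored as a guard against dropping
    unrelated tables.
    """
    statements = []
    for name in table_names:
        if not name.startswith(prefix):
            continue
        quoted = "[" + name.replace("]", "]]") + "]"  # T-SQL bracket quoting
        statements.append(f"DROP TABLE {quoted}")
    return statements
```

If you changed the prefix through spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix, pass the same value as the prefix argument to both functions.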

Azure Databricks Synapse connector streaming options reference

In addition to the batch options, the OPTIONS provided in Spark SQL support the following options for streaming:

Parameter | Required | Default | Notes
checkpointLocation | Yes | No default | Location on DBFS that Structured Streaming uses to write metadata and checkpoint information. See Recovering from Failures with Checkpointing in the Structured Streaming programming guide.
numStreamingTempDirsToKeep | No | 0 | Number of most recent temporary directories to keep during periodic cleanup of micro-batches in streaming. When set to 0, directory deletion is triggered immediately after a micro-batch is committed. Otherwise, the specified number of most recent micro-batch directories is kept and the rest are removed. Use -1 to disable periodic cleanup.

Note

checkpointLocation and numStreamingTempDirsToKeep are relevant only for streaming writes from Azure Databricks to a new table in Azure Synapse.
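
The three cases for numStreamingTempDirsToKeep can be summarized with a small Python model. This is only a sketch of the rule as described in the options reference, not the connector's cleanup code. The function dirs_to_delete is hypothetical, and it assumes the temporary directories are listed oldest first.

```python
def dirs_to_delete(committed_dirs, num_to_keep=0):
    """Return the temporary directories that cleanup would remove.

    committed_dirs: directories of committed micro-batches, oldest first.
    num_to_keep:    value of numStreamingTempDirsToKeep.
    """
    if num_to_keep == -1:
        return []  # periodic cleanup is disabled
    if num_to_keep < -1:
        raise ValueError("num_to_keep must be -1, 0, or a positive integer")
    # Everything before the cutoff is older than the newest `num_to_keep`.
    cutoff = max(len(committed_dirs) - num_to_keep, 0)
    return list(committed_dirs[:cutoff])
```

For example, with directories for batches 0, 1, and 2 and a value of 2, only the directory for batch 0 would be removed. With the default of 0, all three would be removed.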