
Private Link support for the Spark connector for Microsoft Fabric Data Warehouse

The Spark connector for Fabric Data Warehouse supports read and write operations in environments where Private Link is enabled at either the tenant or the workspace level. Write operations use JDBC batch insert strategies that send data directly through the JDBC connection, without staging it in external storage.

Prerequisites

Add the following import statements at the beginning of your notebook, before you use the connector:

import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants

JDBC batch insert write strategies

The connector exposes a writeStrategy option with two JDBC-based strategies:

Strategy                                  Description                                  Duplicate safety
MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT   Multi-threaded writes with staging tables    Full
NO_DUPLICATES_BATCH_INSERT                Single-threaded writes with staging tables   Full

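Both strategies use a two-phase commit approach: data is first written to temporary staging tables and then merged atomically into the target table. Because staging tables are truncated before each write, and the driver merges them into the target in a single transaction, this approach guarantees zero duplicates even when Spark retries failed tasks.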

Automatic strategy selection

In Private Link-enabled environments, the connector automatically selects MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. No configuration is needed.

df.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Explicit strategy selection

You can override the strategy manually by using the writeStrategy option.

# Multi-threaded (recommended for faster throughput)
df.write \
  .mode("overwrite") \
  .option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

# Single-threaded (use if you encounter OutOfMemoryErrors)
df.write \
  .mode("overwrite") \
  .option("writeStrategy", "NO_DUPLICATES_BATCH_INSERT") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")
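The selection rules above can be summarized as a small decision function. The following is a minimal sketch, not part of the connector: resolve_write_strategy and VALID_STRATEGIES are hypothetical names, and the function models only the two rules this article states (an explicit writeStrategy option wins; otherwise Private Link environments get MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT). It returns None for cases the article doesn't cover.

```python
from typing import Optional

# Hypothetical helper (not part of the connector): models the strategy rules
# described in this article so a notebook can check its options up front.
VALID_STRATEGIES = {
    "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT",
    "NO_DUPLICATES_BATCH_INSERT",
}


def resolve_write_strategy(options: dict, private_link_enabled: bool) -> Optional[str]:
    """Return the strategy a write would use under the rules in this article.

    An explicit writeStrategy option always wins. Otherwise, Private Link
    environments use the multi-threaded strategy. None means no rule in this
    article applies (the non-Private-Link default isn't modeled here).
    """
    explicit = options.get("writeStrategy")
    if explicit is not None:
        if explicit not in VALID_STRATEGIES:
            raise ValueError(f"Unknown writeStrategy: {explicit!r}")
        return explicit
    if private_link_enabled:
        return "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT"
    return None


print(resolve_write_strategy({}, private_link_enabled=True))
# MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT
print(resolve_write_strategy({"writeStrategy": "NO_DUPLICATES_BATCH_INSERT"}, True))
# NO_DUPLICATES_BATCH_INSERT
```

A helper like this can validate a strategy name before it's passed to .option("writeStrategy", ...), so a misspelled name is caught before the write starts.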

How it works

Phase 1: Write to staging tables

Each Spark partition writes its rows into dedicated temporary staging tables using multi-row INSERT INTO … VALUES statements. Staging tables are truncated before each write, so if Spark retries a task, the data is cleanly re-inserted with no duplicates.
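To make the retry behavior concrete, the following minimal sketch mimics Phase 1 with Python's built-in sqlite3 module standing in for the warehouse. The table name staging_p0, the helper write_partition_to_staging, and the batch size are hypothetical, and SQLite uses DELETE instead of TRUNCATE; this article doesn't show the connector's actual SQL. Writing the same partition twice, as a retried task would, leaves exactly one copy of each row.

```python
import sqlite3


def write_partition_to_staging(conn, staging_table, rows, batch_size=2):
    """Empty the staging table, then insert rows with multi-row INSERT ... VALUES."""
    # SQLite has no TRUNCATE statement; DELETE without WHERE empties the table.
    conn.execute(f"DELETE FROM {staging_table}")
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # One "(?, ?)" group per row in this batch, e.g. "(?, ?), (?, ?)".
        placeholders = ", ".join(["(?, ?)"] * len(batch))
        params = [value for row in batch for value in row]
        conn.execute(f"INSERT INTO {staging_table} (id, name) VALUES {placeholders}", params)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_p0 (id INTEGER, name TEXT)")
partition = [(1, "user_1"), (2, "user_2"), (3, "user_3")]

write_partition_to_staging(conn, "staging_p0", partition)
# Simulate a Spark task retry: the same partition is written a second time.
write_partition_to_staging(conn, "staging_p0", partition)

count = conn.execute("SELECT COUNT(*) FROM staging_p0").fetchone()[0]
print(count)  # 3, not 6: the retry replaced the rows instead of duplicating them
```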

The multi-threaded strategy parallelizes this step by having each partition use multiple writer threads, each writing to its own staging table.
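The following sketch illustrates the multi-threaded idea with Python's concurrent.futures module. Plain lists stand in for staging tables, write_with_threads is a hypothetical name, and round-robin assignment of rows to threads is an assumption made for illustration; this article doesn't say how the connector divides rows among threads. Because no thread writes to another thread's staging table, no locking is needed.

```python
from concurrent.futures import ThreadPoolExecutor


def write_with_threads(rows, num_threads):
    # Round-robin split (an assumption): thread i gets rows i, i + n, i + 2n, ...
    chunks = [rows[i::num_threads] for i in range(num_threads)]
    # One list per thread stands in for that thread's own staging table.
    staging_tables = [[] for _ in range(num_threads)]

    def writer(thread_id):
        # Each thread appends only to its own staging table.
        for row in chunks[thread_id]:
            staging_tables[thread_id].append(row)
        return len(staging_tables[thread_id])

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        counts = list(pool.map(writer, range(num_threads)))
    return staging_tables, counts


rows = [(i, f"user_{i}") for i in range(10)]
tables, counts = write_with_threads(rows, num_threads=3)
print(counts)  # [4, 3, 3]
```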

Phase 2: Atomic merge

After all partitions complete, the driver merges every staging table into the target in a single atomic transaction. This approach guarantees that either all data lands in the target table or none does. All staging tables are cleaned up automatically.
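The following minimal sketch mimics Phase 2, again with sqlite3 standing in for the warehouse. The names merge_staging, make_db, target, stg_0, and stg_1 are hypothetical, and the fail_after parameter exists only to simulate an error partway through the merge. Because every INSERT INTO … SELECT runs inside one transaction, a failure rolls back the rows already copied and the target table stays empty. In this sketch, staging tables are dropped whether or not the merge succeeds; this article doesn't say how the connector cleans up after a failed merge.

```python
import sqlite3


def merge_staging(conn, target, staging_tables, fail_after=None):
    """Copy every staging table into target in one transaction, then drop them.

    fail_after is a test hook that raises before merging that staging table.
    """
    try:
        with conn:  # commits if the block succeeds, rolls back if it raises
            for i, staging in enumerate(staging_tables):
                if fail_after is not None and i == fail_after:
                    raise RuntimeError("simulated failure during merge")
                conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        return True
    except RuntimeError:
        return False
    finally:
        # Clean up staging tables whether or not the merge succeeded.
        for staging in staging_tables:
            conn.execute(f"DROP TABLE IF EXISTS {staging}")


def make_db():
    # Target table plus two staging tables holding two rows each.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (id INTEGER)")
    for name, ids in [("stg_0", [1, 2]), ("stg_1", [3, 4])]:
        conn.execute(f"CREATE TABLE {name} (id INTEGER)")
        conn.executemany(f"INSERT INTO {name} VALUES (?)", [(i,) for i in ids])
    conn.commit()
    return conn


ok_conn = make_db()
ok_result = merge_staging(ok_conn, "target", ["stg_0", "stg_1"])
ok_count = ok_conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]

bad_conn = make_db()
bad_result = merge_staging(bad_conn, "target", ["stg_0", "stg_1"], fail_after=1)
bad_count = bad_conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]

print(ok_result, ok_count)    # True 4
print(bad_result, bad_count)  # False 0: stg_0's rows were rolled back
```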

Choose a strategy

Use the following guidance to choose a strategy:

  • Start with MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. It's the default in Private Link environments and provides the best throughput.
  • Switch to NO_DUPLICATES_BATCH_INSERT if you encounter executor OutOfMemoryErrors. This strategy streams rows without buffering entire partitions, trading throughput for lower memory usage.

Both strategies provide identical zero-duplicate guarantees.
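The memory trade-off in this guidance comes down to whether a writer materializes a whole partition before sending it. The following sketch contrasts the two patterns in plain Python; it illustrates the general idea rather than the connector's implementation, and the function names and batch size are hypothetical. The streaming version never holds more than one batch of rows in memory.

```python
from itertools import islice


def buffered_batches(rows, batch_size):
    # Materializes the whole partition first: peak memory grows with partition size.
    all_rows = list(rows)
    return [all_rows[i:i + batch_size] for i in range(0, len(all_rows), batch_size)]


def streamed_batches(rows, batch_size):
    # Pulls at most batch_size rows at a time: peak memory is bounded by one batch.
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# A generator stands in for the row iterator a Spark task receives for its partition.
partition = (i for i in range(7))
for batch in streamed_batches(partition, batch_size=3):
    print(batch)
# [0, 1, 2]
# [3, 4, 5]
# [6]
```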

Increase writer threads for faster writes

To increase throughput, raise the number of writer threads per Spark task with the writerThreadsPerTask option.

df.write \
  .mode("overwrite") \
  .option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
  .option("writerThreadsPerTask", "5") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

The following table describes the configuration details for writer threads:

Setting                     Details
Default threads per task    3
Effect of more threads      More concurrent JDBC connections to the warehouse
Consideration               Higher executor memory usage; too many threads might cause OutOfMemoryErrors
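To judge how far to raise writerThreadsPerTask, it can help to estimate how many connections a write opens at once. The following back-of-the-envelope sketch assumes that each writer thread holds one JDBC connection and that each task uses one executor core (Spark's default spark.task.cpus=1). Neither assumption is stated in this article, and estimate_concurrent_connections is a hypothetical name.

```python
def estimate_concurrent_connections(num_partitions, num_executors, cores_per_executor,
                                    writer_threads_per_task=3):
    """Rough upper bound on simultaneous JDBC connections during a write.

    Assumes one connection per writer thread and one core per task.
    """
    # Only as many tasks run at once as there are task slots (executor cores).
    concurrent_tasks = min(num_partitions, num_executors * cores_per_executor)
    return concurrent_tasks * writer_threads_per_task


# 16 partitions on 2 executors with 4 cores each: 8 tasks run at once.
print(estimate_concurrent_connections(16, 2, 4))     # 24 with the default 3 threads
print(estimate_concurrent_connections(16, 2, 4, 5))  # 40 with writerThreadsPerTask=5
```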

Code examples

Write data with auto-detection

The following example writes a Spark DataFrame to a warehouse table. The connector automatically selects the appropriate write strategy.

from pyspark.sql.functions import concat, lit, col

df = spark.range(100000).toDF("id") \
  .withColumn("name", concat(lit("user_"), col("id"))) \
  .withColumn("score", (col("id") % 100).cast("int"))

df.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Append data to an existing table

The following example appends new rows to an existing warehouse table:

newData = spark.createDataFrame([
  (1001, "Alice", 95),
  (1002, "Bob", 87),
  (1003, "Charlie", 92)
], ["id", "name", "score"])

newData.write \
  .mode("append") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Write data at scale

The following example shows a large-scale write operation with repartitioning:

from pyspark.sql.functions import concat, lit, col, current_timestamp

bigDf = spark.range(10000000).repartition(16).toDF("id") \
  .withColumn("name", concat(lit("user_"), col("id"))) \
  .withColumn("city", lit("Seattle")) \
  .withColumn("age", (col("id") % 100).cast("int")) \
  .withColumn("salary", (col("id") * 1.5).cast("double")) \
  .withColumn("active", col("id") % 2 == 0) \
  .withColumn("created_at", current_timestamp())

bigDf.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Considerations

When it uses the JDBC batch insert strategies, the connector currently:

  • Supports all DataFrame save modes (ErrorIfExists, Ignore, Overwrite, Append).
  • Is safe with Spark retries. Staging tables are truncated on retry.
  • Doesn't support update, delete, or upsert operations.
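The four save modes follow Spark's standard DataFrameWriter semantics. The following sketch models those semantics with a Python dict standing in for the warehouse; the save function and the table name scores are hypothetical, and the connector implements the modes itself against real tables. None of the modes updates or deletes individual rows, which is consistent with the last limitation above.

```python
def save(tables, name, rows, mode="errorifexists"):
    """Apply Spark's save-mode rules to a dict mapping table name -> list of rows."""
    mode = mode.lower()  # Spark accepts mode names case-insensitively
    exists = name in tables
    if mode == "errorifexists":
        # Spark's default mode: refuse to write to an existing table.
        if exists:
            raise ValueError(f"Table {name} already exists")
        tables[name] = list(rows)
    elif mode == "ignore":
        # Silently skip the write if the table already exists.
        if not exists:
            tables[name] = list(rows)
    elif mode == "overwrite":
        # Replace any existing contents.
        tables[name] = list(rows)
    elif mode == "append":
        # Add rows, creating the table if needed.
        tables.setdefault(name, []).extend(rows)
    else:
        raise ValueError(f"Unknown save mode: {mode}")


tables = {}
save(tables, "scores", [(1001, "Alice", 95)], mode="Overwrite")
save(tables, "scores", [(1002, "Bob", 87)], mode="Append")
save(tables, "scores", [(9999, "Ignored", 0)], mode="Ignore")
try:
    save(tables, "scores", [], mode="ErrorIfExists")
    raised = False
except ValueError:
    raised = True
print(len(tables["scores"]), raised)  # 2 True
```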