Private Link-support för Spark-anslutningsprogrammet för Microsoft Fabric Data Warehouse

anslutningsappen Spark för Fabric Data Warehouse stöder läs- och skrivåtgärder i miljöer där Private Link är aktiverat på klient- eller arbetsytenivå. Skrivoperationer använder batchinsertstrategier i JDBC som skickar data direkt via JDBC-anslutningen utan extern mellanlagring.

Prerequisites

Använd dessa importinstruktioner i början av anteckningsboken eller innan du börjar använda anslutningsappen:

import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants

Skrivstrategier för satsvis infogning med JDBC

Anslutningsappen exponerar ett writeStrategy alternativ med två JDBC-baserade strategier:

Strategi Beskrivning Duplicerad säkerhet
MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT Flertrådade skrivningar med mellanlagringstabeller Full
NO_DUPLICATES_BATCH_INSERT Skrivningar i en tråd med mellanlagringstabeller Full

Båda strategierna använder en metod med tvåfasbekräftelse. Data skrivs först till tillfälliga mellanlagringstabeller och sammanfogas sedan atomiskt till måltabellen. Den här metoden garanterar noll dubbletter även när Spark försöker utföra misslyckade uppgifter igen.

Automatiskt strategival

I miljöer med Private Link väljer anslutningsprogrammet automatiskt MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. Ingen konfiguration krävs.

df.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Explicit strategival

Du kan åsidosätta strategin manuellt med hjälp writeStrategy av alternativet .

# Multi-threaded (recommended for faster throughput)
df.write \
  .mode("overwrite") \
  .option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

# Single-threaded (use if you encounter OutOfMemoryErrors)
df.write \
  .mode("overwrite") \
  .option("writeStrategy", "NO_DUPLICATES_BATCH_INSERT") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Så här fungerar det

Fas 1: Skrivning till mellanlagringstabeller

Varje Spark-partition skriver sina rader i dedikerade tillfälliga mellanlagringstabeller med hjälp av flerradsinstruktioner INSERT INTO … VALUES . Mellantabeller töms före varje skrivning, så om Spark försöker köra om en uppgift läggs data in på nytt utan dubbletter.

Den flertrådade strategin parallelliserar det här steget genom att varje partition använder flera skrivtrådar, var och en skriver till sin egen mellanlagringstabell.

Fas 2: Atomisk sammanslagning

När alla partitioner har slutförts sammanfogar drivrutinen varje mellanlagringstabell till målet i en enda atomisk transaktion. Den här metoden garanterar att antingen alla data hamnar i måltabellen eller att ingen gör det. Alla mellanlagringstabeller rensas automatiskt.

Välj en strategi

Använd följande vägledning för att välja en strategi:

  • Börja med MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. Det är standardinställningen i privata länkmiljöer och ger det bästa dataflödet.
  • Växla till NO_DUPLICATES_BATCH_INSERT om du stöter på executor OutOfMemoryErrors. Den här strategin strömmar rader utan att buffra hela partitioner, på bekostnad av genomströmning för lägre minnesanvändning.

Båda strategierna ger identiska noll duplicerade garantier.

Öka antalet skrivtrådar för snabbare skrivning

Du kan öka dataflödet genom att öka skrivtrådarna per Spark-uppgift.

df.write \
  .mode("overwrite") \
  .option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
  .option("writerThreadsPerTask", "5") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

I följande tabell beskrivs konfigurationsinformationen för skrivtrådar:

Inställning Detaljer
Standardtrådar per aktivitet 3
Effekten av fler trådar Fler samtidiga JDBC-anslutningar till lagret
Övervägande Högre användning av körminne; för många trådar kan orsaka OutOfMemoryErrors

Kodexempel

Skriv data med automatisk identifiering

I det följande exemplet skriver du en Spark-dataframe till en datalagertabell. Anslutningen väljer automatiskt rätt skrivstrategi.

from pyspark.sql.functions import concat, lit, col, current_timestamp

df = spark.range(100000).toDF("id") \
  .withColumn("name", concat(lit("user_"), col("id"))) \
  .withColumn("score", (col("id") % 100).cast("int"))

df.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Lägga till data i en befintlig tabell

I följande exempel läggs nya rader till i en befintlig lagertabell:

newData = spark.createDataFrame([
  (1001, "Alice", 95),
  (1002, "Bob", 87),
  (1003, "Charlie", 92)
], ["id", "name", "score"])

newData.write \
  .mode("append") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Skriv data i stor skala

Följande exempel visar en storskalig skrivoperation med ompartitionering:

from pyspark.sql.functions import concat, lit, col, current_timestamp

bigDf = spark.range(10000000).repartition(16).toDF("id") \
  .withColumn("name", concat(lit("user_"), col("id"))) \
  .withColumn("city", lit("Seattle")) \
  .withColumn("age", (col("id") % 100).cast("int")) \
  .withColumn("salary", (col("id") * 1.5).cast("double")) \
  .withColumn("active", col("id") % 2 == 0) \
  .withColumn("created_at", current_timestamp())

bigDf.write \
  .mode("overwrite") \
  .synapsesql("<warehouse name>.<schema name>.<table name>")

Överväganden

För närvarande använder anslutningen JDBC-batchstrategier för infogning:

  • Har stöd för alla DataFrame-sparlägen (ErrorIfExists, Ignore, Overwrite, Append).
  • Fungerar säkert med återförsök i Spark. Stagingtabeller trunkeras vid återförsök.
  • Stöder inte uppdaterings-, borttagnings- eller upsert-åtgärder.