Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
anslutningsappen Spark för Fabric Data Warehouse stöder läs- och skrivåtgärder i miljöer där Private Link är aktiverat på klient- eller arbetsytenivå. Skrivoperationer använder batchinsertstrategier i JDBC som skickar data direkt via JDBC-anslutningen utan extern mellanlagring.
Prerequisites
Använd dessa importinstruktioner i början av anteckningsboken eller innan du börjar använda anslutningsappen:
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
Skrivstrategier för satsvis infogning med JDBC
Anslutningsappen exponerar ett writeStrategy alternativ med två JDBC-baserade strategier:
| Strategi | Beskrivning | Duplicerad säkerhet |
|---|---|---|
MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT |
Flertrådade skrivningar med mellanlagringstabeller | Full |
NO_DUPLICATES_BATCH_INSERT |
Skrivningar i en tråd med mellanlagringstabeller | Full |
Båda strategierna använder en metod med tvåfasbekräftelse. Data skrivs först till tillfälliga mellanlagringstabeller och sammanfogas sedan atomiskt till måltabellen. Den här metoden garanterar noll dubbletter även när Spark försöker utföra misslyckade uppgifter igen.
Automatiskt strategival
I miljöer med Private Link väljer anslutningsprogrammet automatiskt MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. Ingen konfiguration krävs.
df.write \
.mode("overwrite") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
Explicit strategival
Du kan åsidosätta strategin manuellt med hjälp writeStrategy av alternativet .
# Multi-threaded (recommended for faster throughput)
df.write \
.mode("overwrite") \
.option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
# Single-threaded (use if you encounter OutOfMemoryErrors)
df.write \
.mode("overwrite") \
.option("writeStrategy", "NO_DUPLICATES_BATCH_INSERT") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
Så här fungerar det
Fas 1: Skrivning till mellanlagringstabeller
Varje Spark-partition skriver sina rader i dedikerade tillfälliga mellanlagringstabeller med hjälp av flerradsinstruktioner INSERT INTO … VALUES . Mellantabeller töms före varje skrivning, så om Spark försöker köra om en uppgift läggs data in på nytt utan dubbletter.
Den flertrådade strategin parallelliserar det här steget genom att varje partition använder flera skrivtrådar, var och en skriver till sin egen mellanlagringstabell.
Fas 2: Atomisk sammanslagning
När alla partitioner har slutförts sammanfogar drivrutinen varje mellanlagringstabell till målet i en enda atomisk transaktion. Den här metoden garanterar att antingen alla data hamnar i måltabellen eller att ingen gör det. Alla mellanlagringstabeller rensas automatiskt.
Välj en strategi
Använd följande vägledning för att välja en strategi:
-
Börja med
MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT. Det är standardinställningen i privata länkmiljöer och ger det bästa dataflödet. -
Växla till
NO_DUPLICATES_BATCH_INSERTom du stöter på executor OutOfMemoryErrors. Den här strategin strömmar rader utan att buffra hela partitioner, på bekostnad av genomströmning för lägre minnesanvändning.
Båda strategierna ger identiska noll duplicerade garantier.
Öka antalet skrivtrådar för snabbare skrivning
Du kan öka dataflödet genom att öka skrivtrådarna per Spark-uppgift.
df.write \
.mode("overwrite") \
.option("writeStrategy", "MULTI_THREAD_NO_DUPLICATES_BATCH_INSERT") \
.option("writerThreadsPerTask", "5") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
I följande tabell beskrivs konfigurationsinformationen för skrivtrådar:
| Inställning | Detaljer |
|---|---|
| Standardtrådar per aktivitet | 3 |
| Effekten av fler trådar | Fler samtidiga JDBC-anslutningar till lagret |
| Övervägande | Högre användning av körminne; för många trådar kan orsaka OutOfMemoryErrors |
Kodexempel
Skriv data med automatisk identifiering
I det följande exemplet skriver du en Spark-dataframe till en datalagertabell. Anslutningen väljer automatiskt rätt skrivstrategi.
from pyspark.sql.functions import concat, lit, col, current_timestamp
df = spark.range(100000).toDF("id") \
.withColumn("name", concat(lit("user_"), col("id"))) \
.withColumn("score", (col("id") % 100).cast("int"))
df.write \
.mode("overwrite") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
Lägga till data i en befintlig tabell
I följande exempel läggs nya rader till i en befintlig lagertabell:
newData = spark.createDataFrame([
(1001, "Alice", 95),
(1002, "Bob", 87),
(1003, "Charlie", 92)
], ["id", "name", "score"])
newData.write \
.mode("append") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
Skriv data i stor skala
Följande exempel visar en storskalig skrivoperation med ompartitionering:
from pyspark.sql.functions import concat, lit, col, current_timestamp
bigDf = spark.range(10000000).repartition(16).toDF("id") \
.withColumn("name", concat(lit("user_"), col("id"))) \
.withColumn("city", lit("Seattle")) \
.withColumn("age", (col("id") % 100).cast("int")) \
.withColumn("salary", (col("id") * 1.5).cast("double")) \
.withColumn("active", col("id") % 2 == 0) \
.withColumn("created_at", current_timestamp())
bigDf.write \
.mode("overwrite") \
.synapsesql("<warehouse name>.<schema name>.<table name>")
Överväganden
För närvarande använder anslutningen JDBC-batchstrategier för infogning:
- Har stöd för alla DataFrame-sparlägen (ErrorIfExists, Ignore, Overwrite, Append).
- Fungerar säkert med återförsök i Spark. Stagingtabeller trunkeras vid återförsök.
- Stöder inte uppdaterings-, borttagnings- eller upsert-åtgärder.