Effektiv nedskalering og fjern-shuffle manager

Gælder for:✅ Fabric Data Engineering og Data Science

Effektiv skalering er en funktion i Microsoft Fabric Spark, der adskiller Spark-shuffle-data fra eksekutorens levetid. I stedet for at fastgøre shuffle-output til lokale executor-diske, ruter Fabric Spark shuffle-data til Azure Blob Storage (eller migrerer dem dertil efter behov) og lader Adaptive Query Execution (AQE) forme selve skrivningen. Resultatet er hurtigere cluster-nedskalering, lavere beregningsomkostninger og mere robuste jobs – uden ændringer i dine forespørgsler, notebooks eller pipelines.

Oversigt

Effektiv nedskalering er bygget op af fire samarbejdende funktioner:

Kapacitet Hvad den gør
Fjern-Shuffle Manager (RSM) Skriver og læser shuffle-data til Azure Blob Storage i stedet for executor lokale diske.
Shuffle-migrationen Moves shuffle blokerer en eksekutor, før den bliver afviklet, i stedet for at droppe dem.
Beslutningslag Per-stage runtime-routing, der holder små shuffles lokalt og aflaster store shuffles til fjernlagring.
AQE Shuffle Skriv Lader Adaptive Query Execution deltage i shuffle-skrivefasen, så partitioneringen er korrekt første gang.

Forudsætninger

  • Native Execution Engine (NEE) skal være aktiveret.
  • Autoscale aktiveret (anbefalet). Effektiv skalering fungerer også uden autoskalering via Spark-konfigurationerne nedenfor.
  • Runtime 1.3 (Apache Spark 3.5) eller nyere.

Sådan fungerer det

Når Spark behandler en forespørgsel, omfordeler det ofte data mellem trinene—en blanding. Normalt gemmes shuffle-data på hver eksekutors lokale disk, hvilket knytter eksekutører til disse data. De kan ikke udgives, før alle forbrugere har læst færdigt. Den kobling er den største enkeltstående grund til, at klynger ikke kan skaleres hurtigt ned, og hvorfor det at miste en eksekutor forårsager dyre stage-retrys.

Effektiv nedskalering bryder denne kobling:

  • Store blandinger gå direkte til Azure Blob Storage via Remote Shuffle Manager.
  • Små blandinger forbliver på den lokale disk for hastighed. Hvis deres eksekutor senere skal frigives, flytter Shuffle Migration blokkene til peers eller til fallback-lagring i baggrunden.
  • Beslutningslaget vælger den rigtige sti pr. fase ved kørsel.
  • AQE Shuffle Write sikrer, at forfatteren producerer en partitionering, som downstream AQE forbruger uden at re-koaleschere, hvilket undgår spildt I/O.
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

Smart routing (beslutningslag)

Beslutningslaget evaluerer hver blandingsudveksling og beslutter:

  • Store skubbede → Azure Blob Storage. Maksimal reduktion og fordel ved fejltolerance.
  • Små omrokeringer → lokal disk. Ingen cloud I/O-overhead til små overførsler. Hvis eksekutoren senere afvikler, overtager Shuffle Migration.

Routing er automatisk og kræver ingen brugerinput. Den anbefalede granularitet er pr. trin.

Vigtigste fordele

Lavere omkostninger: Betal kun for den compute, du bruger

Med effektiv nedskalering bliver eksekutorerne frigivet, så snart deres arbejde er udført. De står ikke længere inaktive og holder shuffle data, som efterfølgende opgaver måske til sidst læser.

  • Hurtigere nedskalering. Autoscale fjerner noder straks efter opgaveafslutning.
  • Mindre inaktiv beregning. Ingen "zombie"-eksekutorer holdt i live kun for at tjene deres lokale omvæltning.
  • Ingen disk-overprovisionering. Store shuffles går til blob-lagring i stedet for at kræve store lokale diske.
  • Begrænset opbevaringsomkostninger. Fallback-lagring ryddes automatisk op, når blokke ikke længere er nødvendige.

Mere modstandsdygtige job

Når shuffle-data kun findes på den lokale disk, betyder et executor-crash, at dataene er væk, og Spark skal genberegne dem. Med effektiv nedskalering er data enten allerede i blob-lagring eller migreret dertil, før eksekveren forsvinder.

scenarie Uden effektiv nedskalering Med effektiv nedskalering
Eksekutør-nedbrud Shuffle data mistet; Stadier genudført Data er sikre i lagring; ingen omberegning
Node-præemption Data væk, dyre forsøg Data overlever; Arbejdet fortsætter normalt
Yndefuld udmønstring Shuffle droppet ved nedlukning Blokke migreret til peer- eller fallback-lagring
Netværksblips under hentning Kaskadering FetchFailedException Læsninger kommer fra opbevaring, upåvirket

Dette eliminerer den mest almindelige årsag til FetchFailedException produktion.

Hurtigere, virkelig elastisk skalering

Uden effektiv skalering kan autoscaleren ikke genvinde en node, mens enhver eksekver på den stadig har shuffle-data eller cachede data. Effektiv nedskalering adskiller begge:

  • Shuffle-data er i blob-lagring (eller migrerer dertil ved nedlukning).
  • Cache fastholder ikke længere eksekutører. Reproducerbare caches som Delta snapshot cache er udelukket fra skaleringsbeskyttelse.

Autoscaleren kan frit fjerne inaktive noder og ændre størrelsen på klyngen som reaktion på ændringer i arbejdsbelastningen.

Bedre ydeevne på skæve og store blandinger

AQE Shuffle Write lader adaptiv forespørgselsudførelse forme selve shuffle-skrivningen – ved at vælge partitionering, som downstream AQE bruger uden at samle sig igen, og producere færre, bedre blokke til fjernlagring. Kombineret med Decision Layer får du hurtigere væg-ur-tid på store/skæve forespørgsler og uændret latenstid på små.

Get started

Anvend denne konfiguration for at aktivere den fulde effektive nedskaleringsstak:

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

Der kræves ingen kodeændringer. Du kan også sætte disse i dine miljø-Spark-egenskaber.

Konfigurationsreference

Fjern-Shuffle Manager (RSM)

Setting Anbefalet Hvad den kontrollerer
spark.remote.shuffle.enabled true Slår effektiv nedskalering til. Shuffle-data går til Azure Blob Storage i stedet for executor local disks.

Beslutningslag

Setting Anbefalet Hvad den kontrollerer
spark.sql.rsm.decisionlayer.enabled.level stage Granularitet, hvormed beslutningslagets ruter blandes. stage vurderer hvert Spark-trin uafhængigt.

AQE Shuffle Skriv

Setting Anbefalet Hvad den kontrollerer
spark.sql.adaptive.shuffleWrite.enabled true Lad AQE deltage i shuffle write-fasen. Skaber opdeling, som nedstrøms AQE forbruger uden at re-koaleschere.

Bemærkning

AQE selv (spark.sql.adaptive.enabled) skal være tændt. Det er slået til som standard i Fabric Spark.

Shuffle-migration ved afvikling

Setting Anbefalet Hvad den kontrollerer
spark.storage.decommission.shuffleBlocks.enabled true Flytter blokerer en bobestyrer, der er ved at afvikle dem, i stedet for at droppe dem.
spark.storage.decommission.shuffleBlocks.cleanup true Rydder op i shuffle blocks på kilde-executoren efter en vellykket migration.
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true Hvis ingen peer executor kan acceptere blokkene, migrerer den dem til Fallback-lagring (Azure Blob Storage).
spark.storage.decommission.fallbackStorage.cleanUp true Fjerner shuffle-blokke fra fallback-lageret, når de ikke længere er nødvendige, hvilket begrænser lagringsomkostningerne.

Cache-bevidst dynamisk allokering

Setting Anbefalet Hvad den kontrollerer
spark.dynamicAllocation.preventShutdownExecutorWithCache false Tillader dynamisk allokering til at frigive eksekutorer, selv når de indeholder cachede blokke.
spark.dynamicAllocation.excludeDeltaSnapshotCache true Ignorerer Delta snapshot cache, når det skal besluttes, om en eksekver stadig har nyttig cache. Delta snapshot cache er reproducerbar og bør ikke blokere skalering.

Avanceret stemning (RSM)

De fleste brugere behøver ikke ændre disse standardindstillinger.

Skrivepræstation

Setting Standardindstilling Hvad den kontrollerer
spark.remote.shuffle.partition.buffersize 16777216 (16 MB) Buffer pr. partition før skrivning til lager.
spark.remote.shuffle.blocksize 8388608 (8 MB) Størrelsen af individuelle blokke uploadet til Blob Storage.
spark.remote.shuffle.write.maxthreads cores × 16 Maksimale tråde brugt til at skrive shuffle-data.
spark.remote.shuffle.write.maxtasks 16384 Maksimale samtidige skriveoperationer.

Læs optræden

Setting Standardindstilling Hvad den kontrollerer
spark.remote.shuffle.read.parallel.enabled true Parallelle downloadstreams til shuffle-læsninger.
spark.remote.shuffle.read.parallelism 4 Parallelle downloadstrømme pr. opgave.
spark.remote.shuffle.read.prefetchqueuesize 250 Prefetch-kødybde under læsninger.
spark.remote.shuffle.read.maxthreads cores × 4 Maksimalt antal tråde brugt til læsning.

Reliability

Setting Standardindstilling Hvad den kontrollerer
spark.remote.shuffle.retries 5 Prøv forsøg igen på fejl i midlertidig lagring.
spark.remote.shuffle.retrydelayms 800 Første tilbagetrækning mellem forsøg.
spark.remote.shuffle.retrymaxdelayms 60000 Backoff-kasket.

Komprimering

Setting Standardindstilling Hvad den kontrollerer
spark.remote.shuffle.compression Anvendelser spark.io.compression.codec Komprimeringsformat for fjern-blandede data (for eksempel lz4, zstd).

Præstationsresultater

Diagram, der viser beregningsbesparelser med effektiv nedskalering aktiveret versus deaktiveret på en TPC-DS benchmark, hvilket demonstrerer 54 procents omkostningsreduktion.

Beregningsbesparelser (TPC-DS benchmark)

Metriske Uden effektiv nedskalering Med effektiv nedskalering
Total beregning (VM-Minutes) 14,952 6,880
Omkostningsreduktion 54%

Den samlede jobkørselstid kan være længere (autoscale bruger færre samtidige eksekvere), men den fakturerede beregning reduceres med mere end halvdelen.

Beslutningslagsydelse (TPC-DS, RSM on)

At roude små shuffles til lokal disk og kun store shuffles til fjernlagring giver op til 57% runtime-forbedring sammenlignet med at route hver shuffle eksternt, med samme skaleringsfordel.

Begrænsninger

  • NEE påkrævet. Effektiv nedskalering afhænger af den oprindelige eksekveringsmotor.
  • Azure Blob Storage only. Standard BlockBlobStorage med HNS deaktiveret. Azure Data Lake Gen2 / HNS-aktiverede konti understøttes ikke som den eksterne shuffle-butik.
  • Ikke understøttet med Azure Private Link. Miljøer, der bruger privat link-netværk, er ikke kompatible i øjeblikket.
  • Beslutningslagets granularitet er i øjeblikket pr. trin. Routing pr. opgave eller per partition er ikke inden for deres område.
  • Ændring i cacheadfærd. Med , kan eksekutorer, der preventShutdownExecutorWithCache=falseholder data, cache()/persist() skaleres ned. Arbejdsbelastninger, der er stærkt afhængige af executor-local cache for hot data, bør valideres.