Effiziente Herunterskalierung und Remote-Shuffle-Manager

Gilt für:✅ Fabric Data Engineering and Data Science

Effiziente Herunterskalierung ist eine Funktion in Microsoft Fabric Spark, die Spark-Shuffle-Daten von der Lebensdauer eines Executors entkoppelt. Anstatt die Shuffle-Ausgabe auf lokalen Executor-Datenträgern zu speichern, leitet Fabric Spark Shuffle-Daten an Azure Blob Storage weiter (oder migriert sie bei Bedarf dorthin) und überlässt die Gestaltung des Schreibvorgangs der Adaptive Query Execution (AQE) selbst. Das Ergebnis ist schnellere Clusterskalierung, niedrigere Computekosten und stabilere Aufträge – ohne Änderungen an Ihren Abfragen, Notizbüchern oder Pipelines.

Overview

Effiziente Skalierung basiert auf vier kooperationsbezogenen Funktionen:

Fähigkeit Was es bewirkt
Remote Shuffle Manager (RSM) Schreibt und liest Shuffle-Daten in Azure Blob Storage anstelle von lokalen Datenträgern.
Shuffle Migration Verschiebt Shuffle-Blöcke von einem Executor, bevor dieser außer Betrieb genommen wird, anstatt sie zu verwerfen.
Entscheidungsebene Laufzeitrouting pro Phase, das kleine Shuffles lokal hält und große Shuffles in externen Speicher auslagert.
AQE Shuffle Write Sorgt dafür, dass Adaptive Query Execution bereits an der Shuffle-Schreibphase teilnimmt, sodass die Partitionierung von Anfang an korrekt ist.

Voraussetzungen

  • Die Native Execution Engine (NEE) muss aktiviert sein.
  • Autoskalen aktiviert (empfohlen). Effiziente Skalierung funktioniert auch ohne automatische Skalierung über die folgenden Spark-Konfigurationen.
  • Runtime 1.3 (Apache Spark 3.5) oder höher.

So funktioniert es

Wenn Spark eine Abfrage verarbeitet, verteilt sie häufig Daten zwischen Phasen – einem Shuffle. Normalerweise werden Shuffle-Daten auf dem lokalen Datenträger jedes Executors gespeichert, wodurch Executoren mit diesen Daten verknüpft werden. Sie können erst freigegeben werden, wenn jeder Verbraucher die Lektüre abgeschlossen hat. Diese Kopplung ist der einzige Hauptgrund, warum Cluster nicht schnell herunterskalieren können und warum der Verlust eines Ausführenden teure Wiederholungsversuche verursacht.

Effizientes Herunterskalieren durchbricht diese Kopplung:

  • Große Shuffles werden über den Remote Shuffle Manager direkt in Azure Blob Storage übertragen.
  • Kleine Mischvorgänge bleiben aus Geschwindigkeitsgründen auf dem lokalen Datenträger. Wenn ihr Executor später freigegeben werden muss, verschiebt shuffle Migration die Blöcke auf Peers oder auf Fallbackspeicher im Hintergrund.
  • Die Entscheidungsebene wählt den richtigen Pfad pro Phase zur Laufzeit aus.
  • AQE Shuffle Write stellt sicher, dass der Writer Partitionierung erzeugt, die nachgelagertes AQE verbraucht, ohne neu zusammenzukoppeln und verschwendete E/A zu vermeiden.
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

Intelligentes Routing (Entscheidungsebene)

Die Entscheidungsebene wertet jeden Shuffle-Austausch aus und entscheidet:

  • Umfangreiche Shuffles → Azure Blob Storage. Maximale Skalierungs- und Fehlertoleranzvorteile.
  • Kleine Shuffle-Vorgänge → auf den lokalen Datenträger. Kein Cloud-E/A-Aufwand für winzige Übertragungen. Wenn der Executor später stillgelegt wird, übernimmt Shuffle Migration.

Das Routing erfolgt automatisch und erfordert keine Benutzereingabe. Die empfohlene Granularität ist pro Stufe.

Hauptvorteile

Niedrigere Kosten: Bezahlen Sie nur für die von Ihnen verwendete Berechnung

Beim effizienten Herunterskalieren werden Executors freigegeben, sobald ihre Arbeit erledigt ist. Sie befinden sich nicht mehr im Leerlauf, in dem Shuffle-Daten enthalten sind, die nachgelagerte Aufgaben möglicherweise schließlich lesen.

  • Schnellere Skalierung. Autoscale entfernt Knoten unmittelbar nach Abschluss des Vorgangs.
  • Weniger ungenutzte Rechenleistung. Keine „Zombie“-Executoren wurden nur dafür am Leben gehalten, ihren lokalen Shuffle zu bedienen.
  • Keine Datenträgerüberbereitstellung. Große Shuffle-Vorgänge werden in Blobspeicher ausgelagert, anstatt große lokale Datenträger zu erfordern.
  • Kosten für gebundenen Speicherplatz. Der Ausweichspeicher wird automatisch bereinigt, wenn Blöcke nicht mehr benötigt werden.

Stabilere Aufträge

Wenn "Shuffle"-Daten nur auf einem lokalen Datenträger gespeichert sind, bedeutet ein Ausführenderabsturz, dass Daten nicht mehr vorhanden sind und Spark sie erneut kompensieren muss. Bei effizienter Skalierung befinden sich Daten entweder bereits im Blob-Speicher oder werden dort migriert, bevor der Executor weggeht.

Scenario Ohne effiziente Herunterskalierung Mit effizienter Herunterskalierung
Executor stürzt ab Shuffle-Daten verloren; Stufen erneut ausgeführt Die Daten sind sicher im Speicher; keine Neukompilierung
Knotenverdrängung Daten verschwunden, teure Wiederholungen Daten bleiben erhalten; der Job läuft normal weiter.
Geordnete Außerbetriebnahme Shuffle beim Herunterfahren deaktiviert Zu Peer- oder Fallbackspeicher migrierte Blöcke
Netzwerkblips während des Abrufs Kaskadierend FetchFailedException Lesevorgänge werden aus dem Speicher gelesen, bleiben unbeeinflusst

Dies beseitigt die häufigste Ursache für FetchFailedException in der Produktion.

Schnellere, wirklich elastische Skalierung

Ohne effiziente Herunterskalierung kann der Autoscaler einen Knoten nicht freigeben, solange ein Executor darauf noch Shuffle-Daten oder zwischengespeicherte Daten hält. Effiziente Skalierung entkoppelt beide:

  • Shuffle-Daten befinden sich im Blobspeicher (oder werden beim Herunterfahren dorthin migriert).
  • Der Cache bindet Executoren nicht mehr. Reproduzierbare Caches wie der Delta snapshot cache sind vom Scale-down-Schutz ausgenommen.

Der Autoscaler kann inaktive Knoten beliebig entfernen und die Clustergröße als Reaktion auf Änderungen der Arbeitslast anpassen.

Bessere Leistung bei schiefen und großen Shuffles

AQE Shuffle Write ermöglicht es Adaptive Query Execution, den Shuffle-Schreibvorgang selbst zu gestalten: Es wählt eine Partitionierung, die nachgelagertes AQE ohne erneute Zusammenführung nutzt, und erzeugt weniger, besser dimensionierte Blöcke für den Remote-Speicher. In Kombination mit der Entscheidungsebene erhalten Sie eine schnellere Wall-Clock-Zeit für große/schiefe Abfragen und eine unveränderte Latenz für kleine Abfragen.

Erste Schritte

Wenden Sie diese Konfiguration an, um den vollständigen effizienten Scaledown-Stack zu aktivieren:

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

Es sind keine Codeänderungen erforderlich. Sie können diese auch in Ihren Spark-Eigenschaften in Ihrer Umgebung festlegen.

Konfigurationsreferenz

Remote Shuffle Manager (RSM)

Setting Empfohlen Was sie steuert
spark.remote.shuffle.enabled true Aktiviert die effiziente Herunterskalierung. Shuffle-Daten werden in Azure Blob Storage statt auf den lokalen Datenträgern des Executors gespeichert.

Entscheidungsebene

Setting Empfohlen Was sie steuert
spark.sql.rsm.decisionlayer.enabled.level stage Granularität, bei der die Decision Layer-Routen shuffle. stage wertet jede Spark-Phase unabhängig aus.

AQE Shuffle Write

Setting Empfohlen Was sie steuert
spark.sql.adaptive.shuffleWrite.enabled true Ermöglicht AQE die Teilnahme an der Shuffle-Schreibphase. Erzeugt eine Partitionierung, die von nachgeschaltetem AQE ohne erneute Zusammenführung genutzt wird.

Note

AQE selbst (spark.sql.adaptive.enabled) muss aktiviert sein. Sie ist standardmäßig in Fabric Spark aktiviert.

Shuffle-Migration bei der Außerbetriebnahme

Setting Empfohlen Was sie steuert
spark.storage.decommission.shuffleBlocks.enabled true Migriert Shuffle-Blöcke von einem Executor, der außer Betrieb genommen wird, anstatt sie zu verwerfen.
spark.storage.decommission.shuffleBlocks.cleanup true Bereinigt Shuffle-Blöcke auf dem Quell-Executor nach einer erfolgreichen Migration.
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true Wenn kein Peer-Executor die Blöcke akzeptieren kann, werden sie in den Fallbackspeicher (Azure Blob Storage) migriert.
spark.storage.decommission.fallbackStorage.cleanUp true Entfernt Shuffle-Blöcke aus dem Fallbackspeicher, sobald sie nicht mehr benötigt werden, wodurch die Speicherkosten begrenzt werden.

Cache-bewusste dynamische Speicherzuweisung

Setting Empfohlen Was sie steuert
spark.dynamicAllocation.preventShutdownExecutorWithCache false Ermöglicht die dynamische Zuordnung zum Freigeben von Executoren auch dann, wenn sie zwischengespeicherte Blöcke enthalten.
spark.dynamicAllocation.excludeDeltaSnapshotCache true Ignoriert den Delta-Snapshot-Cache bei der Entscheidung, ob ein Executor noch einen nützlichen Cache hält. Der Delta-Momentaufnahmecache ist reproduzierbar und sollte keine Skalierung blockieren.

Erweiterte Optimierung (RSM)

Die meisten Benutzer müssen diese Standardwerte nicht ändern.

Schreibleistung

Setting Vorgabe Was sie steuert
spark.remote.shuffle.partition.buffersize 16777216 (16 MB) Puffer pro Partition vor dem Schreiben in den Speicher.
spark.remote.shuffle.blocksize 8388608 (8 MB) Größe einzelner Blöcke, die in Blob Storage hochgeladen wurden.
spark.remote.shuffle.write.maxthreads cores × 16 Maximale Anzahl von Threads zum Schreiben von Shuffle-Daten.
spark.remote.shuffle.write.maxtasks 16384 Maximale Anzahl gleichzeitiger Schreibvorgänge.

Leseleistung

Setting Vorgabe Was sie steuert
spark.remote.shuffle.read.parallel.enabled true Parallele Download-Streams beim Shuffle-Lesen.
spark.remote.shuffle.read.parallelism 4 Paralleler Downloaddatenstrom pro Aufgabe.
spark.remote.shuffle.read.prefetchqueuesize 250 Prefetch-Warteschlangentiefe während der Lesevorgänge.
spark.remote.shuffle.read.maxthreads cores × 4 Maximale Anzahl von Threads, die zum Lesen verwendet werden.

Zuverlässigkeit

Setting Vorgabe Was sie steuert
spark.remote.shuffle.retries 5 Wiederholen Sie Versuche bei vorübergehenden Speicherfehlern.
spark.remote.shuffle.retrydelayms 800 Anfängliche Wartezeit zwischen Wiederholungsversuchen.
spark.remote.shuffle.retrymaxdelayms 60000 Rückanfangskappe.

Komprimierung

Setting Vorgabe Was sie steuert
spark.remote.shuffle.compression Benutzungen spark.io.compression.codec Komprimierungsformat für Remote-Shuffle-Daten (z. B. lz4, zstd).

Leistungsergebnisse

Diagramm, das die Einsparungen bei den Rechenkosten bei aktiviertem gegenüber deaktiviertem effizientem Herunterskalieren in einem TPC-DS-Benchmark zeigt und eine Kostenreduzierung von 54 Prozent veranschaulicht.

Kosteneinsparungen berechnen (TPC-DS Benchmark)

Metric Ohne effiziente Herunterskalierung Mit effizienter Herunterskalierung
Berechnung insgesamt (VM-Minutes) 14,952 6,880
Kostensenkung 54%

Die gesamte Auftragslaufzeit kann länger sein (Autoscale verwendet weniger gleichzeitige Executors), aber die abgerechnete Rechenleistung wird um mehr als die Hälfte reduziert.

Decision Layer Leistung (TPC-DS, RSM aktiviert)

Das Umleiten kleiner Shuffles auf eine lokale Festplatte und nur großer Shuffles auf Remote-Speicher ermöglicht gegenüber dem vollständigen Remote-Routing aller Shuffles bei gleichem Scale-down-Vorteil eine um bis zu 57 % kürzere Laufzeit.

Einschränkungen

  • NEE erforderlich. Effizientes Herunterskalieren hängt von der Native Execution Engine ab.
  • Nur Azure Blob Storage. Standard BlockBlobStorage mit deaktiviertem HNS. Azure Data Lake Gen2 / Konten mit aktiviertem HNS werden nicht als Remote-Shuffle-Speicher unterstützt.
  • Wird mit Azure Private Link nicht unterstützt. Umgebungen, die private Linknetzwerke verwenden, sind derzeit nicht kompatibel.
  • Die Granularität der Entscheidungsebene erfolgt derzeit pro Stufe. Routing pro Aufgabe oder pro Partition ist nicht vorgesehen.
  • Änderung des Cacheverhaltens. Mit preventShutdownExecutorWithCache=false können Executors, die cache()/persist()-Daten vorhalten, herunterskaliert werden. Workloads, die stark vom lokalen Executor-Cache für Hot Data abhängen, sollten überprüft werden.