Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:✅ Fabric Data Engineering and Data Science
Effiziente Herunterskalierung ist eine Funktion in Microsoft Fabric Spark, die Spark-Shuffle-Daten von der Lebensdauer eines Executors entkoppelt. Anstatt die Shuffle-Ausgabe auf lokalen Executor-Datenträgern zu speichern, leitet Fabric Spark Shuffle-Daten an Azure Blob Storage weiter (oder migriert sie bei Bedarf dorthin) und überlässt die Gestaltung des Schreibvorgangs der Adaptive Query Execution (AQE) selbst. Das Ergebnis ist schnellere Clusterskalierung, niedrigere Computekosten und stabilere Aufträge – ohne Änderungen an Ihren Abfragen, Notizbüchern oder Pipelines.
Overview
Effiziente Skalierung basiert auf vier kooperationsbezogenen Funktionen:
| Fähigkeit | Was es bewirkt |
|---|---|
| Remote Shuffle Manager (RSM) | Schreibt und liest Shuffle-Daten in Azure Blob Storage anstelle von lokalen Datenträgern. |
| Shuffle Migration | Verschiebt Shuffle-Blöcke von einem Executor, bevor dieser außer Betrieb genommen wird, anstatt sie zu verwerfen. |
| Entscheidungsebene | Laufzeitrouting pro Phase, das kleine Shuffles lokal hält und große Shuffles in externen Speicher auslagert. |
| AQE Shuffle Write | Sorgt dafür, dass Adaptive Query Execution bereits an der Shuffle-Schreibphase teilnimmt, sodass die Partitionierung von Anfang an korrekt ist. |
Voraussetzungen
- Die Native Execution Engine (NEE) muss aktiviert sein.
- Autoskalen aktiviert (empfohlen). Effiziente Skalierung funktioniert auch ohne automatische Skalierung über die folgenden Spark-Konfigurationen.
- Runtime 1.3 (Apache Spark 3.5) oder höher.
So funktioniert es
Wenn Spark eine Abfrage verarbeitet, verteilt sie häufig Daten zwischen Phasen – einem Shuffle. Normalerweise werden Shuffle-Daten auf dem lokalen Datenträger jedes Executors gespeichert, wodurch Executoren mit diesen Daten verknüpft werden. Sie können erst freigegeben werden, wenn jeder Verbraucher die Lektüre abgeschlossen hat. Diese Kopplung ist der einzige Hauptgrund, warum Cluster nicht schnell herunterskalieren können und warum der Verlust eines Ausführenden teure Wiederholungsversuche verursacht.
Effizientes Herunterskalieren durchbricht diese Kopplung:
- Große Shuffles werden über den Remote Shuffle Manager direkt in Azure Blob Storage übertragen.
- Kleine Mischvorgänge bleiben aus Geschwindigkeitsgründen auf dem lokalen Datenträger. Wenn ihr Executor später freigegeben werden muss, verschiebt shuffle Migration die Blöcke auf Peers oder auf Fallbackspeicher im Hintergrund.
- Die Entscheidungsebene wählt den richtigen Pfad pro Phase zur Laufzeit aus.
- AQE Shuffle Write stellt sicher, dass der Writer Partitionierung erzeugt, die nachgelagertes AQE verbraucht, ohne neu zusammenzukoppeln und verschwendete E/A zu vermeiden.
┌───────────────────────────┐
Query ───► │ AQE + Decision Layer │ per-stage choice
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ AQE Shuffle Write │ partition-aware writer
└─────┬─────────────────┬───┘
│ │
local ▼ ▼ remote
┌────────────────────┐ ┌──────────────────┐
│ Local disk + │ │ RSM → Azure │
│ Shuffle Migration │ │ Blob Storage │
└─────────┬──────────┘ └─────────┬────────┘
│ on decommission │
▼ ▼
fallback storage Remote shuffle store
Intelligentes Routing (Entscheidungsebene)
Die Entscheidungsebene wertet jeden Shuffle-Austausch aus und entscheidet:
- Umfangreiche Shuffles → Azure Blob Storage. Maximale Skalierungs- und Fehlertoleranzvorteile.
- Kleine Shuffle-Vorgänge → auf den lokalen Datenträger. Kein Cloud-E/A-Aufwand für winzige Übertragungen. Wenn der Executor später stillgelegt wird, übernimmt Shuffle Migration.
Das Routing erfolgt automatisch und erfordert keine Benutzereingabe. Die empfohlene Granularität ist pro Stufe.
Hauptvorteile
Niedrigere Kosten: Bezahlen Sie nur für die von Ihnen verwendete Berechnung
Beim effizienten Herunterskalieren werden Executors freigegeben, sobald ihre Arbeit erledigt ist. Sie befinden sich nicht mehr im Leerlauf, in dem Shuffle-Daten enthalten sind, die nachgelagerte Aufgaben möglicherweise schließlich lesen.
- Schnellere Skalierung. Autoscale entfernt Knoten unmittelbar nach Abschluss des Vorgangs.
- Weniger ungenutzte Rechenleistung. Keine „Zombie“-Executoren wurden nur dafür am Leben gehalten, ihren lokalen Shuffle zu bedienen.
- Keine Datenträgerüberbereitstellung. Große Shuffle-Vorgänge werden in Blobspeicher ausgelagert, anstatt große lokale Datenträger zu erfordern.
- Kosten für gebundenen Speicherplatz. Der Ausweichspeicher wird automatisch bereinigt, wenn Blöcke nicht mehr benötigt werden.
Stabilere Aufträge
Wenn "Shuffle"-Daten nur auf einem lokalen Datenträger gespeichert sind, bedeutet ein Ausführenderabsturz, dass Daten nicht mehr vorhanden sind und Spark sie erneut kompensieren muss. Bei effizienter Skalierung befinden sich Daten entweder bereits im Blob-Speicher oder werden dort migriert, bevor der Executor weggeht.
| Scenario | Ohne effiziente Herunterskalierung | Mit effizienter Herunterskalierung |
|---|---|---|
| Executor stürzt ab | Shuffle-Daten verloren; Stufen erneut ausgeführt | Die Daten sind sicher im Speicher; keine Neukompilierung |
| Knotenverdrängung | Daten verschwunden, teure Wiederholungen | Daten bleiben erhalten; der Job läuft normal weiter. |
| Geordnete Außerbetriebnahme | Shuffle beim Herunterfahren deaktiviert | Zu Peer- oder Fallbackspeicher migrierte Blöcke |
| Netzwerkblips während des Abrufs | Kaskadierend FetchFailedException |
Lesevorgänge werden aus dem Speicher gelesen, bleiben unbeeinflusst |
Dies beseitigt die häufigste Ursache für FetchFailedException in der Produktion.
Schnellere, wirklich elastische Skalierung
Ohne effiziente Herunterskalierung kann der Autoscaler einen Knoten nicht freigeben, solange ein Executor darauf noch Shuffle-Daten oder zwischengespeicherte Daten hält. Effiziente Skalierung entkoppelt beide:
- Shuffle-Daten befinden sich im Blobspeicher (oder werden beim Herunterfahren dorthin migriert).
- Der Cache bindet Executoren nicht mehr. Reproduzierbare Caches wie der Delta snapshot cache sind vom Scale-down-Schutz ausgenommen.
Der Autoscaler kann inaktive Knoten beliebig entfernen und die Clustergröße als Reaktion auf Änderungen der Arbeitslast anpassen.
Bessere Leistung bei schiefen und großen Shuffles
AQE Shuffle Write ermöglicht es Adaptive Query Execution, den Shuffle-Schreibvorgang selbst zu gestalten: Es wählt eine Partitionierung, die nachgelagertes AQE ohne erneute Zusammenführung nutzt, und erzeugt weniger, besser dimensionierte Blöcke für den Remote-Speicher. In Kombination mit der Entscheidungsebene erhalten Sie eine schnellere Wall-Clock-Zeit für große/schiefe Abfragen und eine unveränderte Latenz für kleine Abfragen.
Erste Schritte
Empfohlene Konfiguration
Wenden Sie diese Konfiguration an, um den vollständigen effizienten Scaledown-Stack zu aktivieren:
# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")
# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")
# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")
# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
Es sind keine Codeänderungen erforderlich. Sie können diese auch in Ihren Spark-Eigenschaften in Ihrer Umgebung festlegen.
Konfigurationsreferenz
Remote Shuffle Manager (RSM)
| Setting | Empfohlen | Was sie steuert |
|---|---|---|
spark.remote.shuffle.enabled |
true |
Aktiviert die effiziente Herunterskalierung. Shuffle-Daten werden in Azure Blob Storage statt auf den lokalen Datenträgern des Executors gespeichert. |
Entscheidungsebene
| Setting | Empfohlen | Was sie steuert |
|---|---|---|
spark.sql.rsm.decisionlayer.enabled.level |
stage |
Granularität, bei der die Decision Layer-Routen shuffle.
stage wertet jede Spark-Phase unabhängig aus. |
AQE Shuffle Write
| Setting | Empfohlen | Was sie steuert |
|---|---|---|
spark.sql.adaptive.shuffleWrite.enabled |
true |
Ermöglicht AQE die Teilnahme an der Shuffle-Schreibphase. Erzeugt eine Partitionierung, die von nachgeschaltetem AQE ohne erneute Zusammenführung genutzt wird. |
Note
AQE selbst (spark.sql.adaptive.enabled) muss aktiviert sein. Sie ist standardmäßig in Fabric Spark aktiviert.
Shuffle-Migration bei der Außerbetriebnahme
| Setting | Empfohlen | Was sie steuert |
|---|---|---|
spark.storage.decommission.shuffleBlocks.enabled |
true |
Migriert Shuffle-Blöcke von einem Executor, der außer Betrieb genommen wird, anstatt sie zu verwerfen. |
spark.storage.decommission.shuffleBlocks.cleanup |
true |
Bereinigt Shuffle-Blöcke auf dem Quell-Executor nach einer erfolgreichen Migration. |
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage |
true |
Wenn kein Peer-Executor die Blöcke akzeptieren kann, werden sie in den Fallbackspeicher (Azure Blob Storage) migriert. |
spark.storage.decommission.fallbackStorage.cleanUp |
true |
Entfernt Shuffle-Blöcke aus dem Fallbackspeicher, sobald sie nicht mehr benötigt werden, wodurch die Speicherkosten begrenzt werden. |
Cache-bewusste dynamische Speicherzuweisung
| Setting | Empfohlen | Was sie steuert |
|---|---|---|
spark.dynamicAllocation.preventShutdownExecutorWithCache |
false |
Ermöglicht die dynamische Zuordnung zum Freigeben von Executoren auch dann, wenn sie zwischengespeicherte Blöcke enthalten. |
spark.dynamicAllocation.excludeDeltaSnapshotCache |
true |
Ignoriert den Delta-Snapshot-Cache bei der Entscheidung, ob ein Executor noch einen nützlichen Cache hält. Der Delta-Momentaufnahmecache ist reproduzierbar und sollte keine Skalierung blockieren. |
Erweiterte Optimierung (RSM)
Die meisten Benutzer müssen diese Standardwerte nicht ändern.
Schreibleistung
| Setting | Vorgabe | Was sie steuert |
|---|---|---|
spark.remote.shuffle.partition.buffersize |
16777216 (16 MB) |
Puffer pro Partition vor dem Schreiben in den Speicher. |
spark.remote.shuffle.blocksize |
8388608 (8 MB) |
Größe einzelner Blöcke, die in Blob Storage hochgeladen wurden. |
spark.remote.shuffle.write.maxthreads |
cores × 16 |
Maximale Anzahl von Threads zum Schreiben von Shuffle-Daten. |
spark.remote.shuffle.write.maxtasks |
16384 |
Maximale Anzahl gleichzeitiger Schreibvorgänge. |
Leseleistung
| Setting | Vorgabe | Was sie steuert |
|---|---|---|
spark.remote.shuffle.read.parallel.enabled |
true |
Parallele Download-Streams beim Shuffle-Lesen. |
spark.remote.shuffle.read.parallelism |
4 |
Paralleler Downloaddatenstrom pro Aufgabe. |
spark.remote.shuffle.read.prefetchqueuesize |
250 |
Prefetch-Warteschlangentiefe während der Lesevorgänge. |
spark.remote.shuffle.read.maxthreads |
cores × 4 |
Maximale Anzahl von Threads, die zum Lesen verwendet werden. |
Zuverlässigkeit
| Setting | Vorgabe | Was sie steuert |
|---|---|---|
spark.remote.shuffle.retries |
5 |
Wiederholen Sie Versuche bei vorübergehenden Speicherfehlern. |
spark.remote.shuffle.retrydelayms |
800 |
Anfängliche Wartezeit zwischen Wiederholungsversuchen. |
spark.remote.shuffle.retrymaxdelayms |
60000 |
Rückanfangskappe. |
Komprimierung
| Setting | Vorgabe | Was sie steuert |
|---|---|---|
spark.remote.shuffle.compression |
Benutzungen spark.io.compression.codec |
Komprimierungsformat für Remote-Shuffle-Daten (z. B. lz4, zstd). |
Leistungsergebnisse
Kosteneinsparungen berechnen (TPC-DS Benchmark)
| Metric | Ohne effiziente Herunterskalierung | Mit effizienter Herunterskalierung |
|---|---|---|
| Berechnung insgesamt (VM-Minutes) | 14,952 | 6,880 |
| Kostensenkung | — | 54% |
Die gesamte Auftragslaufzeit kann länger sein (Autoscale verwendet weniger gleichzeitige Executors), aber die abgerechnete Rechenleistung wird um mehr als die Hälfte reduziert.
Decision Layer Leistung (TPC-DS, RSM aktiviert)
Das Umleiten kleiner Shuffles auf eine lokale Festplatte und nur großer Shuffles auf Remote-Speicher ermöglicht gegenüber dem vollständigen Remote-Routing aller Shuffles bei gleichem Scale-down-Vorteil eine um bis zu 57 % kürzere Laufzeit.
Einschränkungen
- NEE erforderlich. Effizientes Herunterskalieren hängt von der Native Execution Engine ab.
- Nur Azure Blob Storage. Standard
BlockBlobStoragemit deaktiviertem HNS. Azure Data Lake Gen2 / Konten mit aktiviertem HNS werden nicht als Remote-Shuffle-Speicher unterstützt. - Wird mit Azure Private Link nicht unterstützt. Umgebungen, die private Linknetzwerke verwenden, sind derzeit nicht kompatibel.
- Die Granularität der Entscheidungsebene erfolgt derzeit pro Stufe. Routing pro Aufgabe oder pro Partition ist nicht vorgesehen.
- Änderung des Cacheverhaltens. Mit
preventShutdownExecutorWithCache=falsekönnen Executors, diecache()/persist()-Daten vorhalten, herunterskaliert werden. Workloads, die stark vom lokalen Executor-Cache für Hot Data abhängen, sollten überprüft werden.