Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dotyczy:✅ Inżynieria danych sieci szkieletowej i nauka o danych
Wydajne skalowanie w dół to funkcja w Microsoft Fabric Spark, która oddziela dane shuffle w Spark od cyklu życia egzekutora. Zamiast zapisywać dane wyjściowe operacji shuffle na lokalnych dyskach executorów, usługa Fabric Spark kieruje dane shuffle do usługi Azure Blob Storage (lub w razie potrzeby migruje je tam) i pozwala funkcji Adaptive Query Execution (AQE) kształtować sam zapis. Rezultatem jest szybsze skalowanie klastra, niższe koszty obliczeń i bardziej odporne zadania — bez zmian w zapytaniach, notesach lub potokach.
Overview
Wydajne skalowanie w dół opiera się na czterech współdziałających funkcjach:
| Capability | Do czego służy |
|---|---|
| Remote Shuffle Manager (RSM) | Zapisuje i odczytuje dane przetasowywania w usłudze Azure Blob Storage zamiast na dyskach lokalnych egzekutorów. |
| Migracja losowa | Przenosi bloki danych tasowania z egzekutora przed wycofaniem go z eksploatacji, zamiast je usuwać. |
| Warstwa decyzyjna | Routing środowiska uruchomieniowego dla etapu, który utrzymuje małe przetasowania lokalne i odciąża duże przetasowania do magazynu zdalnego. |
| AQE Shuffle Write | Pozwala adaptacyjnemu wykonywaniu zapytań uczestniczyć w fazie zapisu shuffle, dzięki czemu partycjonowanie jest poprawne już za pierwszym razem. |
Wymagania wstępne
- Należy włączyć natywny aparat wykonywania (NEE).
- Włączone automatyczne skalowanie (zalecane). Wydajne skalowanie w dół działa również bez automatycznego skalowania dzięki poniższym konfiguracjom platformy Spark.
- Środowisko uruchomieniowe 1.3 (Apache Spark 3.5) lub nowsze.
Jak to działa
Gdy Spark przetwarza zapytanie, często redystrybuuje dane między etapami — shuffle. Zwykle dane przetasowania są przechowywane na dysku lokalnym każdego executora, co wiąże executory z tymi danymi. Nie można ich wydać, dopóki każdy konsument nie zakończy czytania. To sprzężenie jest głównym powodem, dla którego klastry nie mogą szybko skalować się w dół i dlaczego utrata egzekutora powoduje kosztowne ponowne uruchamianie etapów.
Efektywne skalowanie w dół przerywa to sprzężenie:
- Duże przetasowania są kierowane bezpośrednio do usługi Azure Blob Storage przez Remote Shuffle Manager.
- Małe operacje tasowania są przechowywane na dysku lokalnym dla zwiększenia wydajności. Jeśli ich funkcja wykonawcza będzie później potrzebna, funkcja Shuffle Migration przenosi bloki do elementów równorzędnych lub rezerwowego magazynu w tle.
- Warstwa decyzyjna wybiera właściwą ścieżkę dla każdego etapu w czasie wykonywania.
- AQE Shuffle Write zapewnia, że mechanizm zapisu tworzy taki układ partycji, który AQE na dalszym etapie wykorzystuje bez ponownego scalania partycji, co pozwala uniknąć zbędnych operacji we/wy.
┌───────────────────────────┐
Query ───► │ AQE + Decision Layer │ per-stage choice
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ AQE Shuffle Write │ partition-aware writer
└─────┬─────────────────┬───┘
│ │
local ▼ ▼ remote
┌────────────────────┐ ┌──────────────────┐
│ Local disk + │ │ RSM → Azure │
│ Shuffle Migration │ │ Blob Storage │
└─────────┬──────────┘ └─────────┬────────┘
│ on decommission │
▼ ▼
fallback storage Remote shuffle store
Inteligentne trasowanie (warstwa decyzyjna)
Warstwa decyzyjna ocenia każdą operację shuffle i decyduje:
- Duże tasowania → Azure Blob Storage. Maksymalne korzyści ze skalowania w dół i odporności na awarie.
- Małe przetasowanie → dysku lokalnego. Brak obciążenia związanego z we/wy w chmurze w przypadku niewielkich transferów. Jeśli egzekutor zostanie później wycofany z eksploatacji, Shuffle Migration przejmie jego zadania.
Routing jest automatyczny i nie wymaga danych wejściowych użytkownika. Zalecany stopień szczegółowości to poziom każdego etapu.
Najważniejsze korzyści
Niższe koszty: płacisz tylko za używane zasoby obliczeniowe
Dzięki wydajnemu skalowaniu w dół egzekutory są zwalniane, gdy tylko zakończą pracę. Nie pozostają już bezczynnie, przetrzymując dane shuffle, które zadania podrzędne mogłyby później odczytać.
- Szybsze zmniejszanie skali. Automatyczne skalowanie usuwa węzły natychmiast po zakończeniu zadania.
- Mniej bezczynnych zasobów obliczeniowych. Żadne „zombie” egzekutory nie są utrzymywane przy życiu wyłącznie po to, aby obsługiwać ich lokalne dane shuffle.
- Brak nadmiernej aprowizacji dysku. Duże operacje przetasowywania danych trafiają do magazynu obiektowego blob, zamiast wymagać dużych dysków lokalnych.
- Koszt magazynu ograniczonego. Pamięć zapasowa jest automatycznie czyszczona, gdy bloki nie są już potrzebne.
Bardziej odporne zadania
Gdy dane shuffle znajdują się wyłącznie na dysku lokalnym, awaria egzekutora oznacza, że te dane zostają utracone i Spark musi je ponownie obliczyć. W przypadku efektywnego zmniejszania skali dane albo już znajdują się w magazynie obiektów blob, albo zostają tam przeniesione, zanim moduł wykonawczy przestanie działać.
| Scenario | Bez wydajnego skalowania w dół | Dzięki wydajnemu skalowaniu w dół |
|---|---|---|
| Funkcja wykonawcza ulega awarii | Utracono dane przetasowania; etapy wykonano ponownie | Dane są bezpieczne w magazynie; brak ponownej kompilacji |
| Preempcja węzła | Utracone dane, kosztowne ponowienia | Dane przetrwają; zadanie jest kontynuowane normalnie |
| Kontrolowane wycofanie z eksploatacji | Shuffle spadła po zamknięciu | Bloki zmigrowane do równorzędnej lub zapasowej pamięci masowej |
| Zakłócenia sieci podczas pobierania | Kaskadowe FetchFailedException |
Odczyty pochodzą z pamięci masowej i pozostają bez wpływu |
Eliminuje to najczęstszą przyczynę FetchFailedException w środowisku produkcyjnym.
Szybsze, naprawdę elastyczne skalowanie
Bez efektywnego zmniejszania skali autoskaler nie może zwolnić węzła, jeśli jakikolwiek executor na nim nadal przechowuje dane shuffle lub dane w pamięci podręcznej. Efektywne skalowanie w dół rozdziela oba elementy:
- Przetasowane dane znajdują się w magazynie obiektów Blob (lub są tam przenoszone podczas zamykania).
- Pamięć podręczna nie przypina już funkcji wykonawczych. Pamięci podręczne możliwe do odtworzenia, takie jak Delta Snapshot Cache, nie są objęte ochroną przed zmniejszaniem skali.
Autoskalator może swobodnie usuwać bezczynne węzły i zmieniać rozmiar klastra w odpowiedzi na zmiany obciążenia.
Lepsza wydajność przy nierównomiernym rozkładzie danych i dużych operacjach shuffle
Funkcja AQE Shuffle Write pozwala mechanizmowi Adaptive Query Execution (AQE) kształtować sam zapis shuffle — wybierać partycjonowanie wykorzystywane przez kolejne etapy AQE bez ponownego scalania partycji oraz tworzyć mniej bloków, ale w lepiej dobranym rozmiarze, na potrzeby zdalnej pamięci masowej. W połączeniu z warstwą decyzyjną zyskujesz krótszy czas wykonania w przypadku dużych zapytań lub zapytań o nierównomiernym rozkładzie oraz niezmienione opóźnienia w przypadku małych.
Wprowadzenie
Zalecana konfiguracja
Zastosuj tę konfigurację, aby włączyć pełny, wydajny stos do skalowania w dół:
# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")
# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")
# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")
# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
Nie są wymagane żadne zmiany kodu. Można je również ustawić we właściwościach Spark środowiska.
Dokumentacja konfiguracji
Remote Shuffle Manager (RSM)
| Setting | Zalecane | Co kontroluje |
|---|---|---|
spark.remote.shuffle.enabled |
true |
Włącza wydajne skalowanie w dół. Dane tasowania są zapisywane w Azure Blob Storage zamiast na lokalnych dyskach egzekutora. |
Warstwa decyzyjna
| Setting | Zalecane | Co kontroluje |
|---|---|---|
spark.sql.rsm.decisionlayer.enabled.level |
stage |
Poziom szczegółowości, na którym warstwa decyzyjna kieruje operacje shuffle.
stage ocenia każdy etap platformy Spark niezależnie. |
AQE Shuffle Write
| Setting | Zalecane | Co kontroluje |
|---|---|---|
spark.sql.adaptive.shuffleWrite.enabled |
true |
Pozwala AQE uczestniczyć w fazie zapisu danych shuffle. Tworzy partycjonowanie, które AQE na dalszym etapie wykorzystuje bez ponownego scalania. |
Uwaga / Notatka
Samo AQE (spark.sql.adaptive.enabled) musi być włączone. Jest on domyślnie włączony w usłudze Fabric Spark.
Migracja shuffle podczas wycofywania z eksploatacji
| Setting | Zalecane | Co kontroluje |
|---|---|---|
spark.storage.decommission.shuffleBlocks.enabled |
true |
Migruje bloki shuffla z executora, który jest wycofywany z eksploatacji, zamiast je usuwać. |
spark.storage.decommission.shuffleBlocks.cleanup |
true |
Czyści bloki shuffle na egzekutorze źródłowym po pomyślnej migracji. |
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage |
true |
Jeśli żaden egzekutor równorzędny nie może przyjąć bloków, są one migrowane do magazynu awaryjnego (Azure Blob Storage). |
spark.storage.decommission.fallbackStorage.cleanUp |
true |
Usuwa bloki shuffle z pamięci zapasowej, gdy nie są już potrzebne, ograniczając koszty pamięci masowej. |
Alokacja dynamiczna uwzględniająca pamięć podręczną
| Setting | Zalecane | Co kontroluje |
|---|---|---|
spark.dynamicAllocation.preventShutdownExecutorWithCache |
false |
Umożliwia dynamicznemu przydzielaniu zwalnianie executorów nawet wtedy, gdy przechowują zbuforowane bloki. |
spark.dynamicAllocation.excludeDeltaSnapshotCache |
true |
Ignoruje pamięć podręczną migawek Delta przy podejmowaniu decyzji, czy egzekutor nadal utrzymuje przydatne dane w pamięci podręcznej. Pamięć podręczna migawek różnicowych jest odtwarzalna i nie powinna blokować skalowania w dół. |
Zaawansowane dostrajanie (RSM)
Większość użytkowników nie musi zmieniać tych wartości domyślnych.
Wydajność zapisu
| Setting | Wartość domyślna | Co kontroluje |
|---|---|---|
spark.remote.shuffle.partition.buffersize |
16777216 (16 MB) |
Buforuj dla każdej partycji przed zapisaniem do pamięci masowej. |
spark.remote.shuffle.blocksize |
8388608 (8 MB) |
Rozmiar poszczególnych bloków przekazanych do Blob Storage. |
spark.remote.shuffle.write.maxthreads |
cores × 16 |
Maksymalna liczba wątków używanych do zapisywania danych shuffle. |
spark.remote.shuffle.write.maxtasks |
16384 |
Maksymalna liczba współbieżnych operacji zapisu. |
Wydajność odczytu
| Setting | Wartość domyślna | Co kontroluje |
|---|---|---|
spark.remote.shuffle.read.parallel.enabled |
true |
Równoległe strumienie pobierania dla odczytów shuffle. |
spark.remote.shuffle.read.parallelism |
4 |
Równoległe strumienie pobierania dla każdego zadania. |
spark.remote.shuffle.read.prefetchqueuesize |
250 |
Głębokość kolejki pobierania wstępnego podczas odczytu. |
spark.remote.shuffle.read.maxthreads |
cores × 4 |
Maksymalna liczba wątków używanych do odczytu. |
Reliability
| Setting | Wartość domyślna | Co kontroluje |
|---|---|---|
spark.remote.shuffle.retries |
5 |
Ponawianie prób w przypadku przejściowych błędów pamięci masowej. |
spark.remote.shuffle.retrydelayms |
800 |
Początkowe opóźnienie między ponownymi próbami. |
spark.remote.shuffle.retrymaxdelayms |
60000 |
Limit wycofywania. |
Compression
| Setting | Wartość domyślna | Co kontroluje |
|---|---|---|
spark.remote.shuffle.compression |
Używa spark.io.compression.codec |
Format kompresji zdalnych danych shuffle (na przykład lz4, zstd). |
Wyniki wydajności
Oszczędności kosztów obliczeniowych (benchmark TPC-DS)
| Metric | Bez wydajnego skalowania w dół | Dzięki wydajnemu skalowaniu w dół |
|---|---|---|
| Total Compute (VM-Minutes) | 14,952 | 6,880 |
| Obniżenie kosztów | — | 54% |
Łączny czas wykonywania zadania może być dłuższy (skalowanie automatyczne wykorzystuje mniej współbieżnych egzekutorów), ale rozliczane zasoby obliczeniowe są zmniejszone o ponad połowę.
Wydajność warstwy decyzyjnej (TPC-DS, RSM włączony)
Kierowanie małych operacji shuffle na dysk lokalny, a tylko dużych operacji shuffle do zdalnej pamięci masowej, zapewnia nawet 57% poprawę czasu wykonywania w porównaniu z kierowaniem wszystkich operacji shuffle do pamięci zdalnej, przy zachowaniu tej samej korzyści wynikającej ze skalowania w dół.
Ograniczenia
- NEE jest wymagane. Efektywne skalowanie w dół zależy od Native Execution Engine.
- Tylko Azure Blob Storage. Standard
BlockBlobStoragez wyłączonym HNS. Konta Azure Data Lake Gen2 oraz konta z włączonym HNS nie są obsługiwane jako zdalny magazyn danych shuffle. - Nieobsługiwane w przypadku Azure Private Link. Środowiska korzystające z sieci łącza prywatnego nie są obecnie zgodne.
- Poziom szczegółowości warstwy decyzyjnej jest obecnie określany na poziomie etapu. Trasowanie na poziomie poszczególnych zadań lub partycji nie wchodzi w zakres.
- Zmiana zachowania pamięci podręcznej. W przypadku
preventShutdownExecutorWithCache=falseegzekutory przechowujące danecache()/persist()mogą być skalowane w dół. Zadania, które w dużym stopniu zależą od lokalnej pamięci podręcznej executora dla często używanych danych, należy zweryfikować.