対象:✅ ファブリックデータエンジニアリングおよびデータサイエンス
効率的なスケールダウンは、Spark シャッフル データを Executor の有効期間から切り離す、Microsoft Fabric Spark の機能です。 ローカルの Executor ディスクにシャッフル出力をピン留めする代わりに、Fabric Spark はデータのシャッフルをAzure Blob Storageにルーティング (または必要に応じて移行) し、アダプティブ クエリ実行 (AQE) が書き込み自体を形成できるようにします。 その結果、クエリ、ノートブック、またはパイプラインに変更を加えず、クラスターのスケールダウンの高速化、コンピューティング コストの削減、回復性の高いジョブが実現します。
Overview
効率的なスケールダウンは、次の 4 つの連携機能から構築されます。
| 能力 | それが何をするか |
|---|---|
| リモート シャッフル マネージャー (RSM) | Executor ローカル ディスクではなく、Azure Blob Storageにシャッフル データを書き込んで読み取ります。 |
| シャッフル移行 | 停止する前に、実行プログラムを削除するのではなく、シャッフル ブロックを Executor から移動します。 |
| デシジョン レイヤー | 小さなシャッフルをローカルに保持し、大きなシャッフルをリモート ストレージにオフロードするステージごとのランタイム ルーティング。 |
| AQE Shuffle Write | アダプティブ クエリ実行をシャッフル書き込みフェーズに参加させることができるので、パーティション分割が初めて適切になります。 |
Prerequisites
- ネイティブ実行エンジン (NEE) を有効にする必要があります。
- 自動スケールが有効 (推奨)。 効率的なスケールダウンは、以下の Spark 構成を使用した自動スケールなしでも機能します。
- ランタイム 1.3 (Apache Spark 3.5) 以降。
どのように機能するのか
Spark はクエリを処理するときに、多くの場合、ステージ間 ( シャッフル) 間でデータを再配布します。 通常、シャッフル データは各 Executor のローカル ディスクに格納され、Executor はそのデータに結び付けられます。 すべてのコンシューマーが読み取りを完了するまで、リリースすることはできません。 この結合は、クラスターが迅速にスケールダウンできない最大の理由の 1 つであり、Executor を失うと高コストのステージ再試行が発生する理由です。
効率的なスケールダウンにより、この結び付きは解消されます。
- Large shufflesリモート シャッフル マネージャーを介して直接Azure Blob Storageに移動します。
- 小さなシャッフルは、高速化のためローカルディスク上に保持されます。 そのエグゼキューターを後で解放する必要が生じた場合、Shuffle Migration はバックグラウンドでブロックをピアまたはフォールバックストレージに移動します。
- デシジョン レイヤーは、実行時にステージごとに適切なパスを選択します。
- AQE シャッフル書き込み により、ダウンストリーム AQE が再結合せずに消費するパーティション分割がライターによって生成され、無駄な I/O が回避されます。
┌───────────────────────────┐
Query ───► │ AQE + Decision Layer │ per-stage choice
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ AQE Shuffle Write │ partition-aware writer
└─────┬─────────────────┬───┘
│ │
local ▼ ▼ remote
┌────────────────────┐ ┌──────────────────┐
│ Local disk + │ │ RSM → Azure │
│ Shuffle Migration │ │ Blob Storage │
└─────────┬──────────┘ └─────────┬────────┘
│ on decommission │
▼ ▼
fallback storage Remote shuffle store
スマート ルーティング (デシジョン レイヤー)
デシジョン レイヤーは、各シャッフル交換を評価し、次の決定を行います。
- 大規模なシャッフル → Azure Blob Storage 最大スケールダウンとフォールト トレランスの利点。
- 小規模なシャッフル → ローカル ディスク 小さな転送に対するクラウド I/O オーバーヘッドはありません。 Executor が後で使用を停止した場合、Shuffle Migration が引き継ぎます。
ルーティングは自動であり、ユーザー入力は必要ありません。 推奨される粒度はステージ単位です。
主な利点
コストの削減: 使用するコンピューティングに対してのみ支払う
効率的なスケールダウンにより、Executor は作業が完了するとすぐにリリースされます。 下流タスクが後で読み取る可能性のあるシャッフルデータを保持したまま、何もせず遊休状態になることはなくなります。
- より高速なスケールダウン。 自動スケールでは、タスクの完了後すぐにノードが削除されます。
- アイドル状態のコンピューティングを削減。 ローカルシャッフルを提供するためだけに生かされ続ける「ゾンビ」エグゼキューターはありません。
- ディスクオーバープロビジョニングなし。 大規模なシャッフル処理は、大容量のローカル ディスクを必要とせず、Blob Storage に格納されます。
- 有界ストレージ コスト。 フォールバック ストレージは、ブロックが不要になったときに自動的にクリーンアップされます。
回復性の高いジョブ
シャッフル データがローカル ディスクにのみ存在する場合、Executor のクラッシュはデータがなくなったことを意味し、Spark はそれを再計算する必要があります。 効率的なスケールダウンにより、データは既に BLOB ストレージに格納されているか、Executor が終了する前にそこに移行されます。
| Scenario | 効率的なスケールダウンなし | 効率的なスケールダウンで |
|---|---|---|
| Executor がクラッシュする | 失われたデータをシャッフルする。ステージの再実行 | データはストレージ内で安全です。再計算なし |
| ノードのプリエンプション | データがなくなった、コストの高い再試行 | データは保持される。ジョブは通常どおり継続される |
| 正常停止 | シャットダウン時にシャッフルが解除された | ピアまたはフォールバック ストレージに移行されたブロック |
| フェッチ中のネットワークの一時的な不具合 | カスケーディング FetchFailedException |
読み取りはストレージから行われ、影響を受けません |
これにより、運用環境での FetchFailedException の最も一般的な原因がなくなります。
迅速で、本当に柔軟なスケーリング
効率的なスケールダウンがないと、オートスケーラーはノードを再利用できませんが、その上の実行プログラムはシャッフル データまたはキャッシュされたデータを保持します。 効率的なスケールダウンでは、その両方を切り離せます:
- シャッフル データは BLOB ストレージ内にあります (または、シャットダウン時にそこに移行されます)。
- キャッシュはエグゼキューターを固定しなくなりました。 Delta スナップショットキャッシュなどの再生成可能なキャッシュは、スケールダウン保護の対象外です。
自動スケーラーは、ワークロードの変更に応じて、アイドル状態のノードを自由に削除し、クラスターのサイズを変更できます。
偏りのあるシャッフルや大規模なシャッフルでのパフォーマンス向上
AQE Shuffle Write を使用すると、Adaptive Query Execution がシャッフル書き込み自体を最適化できるようになります。具体的には、後続の AQE が再統合することなく利用できるパーティション分割を選択し、リモートストレージ向けに、より少数で適切なサイズのブロックを生成します。 デシジョン レイヤーと組み合わせることで、大規模なクエリや傾斜したクエリでのウォールクロック時間が短縮され、小さなクエリの待機時間は変更されません。
概要
推奨される構成
この構成を適用して、完全に効率的なスケールダウン スタックを有効にします。
# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")
# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")
# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")
# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
コードに変更を加える必要はありません。 これらは、環境の Spark プロパティでも設定できます。
構成参照
リモート シャッフル マネージャー (RSM)
| Setting | 推奨 | 制御する内容 |
|---|---|---|
spark.remote.shuffle.enabled |
true |
効率的なスケールダウンをオンにします。 シャッフル データは、Executor ローカル ディスクではなくAzure Blob Storageに行きます。 |
デシジョン レイヤー
| Setting | 推奨 | 制御する内容 |
|---|---|---|
spark.sql.rsm.decisionlayer.enabled.level |
stage |
デシジョン レイヤーがシャッフルをルーティングする粒度。
stage では、各 Spark ステージが個別に評価されます。 |
AQE シャッフル書き込み
| Setting | 推奨 | 制御する内容 |
|---|---|---|
spark.sql.adaptive.shuffleWrite.enabled |
true |
AQE がシャッフル書き込みフェーズに参加できるようにします。 下流のAQEが再集約することなく利用できるパーティション分割を生成します。 |
Note
AQE 自体 (spark.sql.adaptive.enabled) がオンになっている必要があります。 Fabric Spark では既定でオンになっています。
使用停止時のシャッフル移行
| Setting | 推奨 | 制御する内容 |
|---|---|---|
spark.storage.decommission.shuffleBlocks.enabled |
true |
シャッフル ブロックを削除するのではなく、使用停止中の Executor からシャッフル ブロックを移行します。 |
spark.storage.decommission.shuffleBlocks.cleanup |
true |
移行が成功した後、ソース Executor のシャッフル ブロックをクリーンアップします。 |
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage |
true |
ピア Executor がブロックを受け入れられない場合は、フォールバック ストレージ (Azure Blob Storage) に移行します。 |
spark.storage.decommission.fallbackStorage.cleanUp |
true |
必要なくなったフォールバック ストレージからシャッフル ブロックを削除し、ストレージ コストを制限します。 |
キャッシュ対応の動的割り当て
| Setting | 推奨 | 制御する内容 |
|---|---|---|
spark.dynamicAllocation.preventShutdownExecutorWithCache |
false |
キャッシュされたブロックを保持している場合でも、Executor を解放する動的割り当てを許可します。 |
spark.dynamicAllocation.excludeDeltaSnapshotCache |
true |
エグゼキューターがまだ有効なキャッシュを保持しているかどうかを判断する際に、Delta スナップショット キャッシュを無視します。 差分スナップショットキャッシュは再現性があり、スケールダウンを妨げるべきではありません。 |
高度なチューニング (RSM)
ほとんどのユーザーは、これらの既定値を変更する必要はありません。
書き込み性能
| Setting | デフォルト | 制御する内容 |
|---|---|---|
spark.remote.shuffle.partition.buffersize |
16777216 (16 MB) |
ストレージに書き込む前のパーティションごとのバッファー。 |
spark.remote.shuffle.blocksize |
8388608 (8 MB) |
Blob Storageにアップロードされた個々のブロックのサイズ。 |
spark.remote.shuffle.write.maxthreads |
cores × 16 |
シャッフル データの書き込みに使用される最大スレッド数。 |
spark.remote.shuffle.write.maxtasks |
16384 |
最大同時書き込み操作数。 |
読み取り性能
| Setting | デフォルト | 制御する内容 |
|---|---|---|
spark.remote.shuffle.read.parallel.enabled |
true |
シャッフル読み込み用の並列ダウンロードストリーム。 |
spark.remote.shuffle.read.parallelism |
4 |
タスクごとの並列ダウンロード ストリーム。 |
spark.remote.shuffle.read.prefetchqueuesize |
250 |
読み取り中にキューの深さをプリフェッチします。 |
spark.remote.shuffle.read.maxthreads |
cores × 4 |
読み取りに使用される最大スレッド数。 |
Reliability
| Setting | デフォルト | 制御する内容 |
|---|---|---|
spark.remote.shuffle.retries |
5 |
一時的なストレージ エラーに対する再試行。 |
spark.remote.shuffle.retrydelayms |
800 |
再試行間の初期バックオフ時間。 |
spark.remote.shuffle.retrymaxdelayms |
60000 |
バックオフの上限。 |
Compression
| Setting | デフォルト | 制御する内容 |
|---|---|---|
spark.remote.shuffle.compression |
用途 spark.io.compression.codec |
リモート シャッフル データの圧縮形式 ( lz4、 zstdなど)。 |
パフォーマンスの結果
コンピューティング コストの削減 (TPC-DS ベンチマーク)
| Metric | 効率的なスケールダウンなし | 効率的なスケールダウンで |
|---|---|---|
| Total Compute (VM-Minutes) | 14,952 | 6,880 |
| コスト削減 | — | 54% |
ジョブの総実行時間は長くなる可能性があります(自動スケールでは同時実行エグゼキューター数が少なくなるため)が、課金対象のコンピュートは半分以上削減されます。
Decision Layer のパフォーマンス (TPC-DS, RSM 有効)
小さなシャッフルをローカル ディスクにルーティングし、大きなシャッフルのみをリモート ストレージにルーティングすると、最大 57% ランタイムが向上 し、すべてのシャッフルがリモートでルーティングされるのに対して、同じスケールダウンの利点があります。
制限事項
- NEE が必要です。 効率的なスケールダウンは、ネイティブ実行エンジンによって異なります。
- Azure Blob Storageのみ。 Standard
BlockBlobStorage(HNS は無効)。 Azure Data Lake Gen2/HNS 対応アカウントは、リモート シャッフル ストアとしてサポートされていません。 - Azure Private Linkではサポートされていません。 プライベート リンク ネットワークを使用する環境には、現在互換性がありません。
- デシジョン レイヤーの細分性 は、現在ステージごとに行われます。 タスクごとまたはパーティションごとのルーティングはスコープ内にありません。
- キャッシュ動作の変更。
preventShutdownExecutorWithCache=falseでは、cache()/persist()データを保持している Executor がスケールダウンされる可能性があります。 ホット データの Executor ローカル キャッシュに大きく依存するワークロードは検証する必要があります。