効率的なスケールダウンとリモート シャッフル マネージャー

対象:✅ ファブリックデータエンジニアリングおよびデータサイエンス

効率的なスケールダウンは、Spark シャッフル データを Executor の有効期間から切り離す、Microsoft Fabric Spark の機能です。 ローカルの Executor ディスクにシャッフル出力をピン留めする代わりに、Fabric Spark はデータのシャッフルをAzure Blob Storageにルーティング (または必要に応じて移行) し、アダプティブ クエリ実行 (AQE) が書き込み自体を形成できるようにします。 その結果、クエリ、ノートブック、またはパイプラインに変更を加えず、クラスターのスケールダウンの高速化、コンピューティング コストの削減、回復性の高いジョブが実現します。

Overview

効率的なスケールダウンは、次の 4 つの連携機能から構築されます。

能力 それが何をするか
リモート シャッフル マネージャー (RSM) Executor ローカル ディスクではなく、Azure Blob Storageにシャッフル データを書き込んで読み取ります。
シャッフル移行 停止する前に、実行プログラムを削除するのではなく、シャッフル ブロックを Executor から移動します。
デシジョン レイヤー 小さなシャッフルをローカルに保持し、大きなシャッフルをリモート ストレージにオフロードするステージごとのランタイム ルーティング。
AQE Shuffle Write アダプティブ クエリ実行をシャッフル書き込みフェーズに参加させることができるので、パーティション分割が初めて適切になります。

Prerequisites

  • ネイティブ実行エンジン (NEE) を有効にする必要があります。
  • 自動スケールが有効 (推奨)。 効率的なスケールダウンは、以下の Spark 構成を使用した自動スケールなしでも機能します。
  • ランタイム 1.3 (Apache Spark 3.5) 以降。

どのように機能するのか

Spark はクエリを処理するときに、多くの場合、ステージ間 ( シャッフル) 間でデータを再配布します。 通常、シャッフル データは各 Executor のローカル ディスクに格納され、Executor はそのデータに結び付けられます。 すべてのコンシューマーが読み取りを完了するまで、リリースすることはできません。 この結合は、クラスターが迅速にスケールダウンできない最大の理由の 1 つであり、Executor を失うと高コストのステージ再試行が発生する理由です。

効率的なスケールダウンにより、この結び付きは解消されます。

  • Large shufflesリモート シャッフル マネージャーを介して直接Azure Blob Storageに移動します。
  • 小さなシャッフルは、高速化のためローカルディスク上に保持されます。 そのエグゼキューターを後で解放する必要が生じた場合、Shuffle Migration はバックグラウンドでブロックをピアまたはフォールバックストレージに移動します。
  • デシジョン レイヤーは、実行時にステージごとに適切なパスを選択します。
  • AQE シャッフル書き込み により、ダウンストリーム AQE が再結合せずに消費するパーティション分割がライターによって生成され、無駄な I/O が回避されます。
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

スマート ルーティング (デシジョン レイヤー)

デシジョン レイヤーは、各シャッフル交換を評価し、次の決定を行います。

  • 大規模なシャッフル → Azure Blob Storage 最大スケールダウンとフォールト トレランスの利点。
  • 小規模なシャッフル → ローカル ディスク 小さな転送に対するクラウド I/O オーバーヘッドはありません。 Executor が後で使用を停止した場合、Shuffle Migration が引き継ぎます。

ルーティングは自動であり、ユーザー入力は必要ありません。 推奨される粒度はステージ単位です。

主な利点

コストの削減: 使用するコンピューティングに対してのみ支払う

効率的なスケールダウンにより、Executor は作業が完了するとすぐにリリースされます。 下流タスクが後で読み取る可能性のあるシャッフルデータを保持したまま、何もせず遊休状態になることはなくなります。

  • より高速なスケールダウン。 自動スケールでは、タスクの完了後すぐにノードが削除されます。
  • アイドル状態のコンピューティングを削減。 ローカルシャッフルを提供するためだけに生かされ続ける「ゾンビ」エグゼキューターはありません。
  • ディスクオーバープロビジョニングなし。 大規模なシャッフル処理は、大容量のローカル ディスクを必要とせず、Blob Storage に格納されます。
  • 有界ストレージ コスト。 フォールバック ストレージは、ブロックが不要になったときに自動的にクリーンアップされます。

回復性の高いジョブ

シャッフル データがローカル ディスクにのみ存在する場合、Executor のクラッシュはデータがなくなったことを意味し、Spark はそれを再計算する必要があります。 効率的なスケールダウンにより、データは既に BLOB ストレージに格納されているか、Executor が終了する前にそこに移行されます。

Scenario 効率的なスケールダウンなし 効率的なスケールダウンで
Executor がクラッシュする 失われたデータをシャッフルする。ステージの再実行 データはストレージ内で安全です。再計算なし
ノードのプリエンプション データがなくなった、コストの高い再試行 データは保持される。ジョブは通常どおり継続される
正常停止 シャットダウン時にシャッフルが解除された ピアまたはフォールバック ストレージに移行されたブロック
フェッチ中のネットワークの一時的な不具合 カスケーディング FetchFailedException 読み取りはストレージから行われ、影響を受けません

これにより、運用環境での FetchFailedException の最も一般的な原因がなくなります。

迅速で、本当に柔軟なスケーリング

効率的なスケールダウンがないと、オートスケーラーはノードを再利用できませんが、その上の実行プログラムはシャッフル データまたはキャッシュされたデータを保持します。 効率的なスケールダウンでは、その両方を切り離せます:

  • シャッフル データは BLOB ストレージ内にあります (または、シャットダウン時にそこに移行されます)。
  • キャッシュはエグゼキューターを固定しなくなりました。 Delta スナップショットキャッシュなどの再生成可能なキャッシュは、スケールダウン保護の対象外です。

自動スケーラーは、ワークロードの変更に応じて、アイドル状態のノードを自由に削除し、クラスターのサイズを変更できます。

偏りのあるシャッフルや大規模なシャッフルでのパフォーマンス向上

AQE Shuffle Write を使用すると、Adaptive Query Execution がシャッフル書き込み自体を最適化できるようになります。具体的には、後続の AQE が再統合することなく利用できるパーティション分割を選択し、リモートストレージ向けに、より少数で適切なサイズのブロックを生成します。 デシジョン レイヤーと組み合わせることで、大規模なクエリや傾斜したクエリでのウォールクロック時間が短縮され、小さなクエリの待機時間は変更されません。

概要

この構成を適用して、完全に効率的なスケールダウン スタックを有効にします。

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

コードに変更を加える必要はありません。 これらは、環境の Spark プロパティでも設定できます。

構成参照

リモート シャッフル マネージャー (RSM)

Setting 推奨 制御する内容
spark.remote.shuffle.enabled true 効率的なスケールダウンをオンにします。 シャッフル データは、Executor ローカル ディスクではなくAzure Blob Storageに行きます。

デシジョン レイヤー

Setting 推奨 制御する内容
spark.sql.rsm.decisionlayer.enabled.level stage デシジョン レイヤーがシャッフルをルーティングする粒度。 stage では、各 Spark ステージが個別に評価されます。

AQE シャッフル書き込み

Setting 推奨 制御する内容
spark.sql.adaptive.shuffleWrite.enabled true AQE がシャッフル書き込みフェーズに参加できるようにします。 下流のAQEが再集約することなく利用できるパーティション分割を生成します。

Note

AQE 自体 (spark.sql.adaptive.enabled) がオンになっている必要があります。 Fabric Spark では既定でオンになっています。

使用停止時のシャッフル移行

Setting 推奨 制御する内容
spark.storage.decommission.shuffleBlocks.enabled true シャッフル ブロックを削除するのではなく、使用停止中の Executor からシャッフル ブロックを移行します。
spark.storage.decommission.shuffleBlocks.cleanup true 移行が成功した後、ソース Executor のシャッフル ブロックをクリーンアップします。
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true ピア Executor がブロックを受け入れられない場合は、フォールバック ストレージ (Azure Blob Storage) に移行します。
spark.storage.decommission.fallbackStorage.cleanUp true 必要なくなったフォールバック ストレージからシャッフル ブロックを削除し、ストレージ コストを制限します。

キャッシュ対応の動的割り当て

Setting 推奨 制御する内容
spark.dynamicAllocation.preventShutdownExecutorWithCache false キャッシュされたブロックを保持している場合でも、Executor を解放する動的割り当てを許可します。
spark.dynamicAllocation.excludeDeltaSnapshotCache true エグゼキューターがまだ有効なキャッシュを保持しているかどうかを判断する際に、Delta スナップショット キャッシュを無視します。 差分スナップショットキャッシュは再現性があり、スケールダウンを妨げるべきではありません。

高度なチューニング (RSM)

ほとんどのユーザーは、これらの既定値を変更する必要はありません。

書き込み性能

Setting デフォルト 制御する内容
spark.remote.shuffle.partition.buffersize 16777216 (16 MB) ストレージに書き込む前のパーティションごとのバッファー。
spark.remote.shuffle.blocksize 8388608 (8 MB) Blob Storageにアップロードされた個々のブロックのサイズ。
spark.remote.shuffle.write.maxthreads cores × 16 シャッフル データの書き込みに使用される最大スレッド数。
spark.remote.shuffle.write.maxtasks 16384 最大同時書き込み操作数。

読み取り性能

Setting デフォルト 制御する内容
spark.remote.shuffle.read.parallel.enabled true シャッフル読み込み用の並列ダウンロードストリーム。
spark.remote.shuffle.read.parallelism 4 タスクごとの並列ダウンロード ストリーム。
spark.remote.shuffle.read.prefetchqueuesize 250 読み取り中にキューの深さをプリフェッチします。
spark.remote.shuffle.read.maxthreads cores × 4 読み取りに使用される最大スレッド数。

Reliability

Setting デフォルト 制御する内容
spark.remote.shuffle.retries 5 一時的なストレージ エラーに対する再試行。
spark.remote.shuffle.retrydelayms 800 再試行間の初期バックオフ時間。
spark.remote.shuffle.retrymaxdelayms 60000 バックオフの上限。

Compression

Setting デフォルト 制御する内容
spark.remote.shuffle.compression 用途 spark.io.compression.codec リモート シャッフル データの圧縮形式 ( lz4zstdなど)。

パフォーマンスの結果

TPC-DS ベンチマークで効率的なスケールダウンを有効にした場合と無効にした場合のコンピューティング コストの削減を示すグラフ。コスト削減率は 54% です。

コンピューティング コストの削減 (TPC-DS ベンチマーク)

Metric 効率的なスケールダウンなし 効率的なスケールダウンで
Total Compute (VM-Minutes) 14,952 6,880
コスト削減 54%

ジョブの総実行時間は長くなる可能性があります(自動スケールでは同時実行エグゼキューター数が少なくなるため)が、課金対象のコンピュートは半分以上削減されます。

Decision Layer のパフォーマンス (TPC-DS, RSM 有効)

小さなシャッフルをローカル ディスクにルーティングし、大きなシャッフルのみをリモート ストレージにルーティングすると、最大 57% ランタイムが向上 し、すべてのシャッフルがリモートでルーティングされるのに対して、同じスケールダウンの利点があります。

制限事項

  • NEE が必要です。 効率的なスケールダウンは、ネイティブ実行エンジンによって異なります。
  • Azure Blob Storageのみ。 Standard BlockBlobStorage(HNS は無効)。 Azure Data Lake Gen2/HNS 対応アカウントは、リモート シャッフル ストアとしてサポートされていません。
  • Azure Private Linkではサポートされていません。 プライベート リンク ネットワークを使用する環境には、現在互換性がありません。
  • デシジョン レイヤーの細分性 は、現在ステージごとに行われます。 タスクごとまたはパーティションごとのルーティングはスコープ内にありません。
  • キャッシュ動作の変更。 preventShutdownExecutorWithCache=falseでは、cache()/persist()データを保持している Executor がスケールダウンされる可能性があります。 ホット データの Executor ローカル キャッシュに大きく依存するワークロードは検証する必要があります。