효율적인 축소 및 원격 셔플 관리자

적용 대상:✅ 패브릭 데이터 엔지니어링 및 데이터 과학

효율적인 스케일 다운은 Spark 셔플 데이터를 실행기 수명과 분리하는 Microsoft Fabric Spark의 기능입니다. 셔플 출력을 로컬 실행기 디스크에 고정해 두는 대신, Fabric Spark는 셔플 데이터를 Azure Blob Storage로 라우팅하거나 필요에 따라 그곳으로 마이그레이션하고, 적응형 쿼리 실행(AQE)이 쓰기 방식 자체를 조정하도록 합니다. 그 결과 쿼리, Notebook 또는 파이프라인을 변경하지 않고 클러스터 스케일 다운 속도가 빨라지고 컴퓨팅 비용이 낮아지고 복원력이 뛰어난 작업이 생성됩니다.

개요

효율적인 스케일 다운은 다음 네 가지 협력 기능으로 구축됩니다.

Capability 용도
원격 셔플 관리자(RSM) 셔플 데이터를 실행기 로컬 디스크 대신 Azure Blob Storage에 기록하고 여기서 읽습니다.
마이그레이션 순서 섞기 삭제하는 대신, 서비스 해제되기 전에 실행기에서 셔플 블록을 다른 곳으로 옮깁니다.
의사 결정 계층 작은 순서 섞기를 로컬로 유지하고 큰 순서 섞기를 원격 스토리지로 오프로드하는 단계별 런타임 라우팅입니다.
AQE 셔플 쓰기 적응형 쿼리 실행이 셔플 쓰기 단계에 참여하여 처음부터 파티셔닝이 올바르게 이루어지도록 합니다.

사전 요구 사항

  • NEE(네이티브 실행 엔진)를 사용하도록 설정해야 합니다.
  • 자동 크기 조정 사용(권장). 효율적인 스케일 다운은 아래 Spark 구성을 통해 자동 크기 조정 없이도 작동합니다.
  • 런타임 1.3(Apache Spark 3.5) 이상.

작동 방식

Spark는 쿼리를 처리할 때 종종 단계 간에 데이터를 재분배하는데, 이를 셔플이라고 합니다. 일반적으로 순서 섞기 데이터는 실행기를 해당 데이터와 연결하는 각 실행기의 로컬 디스크에 저장됩니다. 모든 소비자가 읽기를 마칠 때까지 릴리스할 수 없습니다. 이러한 결합은 클러스터가 신속하게 축소할 수 없는 가장 큰 이유이며 실행기를 잃으면 단계 재시도 비용이 많이 드는 이유입니다.

효율적인 규모 축소는 이러한 결합을 끊습니다.

  • 대규모 셔플은 Remote Shuffle Manager를 통해 Azure Blob Storage로 직접 전송됩니다.
  • 작은 셔플은 속도 향상을 위해 로컬 디스크에 유지됩니다. 나중에 해당 실행기를 해제해야 하는 경우, Shuffle Migration은 백그라운드에서 블록을 피어 또는 폴백 스토리지로 이동합니다.
  • 의사 결정 계층은 런타임에 스테이지당 올바른 경로를 선택합니다.
  • AQE Shuffle Write는 writer가 다운스트림 AQE가 다시 코얼레싱하지 않고 사용할 파티셔닝을 생성하도록 보장하여 불필요한 I/O를 방지합니다.
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

스마트 라우팅(의사 결정 계층)

의사 결정 계층은 각 셔플 교환을 평가하고 다음을 결정합니다:

  • 큰 순서 섞기 → Azure Blob Storage. 최대 규모 축소 및 내결함성 이점.
  • 작은 셔플 → 로컬 디스크 작은 전송에 대한 클라우드 I/O 오버헤드가 없습니다. 나중에 Executor가 서비스 중지되면 Shuffle Migration이 이를 대신 처리합니다.

라우팅은 자동이며 사용자 입력이 필요하지 않습니다. 권장되는 세분성은 단계별입니다.

주요 이점

비용 절감: 사용하는 컴퓨팅에 대해서만 지불

효율적인 규모 축소를 통해 실행기는 작업이 완료되는 즉시 해제됩니다. 더 이상 다운스트림 태스크가 나중에 읽을 수도 있는 셔플 데이터를 보유한 채 유휴 상태로 남아 있지 않습니다.

  • 더 빠른 스케일 다운. 자동 크기 조정은 작업이 완료된 직후 노드를 제거합니다.
  • 유휴 컴퓨팅이 적습니다. 오직 자신의 로컬 셔플을 제공하기 위해서만 계속 살아 있는 "좀비" 실행기는 없습니다.
  • 디스크 오버프로비저닝 없음. 대용량 셔플 작업은 대용량 로컬 디스크를 필요로 하지 않고 Blob Storage에 저장됩니다.
  • 제한된 스토리지 비용입니다. 대체 스토리지는 블록이 더 이상 필요하지 않을 때 자동으로 정리됩니다.

더 탄력적인 작업

순서 섞기 데이터가 로컬 디스크에만 있는 경우 실행기 크래시로 인해 데이터가 사라졌고 Spark에서 데이터를 다시 계산해야 합니다. 효율적인 스케일 다운을 사용하면 실행기가 사라지기 전에 데이터가 이미 Blob Storage에 있거나 마이그레이션됩니다.

Scenario 효율적인 스케일 다운이 없는 경우 효율적인 규모 축소로
실행기 충돌 셔플 데이터 손실; 스테이지가 다시 실행됨 데이터는 스토리지에서 안전합니다. 재계산 없음
노드 선점 데이터가 사라졌고 비용이 많이 드는 재시도 데이터는 유지됩니다. 작업은 정상적으로 계속됩니다.
정상적인 서비스 해제 종료 시 셔플이 해제됨 피어 또는 대체 스토리지로 마이그레이션된 블록
가져오기 중 네트워크 일시 끊김 계단식 FetchFailedException 읽기 작업은 영향을 받지 않으며 스토리지에서 수행됩니다.

이렇게 하면 프로덕션에서 가장 일반적인 원인이 FetchFailedException 제거됩니다.

더 빠르고 탄력적인 크기 조정

효율적인 스케일 다운이 없으면, 해당 노드의 어떤 executor라도 여전히 셔플 데이터나 캐시된 데이터를 보유하고 있는 한 오토스케일러는 그 노드를 회수할 수 없습니다. 효율적인 스케일 다운은 다음 두 가지를 모두 분리합니다.

  • 셔플 데이터는 Blob Storage에 저장되며(또는 종료 시 해당 위치로 마이그레이션됩니다).
  • 캐시는 더 이상 실행기를 고정하지 않습니다. 델타 스냅샷 캐시와 같은 재현 가능한 캐시는 스케일 다운 보호에서 제외됩니다.

자동 크기 조정기는 워크로드 변경에 따라 유휴 노드를 자유롭게 제거하고 클러스터 크기를 조정할 수 있습니다.

편향된 및 대규모 셔플에서의 성능 향상

AQE 셔플 쓰기를 사용하면 적응형 쿼리 실행(AQE)이 셔플 쓰기 자체를 조정할 수 있습니다. 즉, 후속 AQE가 다시 코얼레싱하지 않고 그대로 사용할 파티셔닝을 선택하고, 원격 스토리지를 위해 수는 더 적고 크기는 더 적절한 블록을 생성합니다. 의사 결정 계층과 결합하면 큰/기울어진 쿼리에서 더 빠른 벽시계 시간과 작은 쿼리에 대한 변경되지 않은 대기 시간을 얻을 수 있습니다.

시작하기

이 구성을 적용하여 전체 효율적인 스케일 다운 스택을 사용하도록 설정합니다.

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

코드 변경은 필요하지 않습니다. 환경 Spark 속성에서 이러한 속성을 설정할 수도 있습니다.

구성 참조

원격 셔플 관리자(RSM)

Setting 권장 제어하는 내용
spark.remote.shuffle.enabled true 효율적인 축소를 사용하도록 설정합니다. 셔플 데이터는 실행기의 로컬 디스크 대신 Azure Blob Storage에 저장됩니다.

의사 결정 계층

Setting 권장 제어하는 내용
spark.sql.rsm.decisionlayer.enabled.level stage 의사 결정 계층이 라우팅 셔플을 수행하는 세분화 수준입니다. stage 는 각 Spark 단계를 독립적으로 평가합니다.

AQE 셔플 쓰기

Setting 권장 제어하는 내용
spark.sql.adaptive.shuffleWrite.enabled true AQE가 셔플 쓰기 단계에 참여하도록 합니다. 후속 AQE가 재병합 없이 활용할 수 있는 분할 방식을 생성합니다.

비고

AQE 자체(spark.sql.adaptive.enabled)는 켜야 합니다. Fabric Spark에서는 기본적으로 설정됩니다.

서비스 해제 시 마이그레이션 무작위화

Setting 권장 제어하는 내용
spark.storage.decommission.shuffleBlocks.enabled true 삭제하는 대신 서비스 해제 중인 실행기에서 다른 위치로 셔플 블록을 마이그레이션합니다.
spark.storage.decommission.shuffleBlocks.cleanup true 마이그레이션이 성공한 후 원본 실행기에서 순서 섞기 블록을 정리합니다.
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true 피어 실행기가 블록을 수락할 수 없는 경우 해당 블록을 대체 스토리지(Azure Blob Storage)로 마이그레이션합니다.
spark.storage.decommission.fallbackStorage.cleanUp true 더 이상 필요하지 않은 대체 스토리지에서 순서 섞기 블록을 제거하여 스토리지 비용을 제한합니다.

캐시 인식 동적 할당

Setting 권장 제어하는 내용
spark.dynamicAllocation.preventShutdownExecutorWithCache false 캐시된 블록을 보유하고 있는 경우에도 동적 할당에서 실행기를 해제할 수 있도록 합니다.
spark.dynamicAllocation.excludeDeltaSnapshotCache true 실행기가 여전히 유용한 캐시를 보유하는지 여부를 결정할 때 델타 스냅샷 캐시를 무시합니다. 델타 스냅샷 캐시는 재현 가능하며 스케일 다운을 차단해서는 안 됩니다.

고급 튜닝(RSM)

대부분의 사용자는 이러한 기본값을 변경할 필요가 없습니다.

쓰기 성능

Setting 기본값 제어하는 내용
spark.remote.shuffle.partition.buffersize 16777216 (16MB) 스토리지에 기록하기 전에 파티션별로 버퍼링합니다.
spark.remote.shuffle.blocksize 8388608 (8MB) Blob Storage 업로드된 개별 블록의 크기입니다.
spark.remote.shuffle.write.maxthreads cores × 16 순서 섞기 데이터를 작성하는 데 사용되는 최대 스레드입니다.
spark.remote.shuffle.write.maxtasks 16384 최대 동시 쓰기 작업입니다.

읽기 성능

Setting 기본값 제어하는 내용
spark.remote.shuffle.read.parallel.enabled true 순서 섞기 읽기를 위한 병렬 다운로드 스트림입니다.
spark.remote.shuffle.read.parallelism 4 작업당 병렬 다운로드 스트림
spark.remote.shuffle.read.prefetchqueuesize 250 읽기 중에 큐 깊이를 프리페치합니다.
spark.remote.shuffle.read.maxthreads cores × 4 읽기에 사용되는 최대 스레드입니다.

신뢰도

Setting 기본값 제어하는 내용
spark.remote.shuffle.retries 5 일시적인 스토리지 오류에 대해 다시 시도합니다.
spark.remote.shuffle.retrydelayms 800 재시도 사이의 초기 백오프입니다.
spark.remote.shuffle.retrymaxdelayms 60000 백오프 최대값.

Compression

Setting 기본값 제어하는 내용
spark.remote.shuffle.compression 사용함 spark.io.compression.codec 원격 셔플 데이터의 압축 형식(예: lz4, zstd).

성능 결과

TPC-DS 벤치마크에서 효율적인 스케일 다운을 사용하도록 설정하고 사용하지 않도록 설정한 컴퓨팅 비용 절감을 보여 주는 차트는 54% 비용 절감을 보여 줍니다.

컴퓨팅 비용 절감(TPC-DS 벤치마크)

Metric 효율적인 스케일 다운이 없는 경우 효율적인 규모 축소로
Total Compute(VM-Minutes) 14,952 6,880
비용 절감 54%

총 작업 런타임은 더 길 수 있지만(자동 크기 조정은 더 적은 동시 실행기를 사용함) 청구된 컴퓨팅은 절반 이상으로 줄어듭니다.

의사 결정 계층 성능(TPC-DS, RSM 켜기)

로컬 디스크에 작은 순서 섞기를 라우팅하고 원격 스토리지에 대한 큰 순서 섞기만 사용하면 동일한 스케일 다운 혜택으로 모든 순서 섞기를 원격으로 라우팅하는 것과 비교해 최대 57개의% 런타임 개선 이 가능합니다.

Limitations

  • NEE가 필요합니다. 효율적인 규모 축소는 네이티브 실행 엔진에 따라 달라집니다.
  • Azure Blob Storage 전용. HNS를 사용하지 않도록 설정된 표준 BlockBlobStorage 입니다. Azure 데이터 레이크 Gen2 / HNS가 사용하도록 설정된 계정은 원격 셔플 저장소로 지원되지 않습니다.
  • Azure Private Link 지원되지 않습니다. 프라이빗 링크 네트워킹을 사용하는 환경은 현재 호환되지 않습니다.
  • 의사 결정 계층 세분성은 현재 단계별로 진행됩니다. 작업별 또는 파티션별 라우팅은 범위에 없습니다.
  • 캐시 동작 변경. 를 사용하면 preventShutdownExecutorWithCache=false데이터를 보유하는 cache()/persist() 실행기가 축소될 수 있습니다. 핫 데이터에 대한 실행기-로컬 캐시에 크게 의존하는 워크로드의 유효성을 검사해야 합니다.