다음을 통해 공유


Apache Spark를 사용하는 변경 피드

Azure Cosmos DB Spark 커넥터는 Apache Spark를 사용하여 대규모로 변경 피드를 처리하는 강력한 방법을 제공합니다. 커넥터는 아래의 Java SDK를 사용하고 Spark 실행기에 투명하게 처리를 분산하는 끌어오기 모델을 구현하므로 대규모 데이터 처리 시나리오에 적합합니다.

Spark 커넥터 작동 방식

Azure Cosmos DB용 Spark 커넥터는 Azure Cosmos DB Java SDK를 기반으로 빌드되며 변경 피드를 읽는 끌어오기 모델 접근 방식을 구현합니다. 주요 특징은 다음과 같습니다.

  • Java SDK 기초: 신뢰할 수 있는 변경 피드 처리를 위해 아래에 있는 강력한 Azure Cosmos DB Java SDK를 사용합니다.
  • 끌어오기 모델 구현: 변경 피드 끌어오기 모델 패턴을 따라 처리 속도를 제어할 수 있습니다.
  • 분산 처리: 병렬 처리를 위해 변경 피드 처리를 여러 Spark 실행기에 자동으로 분산
  • 투명한 크기 조정: 커넥터는 수동 개입 없이 분할 및 부하 분산을 처리합니다.

고유 검사점 기능

변경 피드 처리를 위해 Spark 커넥터를 사용할 때의 주요 이점 중 하나는 기본 제공 검사점 메커니즘입니다. 이 기능은 다음을 제공합니다.

  • 자동 복구: 대규모로 변경 피드를 처리할 때 복구를 위한 기본 메커니즘
  • 내결함성: 오류 발생 시 마지막 검사점에서 처리를 다시 시작하는 기능
  • 상태 관리: Spark 세션 및 클러스터 다시 시작에서 처리 상태를 유지 관리합니다.
  • 확장성: 분산된 Spark 환경에서 검사점 지정을 지원합니다.

이 검사점 기능은 Spark 커넥터에 고유하며 SDK를 직접 사용할 때 사용할 수 없으므로 고가용성 및 안정성이 필요한 프로덕션 시나리오에 특히 유용합니다.

경고

spark.cosmos.changeFeed.startFrom 검사점 위치에 기존 책갈피가 있는 경우 구성이 무시됩니다. 검사점에서 다시 시작할 때 커넥터는 지정된 시작점이 아닌 마지막으로 처리된 위치에서 계속됩니다.

변경 피드 처리에 Spark를 사용하는 경우

다음 시나리오에서 변경 피드 처리를 위해 Spark 커넥터를 사용하는 것이 좋습니다.

  • 대규모 데이터 처리: 단일 컴퓨터 기능을 초과하는 대량의 변경 피드 데이터를 처리해야 하는 경우
  • 복잡한 변환: 변경 피드 처리에 복잡한 데이터 변환, 집계 또는 다른 데이터 세트와의 조인이 포함되는 경우
  • 분산 분석: 분산 환경에서 변경 피드 데이터에 대한 실시간 또는 거의 실시간 분석을 수행해야 하는 경우
  • 데이터 파이프라인과의 통합: 변경 피드 처리가 이미 Spark를 사용하는 대규모 ETL/ELT 파이프라인의 일부인 경우
  • 내결함성 요구 사항: 프로덕션 워크로드에 대한 강력한 검사점 및 복구 메커니즘이 필요한 경우
  • 다중 컨테이너 처리: 여러 컨테이너에서 변경 피드를 동시에 처리해야 하는 경우

더 간단한 시나리오 또는 개별 문서 처리에 대한 세분화된 제어가 필요한 경우 변경 피드 프로세서 또는 끌어오기 모델을 SDK와 직접 사용하는 것이 좋습니다.

코드 예제

다음 예제에서는 Spark 커넥터를 사용하여 변경 피드에서 읽는 방법을 보여 줍니다. 보다 포괄적인 예제는 전체 샘플 Notebook을 참조하세요.

# Configure change feed reading

changeFeedConfig = {
    "spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database-name>",
    "spark.cosmos.container": "<container-name>",
    # Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
    "spark.cosmos.changeFeed.startFrom": "Beginning",  # "Now" or "2020-02-10T14:15:03"
    "spark.cosmos.changeFeed.mode": "LatestVersion",  # or "AllVersionsAndDeletes"
    # Control batch size - if not set, all available data processed in first batch
    "spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
    "spark.cosmos.read.partitioning.strategy": "Restrictive"
}

# Read change feed as a streaming DataFrame
changeFeedDF = spark \
    .readStream \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedConfig) \
    .load()

# Configure output settings with checkpointing
outputConfig = {
    "spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<target-account-key>",
    "spark.cosmos.database": "<target-database>",
    "spark.cosmos.container": "<target-container>",
    "spark.cosmos.write.strategy": "ItemOverwrite"
}

# Process and write the change feed data with checkpointing
query = changeFeedDF \
    .selectExpr("*") \
    .writeStream \
    .format("cosmos.oltp") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/changefeed-checkpoint") \
    .options(**outputConfig) \
    .start()

# Wait for the streaming query to finish
query.awaitTermination()

키 구성 옵션

Spark에서 변경 피드를 사용하는 경우 이러한 구성 옵션은 특히 중요합니다.

  • spark.cosmos.changeFeed.startFrom: 변경 피드 읽기를 시작할 위치를 제어합니다.
    • "Beginning" - 변경 피드의 시작 부분부터 시작
    • "Now" - 현재 시간부터 시작
    • "2020-02-10T14:15:03" - 특정 타임스탬프에서 시작(ISO 8601 형식)
    • 참고: 검사점 위치에 기존 책갈피가 있는 경우 이 설정은 무시됩니다.
  • spark.cosmos.changeFeed.mode: 변경 피드 모드를 지정합니다.
    • "LatestVersion" - 변경된 문서의 최신 버전만 처리
    • "AllVersionsAndDeletes" - 삭제를 포함한 모든 버전의 변경 내용 처리
  • spark.cosmos.changeFeed.itemCountPerTriggerHint: 일괄 처리 크기를 제어합니다.
    • 각 마이크로 일괄 처리/트리거에 대한 변경 피드에서 읽은 대략 최대 항목 수
    • 예: "50000"
    • 중요: 설정하지 않으면 변경 피드에서 사용 가능한 모든 데이터가 첫 번째 마이크로 일괄 처리에서 처리됩니다.
  • checkpointLocation: 내결함성 및 복구에 대한 검사점 정보를 저장할 위치를 지정합니다.
  • spark.cosmos.read.partitioning.strategy: Spark 실행기에서 데이터를 분할하는 방법을 제어합니다.

다음 단계