다음을 통해 공유


Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB 계정으로 데이터 마이그레이션

적용 대상: MongoDB

이 마이그레이션 가이드는 데이터베이스를 MongoDB에서 Azure Cosmos DB API for MongoDB로 마이그레이션하는 시리즈의 일부입니다. 중요한 마이그레이션 단계는 아래와 같이 마이그레이션 사전 작업, 마이그레이션 및 마이그레이션 사후 작업입니다.

마이그레이션 단계 다이어그램

Azure Databricks를 사용한 데이터 마이그레이션

Azure DatabricksApache Spark에 대한 PaaS(Platform as a Service) 제품입니다. 대규모 데이터 세트에서 오프라인 마이그레이션을 수행하는 방법을 제공합니다. Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB로 데이터베이스를 오프라인으로 마이그레이션할 수 있습니다.

이 자습서에서는 다음을 수행하는 방법을 배우게 됩니다.

  • Azure Databricks 클러스터 프로비전

  • 종속성 추가

  • Scala 또는 Python Notebook 만들기 및 실행

  • 마이그레이션 성능 최적화

  • 마이그레이션 중에 관찰될 수 있는 속도 제한 오류 문제 해결

필수 조건

이 자습서를 완료하려면 다음이 필요합니다.

Azure Databricks 클러스터 프로비전

지침에 따라 Azure Databricks 클러스터를 프로비저닝할 수 있습니다. Spark 3.0을 지원하는 Databricks Runtime 버전 7.6을 선택하는 것이 좋습니다.

Databrick 새 클러스터 만들기 다이어그램

종속성 추가

클러스터에 MongoDB Connector for Spark 라이브러리를 추가하여 기본 MongoDB 및 Azure Cosmos DB for MongoDB 엔드포인트에 모두 연결합니다. 클러스터에서 라이브러리>새로 설치>Maven을 선택한 다음, org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 Maven 좌표를 추가합니다.

Databrick 클러스터 종속성 추가 다이어그램

설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.

참고 항목

MongoDB Connector for Spark 라이브러리가 설치된 후 Databricks 클러스터를 다시 시작해야 합니다.

그런 다음, 마이그레이션을 위해 Scala 또는 Python Notebook을 만들 수 있습니다.

마이그레이션을 위한 Scala Notebook 만들기

Databricks에서 Scala Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 올바른 값을 입력해야 합니다.

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

마이그레이션을 위한 Python Notebook 만들기

Databricks에서 Python Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 올바른 값을 입력해야 합니다.

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

마이그레이션 성능 최적화

마이그레이션 성능은 다음 구성을 통해 조정할 수 있습니다.

  • Spark 클러스터의 작업자 및 코어 수: 작업자가 많을수록 작업을 실행하는 데 필요한 컴퓨팅 분할이 더 많아집니다.

  • maxBatchSize: maxBatchSize 값은 데이터가 대상 Azure Cosmos DB 컬렉션에 저장되는 속도를 제어합니다. 그러나 maxBatchSize가 컬렉션 처리량에 비해 너무 높으면 속도 제한 오류가 발생할 수 있습니다.

    Spark 클러스터의 실행자 수, 작성되는 각 문서의 잠재적 크기(RU 비용) 및 대상 컬렉션 처리량 제한에 따라 작업자 수와 maxBatchSize를 조정해야 합니다.

    maxBatchSize = 컬렉션 처리량/(문서 1개의 RU 비용 * Spark 작업자 수 * 작업자당 CPU 코어 수)

  • MongoDB Spark partitioner 및 partitionKey: 사용되는 기본 partitioner는 MongoDefaultPartitioner이고 기본 partitionKey는 _id입니다. 입력 구성 속성 spark.mongodb.input.partitioner에 값 MongoSamplePartitioner를 할당하여 partitioner를 변경할 수 있습니다. 마찬가지로, 입력 구성 속성 spark.mongodb.input.partitioner.partitionKey에 적절한 필드 이름을 할당하여 partitionKey를 변경할 수 있습니다. 오른쪽 partitionKey는 데이터 기울이기(동일한 분할 키 값에 대해 많은 수의 레코드가 기록됨)를 방지하는 데 도움이 될 수 있습니다.

  • 데이터 전송 중 인덱스 사용 안 함: 대량의 데이터 마이그레이션의 경우 인덱스, 특히 대상 컬렉션의 와일드카드 인덱스를 사용하지 않도록 설정하는 것이 좋습니다. 인덱스는 각 문서를 작성하기 위한 RU 비용을 증가시킵니다. 이러한 RU를 해제하면 데이터 전송 속도를 개선하는 데 도움이 될 수 있습니다. 데이터가 마이그레이션되면 인덱스를 사용하도록 설정할 수 있습니다.

문제 해결

시간 초과 오류(오류 코드 50)

Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 50 오류 코드가 표시될 수 있습니다. 다음 시나리오에서는 시간 초과 오류가 발생할 수 있습니다.

  • 데이터베이스에 할당된 처리량이 낮음: 대상 컬렉션에 충분한 처리량이 할당되었는지 확인합니다.
  • 큰 데이터 볼륨으로 인한 과도한 데이터 기울이기. 특정 테이블로 마이그레이션할 데이터가 많지만 데이터에 상당한 기울이기가 있는 경우 테이블에 프로비저닝된 여러 요청 단위가 있더라도 여전히 비율 제한이 발생할 수 있습니다. 요청 단위는 실제 파티션 간에 균등하게 분할되며 과도한 데이터 기울이기로 인해 단일 분할에 대한 요청 병목 현상이 발생할 수 있습니다. 데이터 기울이기는 동일한 분할 키 값에 대한 많은 수의 레코드를 의미합니다.

속도 제한(오류 코드 16500)

Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 16500 오류 코드가 표시될 수 있습니다. 이는 속도 제한 오류이며 이전 계정이나 서버 측 다시 시도 기능이 사용하지 않도록 설정된 계정에서 관찰될 수 있습니다.

  • 서버 측 다시 시도 사용: SSR(서버 측 다시 시도) 기능을 사용하도록 설정하고 서버가 속도 제한 작업을 자동으로 다시 시도하도록 합니다.

마이그레이션 후 최적화

데이터를 마이그레이션한 후 Azure Cosmos DB에 연결하여 데이터를 관리할 수 있습니다. 인덱싱 정책 최적화와 같은 다른 마이그레이션 후 단계를 따르거나, 기본 일관성 수준을 업데이트하거나, Azure Cosmos DB 계정에 대한 전역 배포를 구성할 수도 있습니다. 자세한 내용은 마이그레이션 후 최적화 문서를 참조하세요.

추가 리소스

다음 단계