Share via


Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB 계정으로 데이터 마이그레이션

적용 대상: MongoDB

이 마이그레이션 가이드는 MongoDB에서 Azure Cosmos DB API for MongoDB로 데이터베이스를 마이그레이션하는 시리즈의 일부입니다. 중요한 마이그레이션 단계는 아래와 같이 마이그레이션 전, 마이그레이션 및 마이그레이션 후입니다.

Diagram of migration steps

Azure Databricks를 사용한 데이터 마이그레이션

Azure Databricks는 Apache Spark용 PaaS(Platform as a Service) 제품입니다. 대규모 데이터 세트에서 오프라인 마이그레이션을 수행하는 방법을 제공합니다. Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB로 데이터베이스를 오프라인으로 마이그레이션할 수 있습니다.

이 자습서에서는 다음을 수행하는 방법을 배우게 됩니다.

  • Azure Databricks 클러스터 프로비전

  • 종속성 추가

  • Scala 또는 Python Notebook 만들기 및 실행

  • 마이그레이션 성능 최적화

  • 마이그레이션 중에 관찰될 수 있는 속도 제한 오류 문제 해결

필수 조건

이 자습서를 완료하려면 다음을 수행해야 합니다.

Azure Databricks 클러스터 프로비전

지침에 따라 Azure Databricks 클러스터를 프로비전할 수 있습니다. Spark 3.0을 지원하는 Databricks 런타임 버전 7.6을 선택하는 것이 좋습니다.

Diagram of databricks new cluster creation.

종속성 추가

클러스터에 MongoDB Connector for Spark 라이브러리를 추가하여 기본 MongoDB 및 Azure Cosmos DB for MongoDB 엔드포인트에 모두 연결합니다. 클러스터에서 라이브러리>설치 새>Maven을 선택한 다음 Maven 좌표를 추가 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 합니다.

Diagram of adding databricks cluster dependencies.

설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.

참고 항목

MongoDB 커넥트or for Spark 라이브러리가 설치된 후 Databricks 클러스터를 다시 시작해야 합니다.

그런 다음 마이그레이션을 위한 Scala 또는 Python Notebook을 만들 수 있습니다.

마이그레이션을 위한 Scala Notebook 만들기

Databricks에서 Scala Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 적합한 값을 입력해야 합니다.

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

마이그레이션을 위한 Python Notebook 만들기

Databricks에서 Python Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 적합한 값을 입력해야 합니다.

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

마이그레이션 성능 최적화

마이그레이션 성능은 다음 구성을 통해 조정할 수 있습니다.

  • Spark 클러스터의 작업자 및 코어 수: 더 많은 작업자가 작업을 실행하기 위해 더 많은 컴퓨팅 분할된 데이터베이스를 의미합니다.

  • maxBatchSize: 값은 maxBatchSize 데이터가 대상 Azure Cosmos DB 컬렉션에 저장되는 속도를 제어합니다. 그러나 maxBatchSize가 컬렉션 처리량에 비해 너무 높으면 속도 제한 오류가 발생할 수 있습니다.

    Spark 클러스터의 실행자 수, 작성되는 각 문서의 잠재적 크기(RU 비용) 및 대상 컬렉션 처리량 제한에 따라 작업자 수와 maxBatchSize를 조정해야 합니다.

    maxBatchSize = 컬렉션 처리량/(문서 1개의 RU 비용 * Spark 작업자 수 * 작업자당 CPU 코어 수)

  • MongoDB Spark partitioner 및 partitionKey: 사용되는 기본 파티셔너는 MongoDefaultPartitioner이고 기본 partitionKey는 _id. 입력 구성 속성spark.mongodb.input.partitioner에 값을 MongoSamplePartitioner 할당하여 파티셔너를 변경할 수 있습니다. 마찬가지로, 입력 구성 속성 spark.mongodb.input.partitioner.partitionKey에 적절한 필드 이름을 할당하여 partitionKey를 변경할 수 있습니다. 오른쪽 partitionKey는 데이터 기울이기를 방지하는 데 도움이 될 수 있습니다(동일한 분할 키 값에 대해 많은 수의 레코드가 작성됨).

  • 데이터 전송 중 인덱스 사용 안 함: 대량의 데이터 마이그레이션의 경우 대상 컬렉션에서 인덱스(특히 야생카드 인덱스를 사용하지 않도록 설정하는 것이 좋습니다. 인덱스는 각 문서를 작성하는 데 드는 RU 비용을 증가합니다. 이러한 RU를 해제하면 데이터 전송 속도를 개선하는 데 도움이 될 수 있습니다. 데이터가 마이그레이션되면 인덱스를 사용하도록 설정할 수 있습니다.

문제 해결

시간 제한 오류(오류 코드 50)

Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 50 오류 코드가 표시될 수 있습니다. 다음 시나리오에서는 시간 제한 오류가 발생할 수 있습니다.

  • 데이터베이스에 할당된 처리량이 낮습니다. 대상 컬렉션에 할당된 처리량이 충분한지 확인합니다.
  • 큰 데이터 볼륨으로 인한 과도한 데이터 기울이기. 지정된 테이블로 마이그레이션할 데이터가 많지만 데이터에 상당한 오차가 있는 경우 테이블에 프로비전된 요청 단위가 여러 대인 경우에도 속도 제한이 발생할 수 있습니다. 요청 단위는 실제 파티션 간에 균등하게 분할되며, 데이터 오차가 많으면 단일 분할된 데이터베이스에 대한 요청의 병목 현상이 발생할 수 있습니다. 데이터 오차는 동일한 분할 키 값에 대한 많은 수의 레코드를 의미합니다.

속도 제한(오류 코드 16500)

Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 16500 오류 코드가 표시될 수 있습니다. 이는 속도 제한 오류이며 이전 계정이나 서버 측 다시 시도 기능이 사용하지 않도록 설정된 계정에서 관찰될 수 있습니다.

  • 서버 쪽 다시 시도 사용: SSR(서버 쪽 다시 시도) 기능을 사용하도록 설정하고 서버가 속도 제한 작업을 자동으로 다시 시도하도록 합니다.

마이그레이션 후 최적화

데이터를 마이그레이션한 후 Azure Cosmos DB에 연결하고 데이터를 관리할 수 있습니다. 인덱싱 정책 최적화, 기본 일관성 수준 업데이트 또는 Azure Cosmos DB 계정에 대한 전역 배포 구성과 같은 다른 마이그레이션 후 단계를 수행할 수도 있습니다. 자세한 내용은 마이그레이션 후 최적화 문서를 참조하세요.

추가 리소스

  • Azure Cosmos DB로 마이그레이션하기 위한 용량 계획을 수행하려고 하시나요?

다음 단계