Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB 계정으로 데이터 마이그레이션
적용 대상: MongoDB
이 마이그레이션 가이드는 MongoDB에서 Azure Cosmos DB API for MongoDB로 데이터베이스를 마이그레이션하는 시리즈의 일부입니다. 중요한 마이그레이션 단계는 아래와 같이 마이그레이션 전, 마이그레이션 및 마이그레이션 후입니다.
Azure Databricks를 사용한 데이터 마이그레이션
Azure Databricks는 Apache Spark용 PaaS(Platform as a Service) 제품입니다. 대규모 데이터 세트에서 오프라인 마이그레이션을 수행하는 방법을 제공합니다. Azure Databricks를 사용하여 MongoDB에서 Azure Cosmos DB for MongoDB로 데이터베이스를 오프라인으로 마이그레이션할 수 있습니다.
이 자습서에서는 다음을 수행하는 방법을 배우게 됩니다.
Azure Databricks 클러스터 프로비전
종속성 추가
Scala 또는 Python Notebook 만들기 및 실행
마이그레이션 성능 최적화
마이그레이션 중에 관찰될 수 있는 속도 제한 오류 문제 해결
필수 조건
이 자습서를 완료하려면 다음을 수행해야 합니다.
Azure Databricks 클러스터 프로비전
지침에 따라 Azure Databricks 클러스터를 프로비전할 수 있습니다. Spark 3.0을 지원하는 Databricks 런타임 버전 7.6을 선택하는 것이 좋습니다.
종속성 추가
클러스터에 MongoDB Connector for Spark 라이브러리를 추가하여 기본 MongoDB 및 Azure Cosmos DB for MongoDB 엔드포인트에 모두 연결합니다. 클러스터에서 라이브러리>설치 새>Maven을 선택한 다음 Maven 좌표를 추가 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
합니다.
설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.
참고 항목
MongoDB 커넥트or for Spark 라이브러리가 설치된 후 Databricks 클러스터를 다시 시작해야 합니다.
그런 다음 마이그레이션을 위한 Scala 또는 Python Notebook을 만들 수 있습니다.
마이그레이션을 위한 Scala Notebook 만들기
Databricks에서 Scala Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 적합한 값을 입력해야 합니다.
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._
var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection = "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection = "<COLLECTIONNAME>"
val readConfig = ReadConfig(Map(
"spark.mongodb.input.uri" -> sourceConnectionString,
"spark.mongodb.input.database" -> sourceDb,
"spark.mongodb.input.collection" -> sourceCollection,
))
val writeConfig = WriteConfig(Map(
"spark.mongodb.output.uri" -> targetConnectionString,
"spark.mongodb.output.database" -> targetDb,
"spark.mongodb.output.collection" -> targetCollection,
"spark.mongodb.output.maxBatchSize" -> "8000"
))
val sparkSession = SparkSession
.builder()
.appName("Data transfer using spark")
.getOrCreate()
val customRdd = MongoSpark.load(sparkSession, readConfig)
MongoSpark.save(customRdd, writeConfig)
마이그레이션을 위한 Python Notebook 만들기
Databricks에서 Python Notebook을 만듭니다. 다음 코드를 실행하기 전에 변수에 적합한 값을 입력해야 합니다.
from pyspark.sql import SparkSession
sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection = "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection = "<COLLECTIONNAME>"
my_spark = SparkSession \
.builder \
.appName("myApp") \
.getOrCreate()
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()
df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()
마이그레이션 성능 최적화
마이그레이션 성능은 다음 구성을 통해 조정할 수 있습니다.
Spark 클러스터의 작업자 및 코어 수: 더 많은 작업자가 작업을 실행하기 위해 더 많은 컴퓨팅 분할된 데이터베이스를 의미합니다.
maxBatchSize: 값은
maxBatchSize
데이터가 대상 Azure Cosmos DB 컬렉션에 저장되는 속도를 제어합니다. 그러나 maxBatchSize가 컬렉션 처리량에 비해 너무 높으면 속도 제한 오류가 발생할 수 있습니다.Spark 클러스터의 실행자 수, 작성되는 각 문서의 잠재적 크기(RU 비용) 및 대상 컬렉션 처리량 제한에 따라 작업자 수와 maxBatchSize를 조정해야 합니다.
팁
maxBatchSize = 컬렉션 처리량/(문서 1개의 RU 비용 * Spark 작업자 수 * 작업자당 CPU 코어 수)
MongoDB Spark partitioner 및 partitionKey: 사용되는 기본 파티셔너는 MongoDefaultPartitioner이고 기본 partitionKey는 _id. 입력 구성 속성
spark.mongodb.input.partitioner
에 값을MongoSamplePartitioner
할당하여 파티셔너를 변경할 수 있습니다. 마찬가지로, 입력 구성 속성spark.mongodb.input.partitioner.partitionKey
에 적절한 필드 이름을 할당하여 partitionKey를 변경할 수 있습니다. 오른쪽 partitionKey는 데이터 기울이기를 방지하는 데 도움이 될 수 있습니다(동일한 분할 키 값에 대해 많은 수의 레코드가 작성됨).데이터 전송 중 인덱스 사용 안 함: 대량의 데이터 마이그레이션의 경우 대상 컬렉션에서 인덱스(특히 야생카드 인덱스를 사용하지 않도록 설정하는 것이 좋습니다. 인덱스는 각 문서를 작성하는 데 드는 RU 비용을 증가합니다. 이러한 RU를 해제하면 데이터 전송 속도를 개선하는 데 도움이 될 수 있습니다. 데이터가 마이그레이션되면 인덱스를 사용하도록 설정할 수 있습니다.
문제 해결
시간 제한 오류(오류 코드 50)
Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 50 오류 코드가 표시될 수 있습니다. 다음 시나리오에서는 시간 제한 오류가 발생할 수 있습니다.
- 데이터베이스에 할당된 처리량이 낮습니다. 대상 컬렉션에 할당된 처리량이 충분한지 확인합니다.
- 큰 데이터 볼륨으로 인한 과도한 데이터 기울이기. 지정된 테이블로 마이그레이션할 데이터가 많지만 데이터에 상당한 오차가 있는 경우 테이블에 프로비전된 요청 단위가 여러 대인 경우에도 속도 제한이 발생할 수 있습니다. 요청 단위는 실제 파티션 간에 균등하게 분할되며, 데이터 오차가 많으면 단일 분할된 데이터베이스에 대한 요청의 병목 현상이 발생할 수 있습니다. 데이터 오차는 동일한 분할 키 값에 대한 많은 수의 레코드를 의미합니다.
속도 제한(오류 코드 16500)
Azure Cosmos DB for MongoDB 데이터베이스에 대한 작업에 16500 오류 코드가 표시될 수 있습니다. 이는 속도 제한 오류이며 이전 계정이나 서버 측 다시 시도 기능이 사용하지 않도록 설정된 계정에서 관찰될 수 있습니다.
- 서버 쪽 다시 시도 사용: SSR(서버 쪽 다시 시도) 기능을 사용하도록 설정하고 서버가 속도 제한 작업을 자동으로 다시 시도하도록 합니다.
마이그레이션 후 최적화
데이터를 마이그레이션한 후 Azure Cosmos DB에 연결하고 데이터를 관리할 수 있습니다. 인덱싱 정책 최적화, 기본 일관성 수준 업데이트 또는 Azure Cosmos DB 계정에 대한 전역 배포 구성과 같은 다른 마이그레이션 후 단계를 수행할 수도 있습니다. 자세한 내용은 마이그레이션 후 최적화 문서를 참조하세요.
추가 리소스
- Azure Cosmos DB로 마이그레이션하기 위한 용량 계획을 수행하려고 하시나요?
- 기존 데이터베이스 클러스터의 vcore 및 서버 수만 알고 있는 경우 vCore 또는 vCPU를 사용하여 요청 단위를 예측하는 방법에 대해 읽어봅니다.
- 현재 데이터베이스 워크로드에 대한 일반적인 요청 비율을 알고 있는 경우 Azure Cosmos DB 용량 계획 도구를 사용하여 요청 단위 예측에 대해 읽어보세요.