Перенос данных из MongoDB в учетную запись Azure Cosmos DB для MongoDB с помощью Azure Databricks

Область применения: Mongodb

Это руководство по миграции является частью серии по переносу баз данных из MongoDB в API Azure Cosmos DB для MongoDB. Как показано ниже, критически важные этапы миграции — это подготовка к миграции, миграция и действия после миграции.

Diagram of migration steps

Перенос данных с помощью Azure Databricks

Azure Databricks — это предложение "платформа как услуга" (PaaS) для Apache Spark. Она позволяет выполнять автономную миграцию крупных наборов данных. Azure Databricks можно использовать для автономной миграции баз данных из MongoDB в Azure Cosmos DB для MongoDB.

В этом учебнике рассматривается следующее.

  • Предоставление кластера Azure Databricks

  • Добавление зависимостей

  • Создание и запуск записной книжки Scala или Python

  • Оптимизация производительности миграции

  • Устранение ошибок, которые могут возникать во время миграции и приводить к ограничению скорости

Необходимые компоненты

Для работы с этим руководством вам потребуется следующее:

Предоставление кластера Azure Databricks

Вы можете выполнить инструкции по подготовке кластера Azure Databricks. Мы рекомендуем выбрать среду выполнения Databricks версии 7.6, которая поддерживает Spark 3.0.

Diagram of databricks new cluster creation.

Добавление зависимостей

Добавьте в кластер Подключение or Подключение MongoDB для Spark, чтобы подключиться как к собственным конечным точкам MongoDB, так и к azure Cosmos DB для конечных точек MongoDB. В кластере щелкните Библиотеки>Установить новую>Maven и добавьте координаты Maven org.mongodb.spark:mongo-spark-connector_2.12:3.0.1.

Diagram of adding databricks cluster dependencies.

Нажмите кнопку Установить, а затем перезапустите кластер после завершения установки.

Примечание.

После установки библиотеки MongoDB Connector for Spark обязательно перезапустите кластер Databricks.

После этого вы сможете создать записную книжку Scala или Python для миграции.

Создание записной книжки Scala для миграции

Создайте записную книжку Scala в Databricks. Перед выполнением следующего кода обязательно укажите правильные значения переменных:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

Создание записной книжки Python для миграции

Создайте записную книжку Python в Databricks. Перед выполнением следующего кода обязательно укажите правильные значения переменных:

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

Оптимизация производительности миграции

Производительность миграции можно регулировать с помощью указанных ниже конфигураций.

  • Количество рабочих и ядер в кластере Spark: больше рабочих ролей означает больше сегментов вычислений для выполнения задач.

  • maxBatchSize: значение maxBatchSize управляет скоростью сохранения данных в целевую коллекцию Azure Cosmos DB. Если значение параметра maxBatchSize слишком велико для пропускной способности коллекции, это может привести к возникновению ошибок, связанных с ограничением скорости.

    Вам потребуется настроить количество рабочих ролей и параметр maxBatchSize в зависимости от количества исполнителей в кластере Spark, потенциального размера (и в связи с этим от стоимости единицы запроса) каждого записываемого документа и предельных значений пропускной способности для целевой коллекции.

    Совет

    maxBatchSize = Пропускная способность коллекции / (Стоимость единицы запроса для 1 документа * Количество рабочих ролей Spark * Количество ядер ЦП на рабочую роль)

  • Средство для работы с разделами MongoDB Spark и параметр partitionKey: по умолчанию используется средство для работы с разделами MongoDefaultPartitioner и значение параметра partitionKey _id. Средство для работы с разделами можно изменить, назначив значение MongoSamplePartitioner свойству конфигурации входных данных spark.mongodb.input.partitioner. Аналогичным образом можно изменить параметр partitionKey, назначив соответствующее имя поля свойству конфигурации входных данных spark.mongodb.input.partitioner.partitionKey. Правильный значение параметра partitionKey позволяет избежать неравномерного распределения данных (записи большого количества записей для одного и того же значения ключа сегмента).

  • Отключение индексов при переносе данных: при миграции больших объемов данных рассмотрите возможность отключения индексов (особенно индекса с подстановочными знаками) в целевой коллекции. Индексы увеличивают затраты на единицу запроса при записи каждого документа. Высвобождение этих единиц запросов позволяет повысить скорость передачи данных. Вы можете включить индексы по окончании миграции данных.

Устранение неполадок

Ошибка времени ожидания (код ошибки 50)

Может появиться 50 код ошибки для операций с базой данных Azure Cosmos DB для MongoDB. Ошибки времени ожидания могут возникать в указанных ниже сценариях.

  • Нехватка пропускной способности, выделенной для базы данных: убедитесь, что для целевой коллекции выделена достаточная пропускная способность.
  • Неравномерное распределение данных при большом объеме данных. Если необходимо перенести большой объем данных в определенную таблицу, но данные распределены неравномерно, могут возникнуть ограничения скорости, даже если для вашей таблицы предусмотрено несколько единиц запросов. Система равномерно делит единицы запросов между физическими разделами, а чрезмерно неравномерное распределение данных может привести к "узким местам" при выполнении запросов к одному сегменту. Неравномерное распределение данных означает наличие большого количества записей для одного и того же значения ключа сегмента.

Ограничение скорости (код ошибки 16500)

Может появиться код ошибки 16500 для операций с базой данных Azure Cosmos DB для MongoDB. Это ошибки ограничения скорости, которые могут возникать в старых учетных записях или учетных записях, для которых отключена функция повтора попыток на стороне сервера.

  • Функция повтора попыток на стороне сервера: вы можете включить функцию повтора попыток на стороне сервера (SSR), чтобы сервер автоматически повторял операции, для которых ограничена скорость.

Оптимизация после переноса

По завершении миграции данных можно подключиться к Azure Cosmos DB и управлять этими данными. После миграции можно выполнить другие действия, например оптимизировать политику индексирования, обновить уровень согласованности, используемый по умолчанию, или настроить глобальное распределение для вашей учетной записи Azure Cosmos DB. См. подробнее об оптимизации после переноса.

Дополнительные ресурсы

Следующие шаги