Udostępnij za pośrednictwem


Migrowanie danych z bazy danych MongoDB do konta usługi Azure Cosmos DB dla bazy danych MongoDB przy użyciu usługi Azure Databricks

DOTYCZY: MongoDB

Ten przewodnik migracji jest częścią serii migracji baz danych z bazy danych MongoDB do interfejsu API usługi Azure Cosmos DB dla bazy danych MongoDB. Kroki migracji krytycznej to przed migracją, migracją i po migracji, jak pokazano poniżej.

Diagram kroków migracji

Migracja danych przy użyciu usługi Azure Databricks

Azure Databricks to oferta platformy jako usługi (PaaS) dla platformy Apache Spark. Oferuje ona sposób migracji w trybie offline w zestawie danych na dużą skalę. Za pomocą usługi Azure Databricks można przeprowadzić migrację baz danych w trybie offline z bazy danych MongoDB do usługi Azure Cosmos DB dla bazy danych MongoDB.

Niniejszy samouczek zawiera informacje na temat wykonywania następujących czynności:

  • Aprowizuj klaster usługi Azure Databricks

  • Dodawanie zależności

  • Tworzenie i uruchamianie notesu Języka Scala lub Python

  • Optymalizowanie wydajności migracji

  • Rozwiązywanie problemów z błędami ograniczania szybkości, które mogą być obserwowane podczas migracji

Wymagania wstępne

Do ukończenia tego samouczka niezbędne są następujące elementy:

Aprowizuj klaster usługi Azure Databricks

Aby aprowizować klaster usługi Azure Databricks, możesz postępować zgodnie z instrukcjami. Zalecamy wybranie środowiska uruchomieniowego usługi Databricks w wersji 7.6, która obsługuje platformę Spark 3.0.

Diagram tworzenia nowego klastra usługi Databricks.

Dodawanie zależności

Dodaj do klastra bibliotekę MongoDB Connector for Spark, aby nawiązać połączenie z natywnymi punktami końcowymi bazy danych MongoDB i usługą Azure Cosmos DB dla bazy danych MongoDB. W klastrze wybierz pozycję Biblioteki>Zainstaluj nowe>narzędzie Maven, a następnie dodaj org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 współrzędne narzędzia Maven.

Diagram dodawania zależności klastra usługi Databricks.

Wybierz pozycję Zainstaluj, a następnie uruchom ponownie klaster po zakończeniu instalacji.

Uwaga

Upewnij się, że klaster usługi Databricks został uruchomiony ponownie po zainstalowaniu biblioteki MongoDB Connector for Spark.

Następnie możesz utworzyć notes języka Scala lub Python na potrzeby migracji.

Tworzenie notesu Scala na potrzeby migracji

Tworzenie notesu Scala w usłudze Databricks. Przed uruchomieniem następującego kodu upewnij się, że wprowadzono odpowiednie wartości zmiennych:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

Tworzenie notesu języka Python na potrzeby migracji

Tworzenie notesu języka Python w usłudze Databricks. Przed uruchomieniem następującego kodu upewnij się, że wprowadzono odpowiednie wartości zmiennych:

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

Optymalizowanie wydajności migracji

Wydajność migracji można dostosować za pomocą następujących konfiguracji:

  • Liczba procesów roboczych i rdzeni w klastrze Spark: więcej procesów roboczych oznacza więcej fragmentów obliczeniowych do wykonywania zadań.

  • maxBatchSize: maxBatchSize wartość określa szybkość zapisywania danych w docelowej kolekcji usługi Azure Cosmos DB. Jeśli jednak parametr maxBatchSize jest zbyt wysoki dla przepływności kolekcji, może to spowodować błędy ograniczania szybkości.

    Należy dostosować liczbę procesów roboczych i maxBatchSize, w zależności od liczby funkcji wykonawczych w klastrze Spark, potencjalnie rozmiar (i dlatego koszt jednostek RU) każdego zapisywanego dokumentu oraz limity przepływności kolekcji docelowej.

    Napiwek

    maxBatchSize = Przepływność kolekcji / ( koszt jednostek RU dla 1 dokumentu * liczba procesów roboczych platformy Spark * liczba rdzeni procesora CPU na proces roboczy)

  • Partycjonator i klucz partycji Platformy Spark bazy danych MongoDB: domyślnym elementem partycjonatora jest MongoDefaultPartitioner, a domyślny klucz partitionKey jest _id. Partycjonator można zmienić, przypisując wartość MongoSamplePartitioner do właściwości spark.mongodb.input.partitionerkonfiguracji wejściowej . Podobnie klucz partycji można zmienić, przypisując odpowiednią nazwę pola do właściwości spark.mongodb.input.partitioner.partitionKeykonfiguracji wejściowej . Prawo partitionKey może pomóc uniknąć niesymetryczności danych (duża liczba rekordów zapisywanych dla tej samej wartości klucza fragmentu).

  • Wyłącz indeksy podczas transferu danych: w przypadku dużych ilości migracji danych rozważ wyłączenie indeksów, szczególnie wieloznacznych indeksów w kolekcji docelowej. Indeksy zwiększają koszt jednostek żądania pisania każdego dokumentu. Zwalnianie tych jednostek RU może pomóc zwiększyć szybkość transferu danych. Indeksy można włączyć po zakończeniu migracji danych.

Rozwiązywanie problemów

Błąd przekroczenia limitu czasu (kod błędu 50)

Może zostać wyświetlony kod błędu 50 dla operacji względem bazy danych Usługi Azure Cosmos DB dla bazy danych MongoDB. Następujące scenariusze mogą powodować błędy przekroczenia limitu czasu:

  • Przepływność przydzielona do bazy danych jest niska: upewnij się, że kolekcja docelowa ma przypisaną wystarczającą przepływność.
  • Nadmierne niesymetryczność danych z dużą ilością danych. Jeśli masz dużą ilość danych do przeprowadzenia migracji do danej tabeli, ale masz znaczną niesymetryczność danych, nadal może wystąpić ograniczenie szybkości, nawet jeśli w tabeli jest aprowizowanych kilka jednostek żądań . Jednostki żądań są dzielone równomiernie między partycje fizyczne, a duże niesymetryczność danych może spowodować wąskie gardło żądań do pojedynczego fragmentu. Niesymetryczność danych oznacza dużą liczbę rekordów dla tej samej wartości klucza fragmentu.

Ograniczanie szybkości (kod błędu 16500)

Może zostać wyświetlony kod błędu 16500 dla operacji względem bazy danych Usługi Azure Cosmos DB dla bazy danych MongoDB. Są to błędy ograniczania szybkości i mogą być obserwowane na starszych kontach lub kontach, na których funkcja ponawiania po stronie serwera jest wyłączona.

  • Włącz ponawianie po stronie serwera: włącz funkcję Ponawianie po stronie serwera (SSR) i pozwól serwerowi na automatyczne ponawianie liczby ograniczonych operacji.

Optymalizacja po migracji

Po przeprowadzeniu migracji danych możesz nawiązać połączenie z usługą Azure Cosmos DB i zarządzać danymi. Możesz również wykonać inne kroki po migracji, takie jak optymalizacja zasad indeksowania, zaktualizowanie domyślnego poziomu spójności lub skonfigurowanie dystrybucji globalnej dla konta usługi Azure Cosmos DB. Aby uzyskać więcej informacji, zobacz artykuł Dotyczący optymalizacji po migracji .

Dodatkowe zasoby

Następne kroki