Share via


Migrera data från MongoDB till ett Azure Cosmos DB för MongoDB-konto med hjälp av Azure Databricks

GÄLLER FÖR: Mongodb

Den här migreringsguiden är en del av serien om migrering av databaser från MongoDB till Azure Cosmos DB API för MongoDB. De kritiska migreringsstegen är före migrering, migrering och efter migrering, enligt nedan.

Diagram of migration steps

Datamigrering med Azure Databricks

Azure Databricks är ett PaaS-erbjudande (plattform som en tjänst) för Apache Spark. Den erbjuder ett sätt att utföra offlinemigreringar på en storskalig datauppsättning. Du kan använda Azure Databricks för att utföra en offlinemigrering av databaser från MongoDB till Azure Cosmos DB för MongoDB.

I den här självstudien får du lära dig hur man:

  • Etablera ett Azure Databricks-kluster

  • Lägga till beroenden

  • Skapa och köra Notebook-filen Scala eller Python

  • Optimera migreringsprestanda

  • Felsöka hastighetsbegränsningsfel som kan observeras under migreringen

Förutsättningar

För att slutföra den här kursen behöver du:

Etablera ett Azure Databricks-kluster

Du kan följa anvisningarna för att etablera ett Azure Databricks-kluster. Vi rekommenderar att du väljer Databricks runtime version 7.6, som stöder Spark 3.0.

Diagram of databricks new cluster creation.

Lägga till beroenden

Lägg till MongoDB-Anslut eller för Spark-biblioteket i klustret för att ansluta till både interna MongoDB- och Azure Cosmos DB för MongoDB-slutpunkter. I klustret väljer du Bibliotek>Installera ny>Maven och lägger sedan till org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 Maven-koordinater.

Diagram of adding databricks cluster dependencies.

Välj Installera och starta sedan om klustret när installationen är klar.

Kommentar

Kontrollera att du startar om Databricks-klustret när MongoDB-Anslut eller för Spark-biblioteket har installerats.

Därefter kan du skapa en Scala- eller Python-notebook-fil för migrering.

Skapa Scala Notebook för migrering

Skapa en Scala Notebook i Databricks. Se till att ange rätt värden för variablerna innan du kör följande kod:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

Skapa Python-notebook-fil för migrering

Skapa en Python Notebook i Databricks. Se till att ange rätt värden för variablerna innan du kör följande kod:

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

Optimera migreringsprestanda

Migreringsprestanda kan justeras genom dessa konfigurationer:

  • Antal arbetare och kärnor i Spark-klustret: Fler arbetare innebär fler beräkningsshards för att utföra uppgifter.

  • maxBatchSize: Värdet maxBatchSize styr hur snabbt data sparas i Azure Cosmos DB-målsamlingen. Men om maxBatchSize är för högt för samlingens dataflöde kan det orsaka hastighetsbegränsningsfel .

    Du skulle behöva justera antalet arbetare och maxBatchSize, beroende på antalet utförare i Spark-klustret, potentiellt storleken (och det är därför RU-kostnaden) för varje dokument skrivs och målsamlingens dataflödesgränser.

    Dricks

    maxBatchSize = Insamlingsdataflöde/( RU-kostnad för 1 dokument * antal Spark-arbetare * antal CPU-kärnor per arbetare )

  • MongoDB Spark-partitionerare och partitionKey: Standardpartitioneraren som används är MongoDefaultPartitioner och standardpartitionKey är _id. Partitioneraren kan ändras genom att tilldela värdet MongoSamplePartitioner till indatakonfigurationsegenskapen spark.mongodb.input.partitioner. På samma sätt kan partitionKey ändras genom att tilldela lämpligt fältnamn till indatakonfigurationsegenskapen spark.mongodb.input.partitioner.partitionKey. Höger partitionKey kan hjälpa till att undvika datasnedvridning (ett stort antal poster skrivs för samma shardnyckelvärde).

  • Inaktivera index under dataöverföring: För stora mängder datamigrering bör du överväga att inaktivera index, särskilt jokerteckenindex i målsamlingen. Index ökar RU-kostnaden för att skriva varje dokument. Om du frigör dessa RU:er kan du förbättra dataöverföringshastigheten. Du kan aktivera indexen när data har migrerats över.

Felsöka

Timeout-fel (felkod 50)

Du kan se en 50-felkod för åtgärder mot Azure Cosmos DB for MongoDB-databasen. Följande scenarier kan orsaka timeout-fel:

  • Dataflödet som allokerats till databasen är lågt: Kontrollera att målsamlingen har tillräckligt med dataflöde tilldelat till den.
  • Överdriven dataförskjutning med stor datavolym. Om du har en stor mängd data att migrera till en viss tabell men har en betydande snedställning i data, kan du fortfarande uppleva hastighetsbegränsning även om du har flera enheter för begäranden etablerade i tabellen. Enheter för begärande delas lika mellan fysiska partitioner, och stora datasnedvridningar kan orsaka en flaskhals med begäranden till en enda shard. Datasnedvridning innebär ett stort antal poster för samma shardnyckelvärde.

Hastighetsbegränsning (felkod 16500)

Du kan se en 16500-felkod för åtgärder mot Azure Cosmos DB for MongoDB-databasen. Det här är hastighetsbegränsningsfel och kan observeras på äldre konton eller konton där funktionen för återförsök på serversidan är inaktiverad.

  • Aktivera återförsök på serversidan: Aktivera funktionen SSR (Server Side Retry) och låt servern försöka med hastighetsbegränsade åtgärder automatiskt.

Optimering efter migrering

När du har migrerat data kan du ansluta till Azure Cosmos DB och hantera data. Du kan också följa andra steg efter migreringen, till exempel optimera indexeringsprincipen, uppdatera standardkonsekvensnivån eller konfigurera global distribution för ditt Azure Cosmos DB-konto. Mer information finns i artikeln om optimering efter migreringen.

Ytterligare resurser

Nästa steg