Condividi tramite


Eseguire la migrazione dei dati da MongoDB a un account Azure Cosmos DB for MongoDB usando Azure Databricks

SI APPLICA A: MongoDB

Questa guida alla migrazione fa parte della serie sulla migrazione di database da MongoDB all'API di Azure Cosmos DB for MongoDB. I passaggi critici per la migrazione sono pre-migrazione, migrazione e post-migrazione, come illustrato di seguito.

Diagramma dei passaggi di migrazione

Migrazione dei dati con Azure Databricks

Azure Databricks è un'offerta platform as a service (PaaS) per Apache Spark. Offre una soluzione per eseguire migrazioni offline in un set di dati su larga scala. È possibile usare Azure Databricks per eseguire una migrazione offline dei database da MongoDB ad Azure Cosmos DB for MongoDB.

La presente esercitazione include informazioni su come:

  • Effettuare il provisioning di un cluster di Azure Databricks

  • Aggiungere le dipendenze

  • Creare ed eseguire notebook Scala o Python

  • Ottimizzare le prestazioni della migrazione

  • Risolvere gli errori di limitazione della frequenza che possono essere osservati durante la migrazione

Prerequisiti

Per completare questa esercitazione, è necessario:

Effettuare il provisioning di un cluster di Azure Databricks

È possibile seguire le istruzioni per effettuare il provisioning di un cluster di Azure Databricks. È consigliabile selezionare Databricks runtime versione 7.6, che supporta Spark 3.0.

Diagramma della creazione di un nuovo cluster databricks.

Aggiungere le dipendenze

Aggiungere il connettore MongoDB per la libreria Spark al cluster per connettersi agli endpoint di MongoDB nativo e di Azure Cosmos DB for MongoDB. Nel cluster selezionare Librerie>Installa nuova>Maven e quindi aggiungere le coordinate Maven org.mongodb.spark:mongo-spark-connector_2.12:3.0.1.

Diagramma dell'aggiunta di dipendenze del cluster databricks.

Selezionare Installa e quindi riavviare il cluster al termine dell'installazione.

Nota

Assicurarsi di riavviare il cluster Databricks dopo l'installazione del connettore MongoDB per la libreria Spark.

A questo punto, è possibile creare un notebook Scala o Python per la migrazione.

Creare un notebook Scala per la migrazione

Creare un notebook Scala in Databricks. Assicurarsi di immettere i valori corretti per le variabili prima di eseguire il codice seguente:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

Creare un notebook Python per la migrazione

Creare un notebook Python in Databricks. Assicurarsi di immettere i valori corretti per le variabili prima di eseguire il codice seguente:

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

Ottimizzare le prestazioni della migrazione

Le prestazioni della migrazione possono essere modificate tramite queste configurazioni:

  • Numero di ruoli di lavoro e core nel cluster Spark: più ruoli di lavoro si traducono in più partizioni di calcolo per l'esecuzione di attività.

  • maxBatchSize: il valore maxBatchSize controlla la frequenza con cui i dati vengono salvati nella raccolta di Azure Cosmos DB di destinazione. Tuttavia, se il valore di maxBatchSize è troppo elevato per la velocità effettiva della raccolta, possono verificarsi errori di limitazione della frequenza.

    È necessario modificare il numero di ruoli di lavoro e maxBatchSize, a seconda del numero di executor nel cluster Spark, potenzialmente delle dimensioni (ed è questo il motivo del costo delle UR) di ogni documento scritto e dei limiti di velocità effettiva della raccolta di destinazione.

    Suggerimento

    maxBatchSize = Velocità effettiva della raccolta/ ( Costo UR per 1 documento * numero di ruoli di lavoro Spark * numero di core della CPU per ruolo di lavoro)

  • partitioner e partitionKey Spark per MongoDB: il partitioner predefinito usato è MongoDefaultPartitioner e il valore predefinito di partitionKey è _id. È possibile modificare il partitioner assegnando il valore MongoSamplePartitioner alla proprietà di configurazione di input spark.mongodb.input.partitioner. Analogamente, partitionKey può essere modificato assegnando il nome del campo appropriato alla proprietà di configurazione di input spark.mongodb.input.partitioner.partitionKey. Il valore corretto di partitionKey consente di evitare l'asimmetria dei dati (numero elevato di record scritti per lo stesso valore della chiave di partizione).

  • Disabilitare gli indici durante il trasferimento dei dati: per la migrazione di grandi quantità di dati è consigliabile disabilitare gli indici, in particolare l'indice con caratteri jolly nella raccolta di destinazione. Gli indici aumentano il costo delle UR per la scrittura di ogni documento. Eliminando queste UR è possibile contribuire a migliorare la velocità di trasferimento dei dati. Gli indici possono poi essere abilitati dopo la migrazione dei dati.

Risoluzione dei problemi

Errore di timeout (codice errore 50)

È possibile che venga visualizzato un codice errore 50 per le operazioni sul database Azure Cosmos DB for MongoDB. Gli scenari seguenti possono causare errori di timeout:

  • La velocità effettiva allocata al database è bassa: assicurarsi che alla raccolta di destinazione sia assegnata una velocità effettiva sufficiente.
  • Eccessiva asimmetria dei dati con un elevato volumi di dati. Se si ha una grande quantità di dati di cui eseguire la migrazione in una determinata tabella, ma è presente un'asimmetria significativa nei dati, è possibile che si verifichino limitazioni della frequenza anche se sono presenti diverse unità richiesta di cui è stato effettuato il provisioning nella tabella. Le unità richiesta sono divise equamente tra le partizioni fisiche e una forte asimmetria dei dati può causare un collo di bottiglia delle richieste a una singola partizione. L'asimmetria dei dati implica un numero elevato di record per lo stesso valore della chiave di partizione.

Limitazione della frequenza (codice errore 16500)

È possibile che venga visualizzato un codice errore 16500 per le operazioni sul database Azure Cosmos DB for MongoDB. Si tratta di errori di limitazione della frequenza e possono essere osservati su account meno recenti o su account in cui la funzionalità di ripetizione dei tentativi sul lato server è disabilitata.

  • Abilitare la ripetizione dei tentativi sul lato server: abilitare la funzionalità Server Side Retry (SSR) e consentire al server di ripetere automaticamente le operazioni con frequenza limitata.

Ottimizzazione della post-migrazione

Dopo aver eseguito la migrazione dei dati, è possibile connettersi ad Azure Cosmos DB e gestire i dati. È anche possibile seguire altri passaggi post-migrazione, ad esempio l'ottimizzazione dei criteri di indicizzazione, l'aggiornamento del livello di coerenza predefinito o la configurazione della distribuzione globale per l'account Azure Cosmos DB. Per altre informazioni, vedere l'articolo Ottimizzazione della post-migrazione.

Risorse aggiuntive

Passaggi successivi