共用方式為


使用 Azure Databricks 將資料從 MongoDB 移轉至 Azure Cosmos DB for MongoDB 帳戶

適用於: MongoDB

此移轉指南是將資料庫從 MongoDB 移轉至適用於 MongoDB 的 Azure Cosmos DB API 系列的一部分。 重要的移轉步驟分為移轉前、移轉和移轉後,如下所示。

移轉步驟的圖表

使用 Azure Databricks 移轉資料

Azure DatabricksApache Spark 的平台即服務 (PaaS) 供應項目。 其提供一種在大規模資料集上執行離線移轉的方法。 您可以使用 Azure Databricks,將資料庫從 MongoDB 離線移轉至 Azure Cosmos DB for MongoDB。

在本教學課程中,您將了解如何:

  • 佈建 Azure Databricks 叢集

  • 新增相依性

  • 建立並執行 Scala 或 Python 筆記本

  • 最佳化移轉效能

  • 針對可能會在移轉期間觀察到的錯誤進行疑難排解

必要條件

若要完成本教學課程,您需要:

佈建 Azure Databricks 叢集

您可以遵循指示來佈建 Azure Databricks 叢集。 建議您選取支援 Spark 3.0 的 Databricks Runtime 7.6 版本。

databricks 新叢集建立的圖表。

新增相依性

將適用於 Spark 的 MongoDB 連接器程式庫新增至您的叢集,以同時連線至原生 MongoDB 端點和 Azure Cosmos DB for MongoDB 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後新增 org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 Maven 座標。

新增 databricks 叢集相依性的圖表。

選取 [安裝],然後在安裝完成時重新啟動叢集。

注意

在安裝了適用於 Spark 的 MongoDB 連接器程式庫之後,請務必重新啟動 Databricks 叢集。

接著,您可以建立 Scala 或 Python 筆記本以進行移轉。

建立 Scala 筆記本以進行移轉

在 Databricks 中建立 Scala 筆記本。 請務必先為變數輸入正確的值,然後再執行下列程式碼:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark._
import org.apache.spark.sql._

var sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
var sourceDb = "<DB NAME>"
var sourceCollection =  "<COLLECTIONNAME>"
var targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
var targetDb = "<DB NAME>"
var targetCollection =  "<COLLECTIONNAME>"

val readConfig = ReadConfig(Map(
  "spark.mongodb.input.uri" -> sourceConnectionString,
  "spark.mongodb.input.database" -> sourceDb,
  "spark.mongodb.input.collection" -> sourceCollection,
))

val writeConfig = WriteConfig(Map(
  "spark.mongodb.output.uri" -> targetConnectionString,
  "spark.mongodb.output.database" -> targetDb,
  "spark.mongodb.output.collection" -> targetCollection,
  "spark.mongodb.output.maxBatchSize" -> "8000"  
))

val sparkSession = SparkSession
  .builder()
  .appName("Data transfer using spark")
  .getOrCreate()

val customRdd = MongoSpark.load(sparkSession, readConfig)

MongoSpark.save(customRdd, writeConfig)

建立 Python 筆記本以進行移轉

在 Databricks 中建立 Python 筆記本。 請務必先為變數輸入正確的值,然後再執行下列程式碼:

from pyspark.sql import SparkSession

sourceConnectionString = "mongodb://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<AUTHDB>"
sourceDb = "<DB NAME>"
sourceCollection =  "<COLLECTIONNAME>"
targetConnectionString = "mongodb://<ACCOUNTNAME>:<PASSWORD>@<ACCOUNTNAME>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<ACCOUNTNAME>@"
targetDb = "<DB NAME>"
targetCollection =  "<COLLECTIONNAME>"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", sourceConnectionString).option("database", sourceDb).option("collection", sourceCollection).load()

df.write.format("mongo").mode("append").option("uri", targetConnectionString).option("maxBatchSize",2500).option("database", targetDb).option("collection", targetCollection).save()

最佳化移轉效能

您可以透過下列設定來調整移轉效能:

  • Spark 叢集中的背景工作角色和核心數目:背景工作角色越多,表示有越多計算節點執行工作。

  • maxBatchSizemaxBatchSize 值會控制將資料儲存至目標 Azure Cosmos DB 集合的速率。 不過,如果 maxBatchSize 對集合輸送量而言太高,其可能會導致速率限制錯誤。

    您必須調整背景工作角色的數目和 maxBatchSize,取決於 Spark 叢集中的執行程式數目、也可能是每個所寫入文件的大小 (這就是為何使用 RU 成本的原因),以及目標集合輸送量限制。

    提示

    maxBatchSize = 集合輸送量 / (1 份文件的 RU 成本 * Spark 背景工作角色的數目 * 每個背景工作角色的 CPU 核心數目)

  • MongoDB Spark 分割區和 partitionKey:所使用的預設分割區為 MongoDefaultPartitioner,而預設 partitionKey 為 _id。 您可以藉由將值 MongoSamplePartitioner 指派給輸入設定屬性 spark.mongodb.input.partitioner 來變更分割器。 同樣地,可以藉由將適當的欄位名稱指派給輸入設定屬性 spark.mongodb.input.partitioner.partitionKey 來變更 partitionKey。 正確的 partitionKey 有助於避免資料扭曲 (針對相同的分區金鑰值寫入大量記錄)。

  • 在資料傳輸期間停用索引:針對大量的資料移轉,請考慮停用索引,特別是目標集合上的萬用字元索引。 索引會增加寫入每份文件的 RU 成本。 釋放這些 RU 可協助改善資料傳輸速率。 一旦資料移轉後,您就可以啟用索引。

疑難排解

逾時錯誤 (錯誤碼 50)

對 Azure Cosmos DB for MongoDB 資料庫進行作業時,您可能會看到 50 錯誤碼。 下列案例可能會導致逾時錯誤:

  • 配置給資料庫的輸送量很低:請確定目標集合已獲派足夠的輸送量。
  • 具有大量資料量的資料扭曲過多。 如果您有大量資料要移轉到給定的資料表,但在資料中有明顯的扭曲,則即使您在資料表中佈建了數個要求單位,仍可能會遇到速率限制。 要求單位會平均分配至實體分割區,而大量資料扭曲可能會造成單一分區要求的瓶頸。 資料扭曲表示相同分區金鑰值的大量記錄。

速率限制 (錯誤碼 16500)

對 Azure Cosmos DB for MongoDB 資料庫進行作業時,您可能會看到 16500 錯誤碼。 這些是速率限制錯誤,而且可能會在停用伺服器端重試功能的較舊帳戶上觀察到這些錯誤。

  • 啟用伺服器端重試:啟用伺服器端重試 (SSR) 功能,並讓伺服器自動重試速率限制的作業。

移轉後最佳化

移轉資料後,您可以連線到 Azure Cosmos DB 並管理資料。 您也可以遵循其他的移轉後步驟,例如將索引編製原則最佳化、更新預設的一致性層級或設定 Azure Cosmos DB 帳戶的全域散發。 如需詳細資訊,請參閱移轉後最佳化一文。

其他資源

下一步