使用 Azure Data Factory 或 Synapse Analytics 從 MongoDB 複製資料或將資料複製到 MongoDB

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有項目。 了解如何免費啟動新的試用版

本文概述如何使用 Azure Data Factory Synapse Analytics 管道中的複製活動,從 MongoDB 資料庫複製資料或將資料複製到 MongoDB。 本文是根據複製活動概觀一文,該文提供複製活動的一般概觀。

重要

新的 MongoDB 連接器提供改良的原生 MongoDB 支援。 如果您在解決方案中使用舊版 MongoDB 連接器 (依原樣僅為其提供回溯相容性的支援),請參閱 MongoDB 連接器 (舊版) 一文。

支援的功能

此 MongoDB 連接器支援下列功能:

支援的功能 IR
複製活動 (來源/接收) ① ②

① 整合執行階段 ② 自我裝載整合執行階段

如需支援做為來源/接收器的資料存放區清單,請參閱支援的資料存放區表格。

具體而言,這個 MongoDB 連接器所支援的版本最高可達 4.2 版

必要條件

如果您的資料存放區位於內部部署網路、Azure 虛擬網路或 Amazon 虛擬私人雲端中,則必須設定自我裝載整合執行階段以與其連線。

如果您的資料存放區是受控雲端資料服務,則可使用 Azure Integration Runtime。 如果只能存取防火牆規則中核准的 IP,您可以將 Azure Integration Runtime IP 新增至允許清單。

您也可以使用 Azure Data Factory 中的受控虛擬網路整合執行階段功能來存取內部部署網路,而不需要安裝和設定自我裝載整合執行階段。

如需 Data Factory 支援的網路安全性機制和選項的詳細資訊,請參閱資料存取策略

開始使用

若要透過管線執行複製活動,您可以使用下列其中一個工具或 SDK:

使用 UI 建立 MongoDB 的連結服務

使用下列步驟,在 Azure 入口網站 UI 中建立至 MongoDB 的連結服務。

  1. 前往 Azure Data Factory 或 Synapse 工作區的 [管理] 索引標籤,選取 [連結服務],然後按一下 [新增]:

  2. 搜尋 MongoDB,然後選取 MongoDB 連接器。

    Select the MongoDB connector.

  3. 設定服務詳細資料,測試連線,然後建立新的連結服務。

    Configure a linked service to MongoDB.

連接器設定詳細資料

下列各節提供屬性的相關詳細資料,這些屬性是用來定義 MongoDB 連接器專屬的 Data Factory 實體。

連結服務屬性

以下是針對 MongoDB 已連結服務支援的屬性:

屬性 描述 必要
type 類型屬性必須設定為:MongoDbV2 Yes
connectionString 指定 MongoDB 連接字串,例如 mongodb://[username:password@]host[:port][/[database][?options]]。 如需詳細資訊,請參閱 MongoDB 手冊中關於連接字串的內容

您也可以在 Azure Key Vault 中放置連接字串。 如需詳細資訊,請參閱在 Azure Key Vault 中儲存認證
Yes
database 您要存取的資料庫名稱。 Yes
connectVia 用於連線到資料存放區的 Integration Runtime。 深入了解必要條件一節。 如果未指定,就會使用預設的 Azure Integration Runtime。 No

範例:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

資料集屬性

如需定義資料集的區段和屬性完整清單,請參閱資料集和連結服務。 以下是針對 MongoDB 資料集支援的屬性:

屬性 描述 必要
type 資料集的類型屬性必須設定為:MongoDbV2Collection Yes
collectionName MongoDB 資料庫中集合的名稱。 Yes

範例:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

複製活動屬性

如需可用來定義活動的區段和屬性完整清單,請參閱管線一文。 本節提供 MongoDB 來源和接收器所支援的屬性清單。

MongoDB 作為來源

複製活動的 source 區段支援下列屬性:

屬性 描述 必要
type 複製活動來源的類型屬性必須設定為:MongoDbV2Source Yes
filter 使用查詢運算子指定選取範圍篩選。 若要傳回集合中的所有文件,請省略此參數,或傳遞空白文件 ({})。 No
cursorMethods.project 指定要在文件中傳回以便投影的欄位。 若要傳回比對文件中的所有欄位,請省略此參數。 No
cursorMethods.sort 指定查詢傳回比對文件的順序。 請參閱 cursor.sort() No
cursorMethods.limit 指定伺服器傳回的文件數目上限。 請參閱 cursor.limit() No
cursorMethods.skip 指定要跳過的文件數,以及 MongoDB 開始傳回結果的位置。 請參閱 cursor.skip() No
batchSize 指定要在回應的每個批次中從 MongoDB 執行個體傳回的文件數目。 在大部分情況下,修改批次大小不會影響使用者或應用程式。 Azure Cosmos DB 限制每個批次的大小不能超過 40 MB,也就是文件大小的 batchSize 總數,因此如果您的文件大小很大,請降低此值。 No
(預設為 100)

提示

此服務支援以 Strict 模式取用 BSON 文件。 請確定您的篩選查詢處於 Strict 模式,而非 Shell 模式。 如需詳細說明,請參閱 MongoDB manual (MongoDB 手冊)。

範例:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB 做為接收器

複製活動的 [接收] 區段支援下列屬性:

屬性 描述 必要
type 複製活動接收的類型屬性必須設定為 MongoDbV2Sink Yes
writeBehavior 描述如何將資料寫入至 MongoDB。 允許的值:insertupsert

如果存在具有相同 _id 的文件,upsert 的行為會用來取代文件;否則會插入文件。

附註:如果未在原始文件中或藉由資料行對應來指定 _id,則服務會自動為文件產生 _id。 這表示您必須確定,為了讓 upsert 如預期般運作,您的文件具有識別碼。
No
(預設值為 insert)
writeBatchSize writeBatchSize 屬性可控制在每個批次中寫入的文件大小。 您可以嘗試增加 writeBatchSize 的值來改善效能,如果您的文件大小很大,則可嘗試降低此值。 No
(預設值為 10,000)
writeBatchTimeout 在逾時前等待批次插入作業完成的時間。允許的值為時間範圍。 No
(預設為 00:30:00 - 30 分鐘)

提示

若要以現況匯入 JSON 文件,請參閱匯入或匯出 JSON 文件一節;若要從表格式資料複製,請參閱結構描述對應

範例

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

匯入和匯出 JSON 文件

您可以使用此 MongoDB 連接器輕鬆地:

  • 在兩個 MongoDB 集合之間依原樣複製文件。
  • 將 JSON 文件從各種來源匯入到 MongoDB,包括 Azure Cosmos DB、Azure Blob 儲存體、Azure Data Lake Store 及其他支援的檔案型存放區。
  • 將 JSON 文件從 MongoDB 集合匯出至各種檔案型存放區。

若要完成這種跨平台結構描述的複製,請跳過資料集中的「結構」(也稱為「結構描述」) 區段,以及複製活動中的結構描述對應。

結構描述對應

若要將資料從 MongoDB 複製到表格式接收器,或將資料從表格式接收器複製到 MongoDB,請參閱結構描述對應

升級 MongoDB 連結服務

以下是可協助您升級連結服務和相關查詢的步驟:

  1. 建立新的 MongoDB 連結服務,並藉由參考連結服務屬性加以設定。

  2. 如果您在管線中使用參考舊 MongoDB 連結服務的 SQL 查詢,請將其取代為對等 MongoDB 查詢。 請參閱下表以便了解取代範例:

    SQL 查詢 對等 MongoDB 查詢
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

如需複製活動支援作為來源和接收器的資料存放區清單,請參閱支援的資料存放區