Azure Data Factory または Synapse Analytics を使用して MongoDB との間で双方向にデータをコピーする

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新たに試用を開始する方法については、こちらをご覧ください。

この記事では、Azure Data Factory および Azure Data Factory Synapse Analytics パイプラインで Copy アクティビティを使用して、MongoDB データベースとの間で双方向にデータをコピーする方法について説明します。 この記事は、コピー アクティビティの概要を示しているコピー アクティビティの概要に関する記事に基づいています。

重要

新しい MongoDB コネクタでは、ネイティブ MongoDB サポートが強化されています。 下位互換性のためにのみ現状のままサポートされているレガシ MongoDB コネクタをソリューションで使用している場合は、MongoDB コネクタ (レガシ)に関するページを参照してください。

サポートされる機能

この MongoDB コネクタでは、次の機能がサポートされます。

サポートされる機能 IR
Copy アクティビティ (ソース/シンク) 1.1

① Azure 統合ランタイム ② セルフホステッド統合ランタイム

ソースおよびシンクとしてサポートされているデータ ストアの一覧については、「サポートされているデータ ストア」の表を参照してください。

具体的には、この MongoDB コネクタでは 4.2 までのバージョンがサポートされます。

前提条件

データ ストアがオンプレ ミスネットワーク、Azure 仮想ネットワーク、または Amazon Virtual Private Cloud 内にある場合は、それに接続するようセルフホステッド統合ランタイムを構成する必要があります。

データ ストアがマネージド クラウド データ サービスである場合は、Azure Integration Runtime を使用できます。 ファイアウォール規則で承認されている IP にアクセスが制限されている場合は、Azure Integration Runtime の IP を許可リストに追加できます。

また、Azure Data Factory のマネージド仮想ネットワーク統合ランタイム機能を使用すれば、セルフホステッド統合ランタイムをインストールして構成しなくても、オンプレミス ネットワークにアクセスすることができます。

Data Factory によってサポートされるネットワーク セキュリティ メカニズムやオプションの詳細については、「データ アクセス戦略」を参照してください。

作業の開始

パイプラインでコピー アクティビティを実行するには、次のいずれかのツールまたは SDK を使用します。

UI を使用して MongoDB のリンク サービスを作成する

次の手順を使用して、Azure portal UI で MongoDB のリンク サービスを作成します。

  1. Azure Data Factory または Synapse ワークスペースの [管理] タブに移動し、[リンクされたサービス] を選択して、[新規] をクリックします。

  2. MongoDB を検索し、MongoDB コネクタを選択します。

    Select the MongoDB connector.

  3. サービスの詳細を構成し、接続をテストして、新しいリンク サービスを作成します。

    Configure a linked service to MongoDB.

コネクタの構成の詳細

次のセクションでは、MongoDB コネクタに固有の Data Factory エンティティを定義するために使用されるプロパティについて詳しく説明します。

リンクされたサービスのプロパティ

MongoDB のリンクされたサービスでは、次のプロパティがサポートされます。

プロパティ Description 必須
type type プロパティは、次のように設定する必要があります:MongoDbV2 はい
connectionString MongoDB 接続文字列 (例: mongodb://[username:password@]host[:port][/[database][?options]]) を指定します。 詳細については、MongoDB のマニュアルの接続文字列に関するページを参照してください。

接続文字列を Azure Key Vault に格納することもできます。 詳細については、「Azure Key Vault への資格情報の格納」を参照してください。
はい
database アクセスするデータベースの名前。 はい
connectVia データ ストアに接続するために使用される統合ランタイム。 詳細については、「前提条件」セクションを参照してください。 指定されていない場合は、既定の Azure 統合ランタイムが使用されます。 いいえ

例:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

データセットのプロパティ

データセットの定義に使用できるセクションとプロパティの完全な一覧については、「データセットとリンクされたサービス」を参照してください。 MongoDB データセットでは、次のプロパティがサポートされます。

プロパティ Description 必須
type データセットの type プロパティは、次のように設定する必要があります:MongoDbV2Collection はい
collectionName MongoDB データベースのコレクション名前。 はい

例:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

コピー アクティビティのプロパティ

アクティビティの定義に利用できるセクションとプロパティの完全な一覧については、パイプラインに関する記事を参照してください。 このセクションでは、MongoDB ソースとシンクでサポートされるプロパティの一覧を示します。

ソースとしての MongoDB

コピー アクティビティの source セクションでは、次のプロパティがサポートされます。

プロパティ Description 必須
type コピー アクティビティのソースの type プロパティは、次のように設定する必要があります:MongoDbV2Source はい
filter クエリ演算子を使用して選択フィルターを指定します。 コレクション内のすべてのドキュメントを返すには、このパラメーターを省略するか、空のドキュメント ({}) を渡します。 いいえ
cursorMethods.project プロジェクションのドキュメントで返されるフィールドを指定します。 一致するドキュメント内のすべてのフィールドを返すには、このパラメーターを省略します。 いいえ
cursorMethods.sort 一致するドキュメントがクエリによって返される順序を指定します。 「cursor.sort()」を参照してください。 いいえ
cursorMethods.limit サーバーが返すドキュメントの最大数を指定します。 「cursor.limit()」を参照してください。 いいえ
cursorMethods.skip スキップするドキュメントの数と、MongoDB が結果を返すときの開始位置を指定します。 「cursor.skip()」を参照してください。 いいえ
batchSize MongoDB インスタンスからの応答の各バッチで返されるドキュメントの数を指定します。 ほとんどの場合、バッチ サイズを変更しても、ユーザーまたはアプリケーションへの影響はありません。 Azure Cosmos DB では各バッチのサイズが 40 MB を超過しないように制限されていますが、これはドキュメントが batchSize の数だけ存在するときの合計サイズなので、ドキュメントのサイズが大きくなる場合はこの値を減らしてください。 いいえ
(既定値は 100)

ヒント

このサービスは、厳格モードでの BSON ドキュメントの利用をサポートしています。 フィルター クエリがシェル モードではなく厳格モードであることを確認してください。 詳細については、MongoDB のマニュアルを参照してください。

例:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

シンクとしての MongoDB

コピー アクティビティの sink セクションでは、次のプロパティがサポートされます。

プロパティ Description 必須
type Copy アクティビティのシンクの type プロパティには MongoDbV2Sink を設定する必要があります。 はい
writeBehavior MongoDB データを書き込む方法について説明します。 使用可能な値は、InsertUpsert です。

upsert の動作は、同じ _id を持つドキュメントが既に存在する場合、そのドキュメントを置き換えます。それ以外の場合は、ドキュメントを挿入します。

: 元のドキュメントまたは列マッピングで _id が指定されていない場合は、サービスによってドキュメントの _id が自動的に生成されます。 つまり、upsert が期待どおりに動作するには、ドキュメントに ID があることを確認する必要があります。
いいえ
(既定値は insert です)
writeBatchSize writeBatchSize プロパティにより、各バッチで書き込むドキュメントのサイズが制御されます。 パフォーマンスを向上させるには writeBatchSize の値を大きくしてみて、ドキュメントのサイズが大きい場合は値を小さくしてみます。 いいえ
(既定値は 10,000)
writeBatchTimeout タイムアウトするまでに一括挿入操作の完了を待つ時間です。許容される値は期間です。 いいえ
(既定値は 00:30:00 - 30 分)

ヒント

JSON ドキュメントをそのままインポートするには、「JSON ドキュメントをインポートまたはエクスポートする」セクションを参照してください。表形式のデータからコピーするには、「スキーマ マッピング」を参照してください。

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

JSON ドキュメントのインポートとエクスポート

この MongoDB コネクタを使用すると、簡単に次を実行できます。

  • 2 つの MongoDB コレクション間でドキュメントをそのままコピーします。
  • Azure Cosmos DB、Azure Blob Storage、Azure Data Lake Store、その他のサポートされているファイルベースのストアなど、さまざまなソースから MongoDB に JSON ドキュメントをインポートします。
  • JSON ドキュメントを MongoDB コレクションからさまざまなファイル ベースのストアにエクスポートします。

このようなスキーマに依存しないコピーを実現するには、データセットの "構造" ("スキーマ" とも呼ばれる) のセクションと、コピー アクティビティでのスキーマ マッピングをスキップします。

スキーマ マッピング

MongoDB から表形式のシンクにデータをコピーする、またはシンクから MongoDB にデータをコピーするには、スキーマ マッピングに関する記事をご覧ください。

MongoDB リンク サービスをアップグレードする

リンク サービスと関連するクエリをアップグレードするのに役立つ手順を次に示します。

  1. 新しい MongoDB リンク サービスを作成し、リンク サービス プロパティを参照してそれを構成します。

  2. 古い MongoDB リンク サービスを参照する SQL クエリをパイプラインで使用する場合は、それらを同等の MongoDB クエリに置き換えます。 置き換えの例については、次の表を参照してください。

    SQL クエリ (SQL query) 同等の MongoDB クエリ
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Copy アクティビティでソースおよびシンクとしてサポートされるデータ ストアの一覧については、サポートされるデータ ストアに関するセクションを参照してください。