Azure Synapse Analytics を使用して MongoDB Atlas のオペレーショナル データを分析する

Azure App Service
Azure Data Lake Storage
Azure Event Grid
Azure Synapse Analytics
Power BI

この記事では、MongoDB Atlas のオペレーショナル データから分析情報を導き出すソリューションについて説明します。 このソリューションでは、MongoDB Atlas を Azure Synapse Analytics に接続します。 この接続によって、データをバッチおよびリアルタイムで転送できます。 リアルタイム アプローチを使用すると、Azure Synapse Analytics の専用 SQL プールが常に MongoDB Atlas データ ソースの変更に合わせて同期されます。

Apache®、 Apache Spark、および炎のロゴは、Apache Software Foundation の米国およびその他の国における登録商標です。 これらのマークを使用することが、Apache Software Foundation による保証を意味するものではありません。

MongoDB Atlas ロゴは MongoDB の商標です。 このマークを使用することは、保証を意味するものではありません。

アーキテクチャ

次の図は、MongoDB Atlas データをリアルタイムで Azure Synapse Analytics に同期する方法を示しています。

MongoDB Atlas から分析アプリへのデータ フローを示すアーキテクチャ図。中間段階には、変更ストリーム API と Azure Synapse Analytics が含まれています。

"この記事のすべての図の PowerPoint ファイル をダウンロードします。"

データフロー

このソリューションには、パイプラインをトリガーする 2 つのオプションが用意されていて、パイプラインによって MongoDB Atlas オペレーショナル データ ストア (ODS) のリアルタイムの変更がキャプチャされ、データが同期されます。 次の手順で、両方のオプションの概要を示します。

  1. 変更は、MongoDB Atlas に保存されているオペレーショナルおよびトランザクション データで発生します。 Mongo Atlas の変更ストリーム API では、サブスクライブしているアプリケーションに変更をリアルタイムで通知します。

  2. カスタムの Azure App Service Web アプリでは、MongoDB の変更ストリームをサブスクライブします。 この Web アプリには Event Grid"ストレージ"の 2 つのバージョン (ソリューションのバージョンごとに 1 つずつ) あります。 どちらのアプリ バージョンでも、Atlas での挿入、更新、または削除の各操作によって発生した変更をリッスンします。 アプリは、変更を検出すると変更されたドキュメントを BLOB として Azure Data Lake Storage に書き込み、これが Azure Synapse Analytics に統合されます。 また、Atlas で変更が検出されると、アプリの Event Grid バージョンでも Azure Event Grid に新しいイベントが作成されます。

  3. どちらのバージョンのソリューションでも、Azure Synapse Analytics パイプラインをトリガーします。

    1. Event Grid バージョンでは、Azure Synapse Analytics でカスタムのイベントベース トリガーが構成されます。 このトリガーは、Web アプリで発行される Event Grid トピックをサブスクライブします。 そのトピックの新しいイベントによって Azure Synapse Analytics トリガーがアクティブになり、それによって Azure Synapse Analytics データ パイプラインが実行されます。
    2. ストレージ バージョンでは、Azure Synapse Analytics でストレージベース トリガーが構成されます。 統合された Data Lake Storage フォルダーで新しい BLOB が検出されると、そのトリガーがアクティブになり、それによって Azure Synapse Analytics データ パイプラインが実行されます。
  4. コピー アクティビティでは、Azure Synapse Analytics パイプラインによって、変更されたドキュメント全体が Data Lake Storage BLOB から専用 SQL プールにコピーされます。 この操作は、選択した列で upsert を実行するように構成されます。 列が専用 SQL プールに存在する場合は、upsert によって列が更新されます。 列が存在しない場合は、upsert によって列が挿入されます。

  5. 専用 SQL プールは、データ パイプラインによって更新されるテーブルをホストするエンタープライズ データ ウェアハウス機能です。 パイプラインのコピー データ アクティビティによって、そのテーブルと対応する Atlas コレクションとの同期が維持されます。

  6. Power BI のレポートと視覚化には、現在、およびほぼリアルタイムの分析が表示されます。 このデータはダウンストリーム アプリケーションにもフィードされます。 MongoDB Atlas は、Azure Synapse Analytics データ パイプライン シンク コネクタを使用してシンクとして機能します。 これにより、Atlas によってカスタム アプリにリアルタイム データが提供されます。

コンポーネント

  • MongoDB Atlas は、MongoDB のサービスとしてのデータベースのオファリングです。 このマルチクラウド アプリケーション データ プラットフォームでは、トランザクション処理、関連性ベースの検索、リアルタイム分析、モバイルからクラウドへのデータ同期を実現します。 MongoDB には、オンプレミスのソリューションである MongoDB Enterprise Advanced も用意されています。

  • MongoDB Atlas の変更ストリーム を使用すると、アプリケーションからリアルタイムのデータ変更にアクセスできるため、アプリではそれらの変更にすぐに対応できます。 変更ストリームを使用すると、アプリケーションで、特定のコレクション、データベース、またはデプロイ クラスター全体に対する変更に関する通知を受け取ることができます。

  • App Service とその Web Apps、Mobile Apps、API Apps の機能は、Web アプリ、モバイル アプリ、REST API を構築、デプロイ、スケーリングするためのフレームワークになります。 このソリューションでは、ASP.NET でプログラミングされた Web アプリを使用します。 コードは GitHub から入手できます。

  • Azure Synapse Analytics は、このソリューションでデータ インジェスト、処理、分析に使用されるコア サービスです。

  • Data Lake Storage では、データを保存および処理できます。 Data Lake Storage は、 Blob Storage の上に構築されたデータ レイクとして、複数の異種ソースからの大量のデータを管理するためのスケーラブルなソリューションを提供します。

  • Azure Synapse Analytics パイプライン は、データに対する抽出、変換、読み込み (ETL) 操作を実行するために使用します。 Azure Data Factory でも同様のサービスが提供されますが、Azure Synapse Analytics パイプラインは Synapse Studio 内に作成できます。 同じパイプライン内で複数のアクティビティを使用できます。 また、依存関係エンドポイントを作成して、1 つのアクティビティをパイプライン内の別のアクティビティと接続することもできます。

  • マッピング データ フロー は、Azure Synapse Analytics における視覚的に設計されたデータ変換です。 データ フローを使用すると、データ エンジニアは、コードを記述せずにデータ変換ロジックを開発できます。 生成されたデータ フローは、スケールアウトされた Apache Spark クラスターを使用する Azure Synapse Analytics パイプライン内のアクティビティとして実行できます。 データ フロー アクティビティは、既存の Azure Synapse Analytics のスケジュール設定、制御、フロー、監視機能を使用して操作に組み込むことができます。

  • 専用 SQL プール は、データが処理および正規化された後の、データのデータ ウェアハウス機能を提供します。 Azure Synapse Analytics のこの機能は、以前は SQL Data Warehouse と呼ばれていました。 専用 SQL プールを利用すると、調整されたデータをエンド ユーザーとアプリケーションで使用できます。

  • Azure Synapse Analytics トリガー を使用すると、パイプラインを自動的に実行できます。 これらのトリガーにはスケジュールを設定できます。 ストレージ イベント トリガーカスタム イベント トリガーなど、イベントベースのトリガーを設定することもできます。 このソリューションでは、両方の種類のイベントベースのトリガーを使用します。

  • Event Grid は、高度にスケーラブルなサーバーレス イベント ブローカーです。 Event Grid を使用すると、イベントをサブスクライバーの宛先に配信できます。

  • Power BI は、分析情報を表示するソフトウェア サービスとアプリのコレクションです。 このソリューションでは、Power BI を利用することで、処理済みのデータを使用して高度な分析を実行し、分析情報を導き出すことができます。

シナリオの詳細

MongoDB Atlas は、多くのエンタープライズ アプリケーションのオペレーショナル データ レイヤーとして機能します。 このクラウド データベースには、内部アプリケーション、顧客向けサービス、および複数のチャネルのサードパーティ API からのデータが保存されます。 Azure Synapse Analytics パイプラインを使用すると、MongoDB Atlas データと、他の従来のアプリケーションのリレーショナル データや、ログなどのソースからの非構造化データとを結合できます。

バッチ統合

Azure Synapse Analytics では、MongoDB オンプレミス インスタンスと MongoDB Atlas をソースまたはシンク リソースとしてシームレスに統合できます。 MongoDB は、Azure Synapse Analytics および Data Factory 用のソースおよびシンク コネクタを持つ唯一の NoSQL データベースです。

履歴データでは、すべてのデータを一度に取得できます。 また、バッチ モードでフィルターを使用して、特定の期間のデータを段階的に取得することもできます。 その後に、Azure Synapse Analytics で SQL プールと Apache Spark プールを使用して、データを変換および分析できます。 分析またはクエリの結果を分析データ ストアに保存する必要がある場合は、Azure Synapse Analytics でシンク リソースを使用できます。

コンシューマーから Azure Synapse Analytics および MongoDB データ ストレージにデータを接続するソースおよびシンク コネクタを示すアーキテクチャ図。

これらのコネクタを設定および構成する方法の詳細については、以下のリソースを参照してください。

ソース コネクタを使用すると、MongoDB または Atlas に保存されているオペレーショナル データ上で簡単に Azure Synapse Analytics を実行できます。 ソース コネクタを使用して Atlas からデータを取得した後、データを Data Lake Storage BLOB ストレージに Parquet、Avro、JSON、テキスト、または CSV ファイルとして読み込むことができます。 その後に、これらのファイルを変換したり、マルチデータベース、マルチクラウド、またはハイブリッド クラウド環境の他のデータ ソースの他のファイルと結合したりすることができます。

MongoDB Enterprise Advanced または MongoDB Atlas から取得したデータは、次のシナリオで使用できます。

  • MongoDB から特定の日付のすべてのデータをバッチで取得します。 次に、そのデータを Data Lake Storage に読み込みます。 そこから、サーバーレス SQL プールまたは Spark プールを使用して分析するか、データを専用 SQL プールにコピーします。 このバッチを取得した後、「データフロー」で説明したように、発生した変更をそのつどデータに適用できます。 このソリューションには、お使いいただける Storage-CopyPipeline_mdb_synapse_ded_pool_RTS サンプル パイプライン が含まれています。 このパイプラインは、この 1 回限りの読み込みを目的として、GitHub からエクスポートできます。

  • 特定の頻度 (たとえば、毎日または毎時のレポート) 用に分析情報を生成します。 このシナリオでは、分析パイプラインを実行する前に、定期的にデータを取得するようにパイプラインのスケジュールを設定します。 MongoDB クエリを使用すると、フィルター条件を適用して、データの特定のサブセットのみを取得できます。

リアルタイムの同期

企業には、古いデータではなく、リアルタイム データに基づく分析情報が必要です。 分析情報の配信が数時間遅れた場合、意思決定プロセスが中断し、競争上の優位性が失われるおそれがあります。 このソリューションでは、MongoDB トランザクション データベースで発生した変更をリアルタイムで専用 SQL プールに反映することで、重要な意思決定を迅速化します。

このソリューションには、この後のセクションで説明する 3 つの部分があります。

MongoDB Atlas の変更をキャプチャする

MongoDB 変更ストリームでは、データベースで発生した変更をキャプチャします。 変更ストリーム API では、変更に関する情報を、変更ストリームをサブスクライブしている App Service Web アプリで利用できるようにします。 これらのアプリによって、変更が Data Lake Storage BLOB ストレージに書き込まれます。

変更を Azure Synapse Analytics に反映するようにパイプラインをトリガーする

このソリューションには、BLOB が Data Lake Storage に書き込まれた後に Azure Synapse Analytics パイプラインをトリガーするためのオプションが 2 つあります。

  • ストレージ ベースのトリガー。 リアルタイム分析が必要な場合は、このオプションを使用します。変更された BLOB が書き込まれるとすぐにパイプラインがトリガーされるためです。 ただし、大量のデータ変更がある場合、このオプションは方法として適切でない可能性があります。 Azure Synapse Analytics では、同時に実行できるパイプラインの数を制限しています。 大量のデータ変更がある場合は、その制限に達するおそれがあります。

  • イベントベースのカスタム トリガー。 この種類のトリガーには Azure Synapse Analytics の外部にあるという利点があるため、簡単に制御できます。 Event Grid バージョンの Web アプリでは、変更されたデータ ドキュメントを BLOB ストレージに書き込みます。 同時に、アプリでは新しい Event Grid イベントを作成します。 イベント内のデータには、BLOB のファイル名が含まれています。 イベントによってトリガーされるパイプラインではファイル名をパラメーターとして受け取り、そのファイルを使用して専用 SQL プールを更新します。

専用 SQL プールに変更を反映する

Azure Synapse Analytics パイプラインでは、変更を専用 SQL プールに反映します。 このソリューションでは、GitHub 上に CopyPipeline_mdb_synapse_ded_pool_RTS パイプラインが用意されていて、これによって BLOB の変更が Data Lake Storage から専用 SQL プールにコピーされます。 このパイプラインをトリガーするのは、ストレージまたは Event Grid のトリガーです。

考えられるユース ケース

このソリューションのユース ケースは、多くの業界と分野にまたがっています。

  • 小売

    • インテリジェンスを製品バンドルと製品のプロモーションに組み込む
    • IoT ストリーミングを使用するコールド ストレージを最適化する
    • 在庫補充を最適化する
    • オムニチャネル配信に価値を付加する
  • 銀行と金融

    • 顧客の金融サービスをカスタマイズする
    • 不正の可能性がある取引を検出する
  • 遠距離通信

    • 次世代ネットワークを最適化する
    • エッジ ネットワークの価値を最大にする
  • 自動車

    • 接続済み車両のパラメーター化を最適化する
    • 接続済み車両の IoT 通信の異常を検出する
  • 製造

    • 機械の予測メンテナンスを実現する
    • 保管と在庫管理を最適化する

具体的な例を 2 つ示します。

  • この記事の「バッチ統合」で先ほど説明したように、MongoDB データをバッチで取得し、変更が発生すると、それに応じてデータを更新できます。 この機能を使用して、リアルタイムの分析情報による Just-In-Time の意思決定と結論付けが実現します。 この機能は、金融取引や不正検出データなど、機密性の高い重要な情報の分析に役立ちます。
  • また、「バッチ統合」で説明しているように、MongoDB データを定期的に取得するようにパイプラインのスケジュールを設定することもできます。 この機能は、日次売上データを使用して在庫レベルを更新するといった小売のシナリオで役立ちます。 このような場合、分析レポートとダッシュボードは重要ではなく、リアルタイム分析をわざわざ行う必要はありません。

この後のセクションでは、2 つの小売業界のユース ケースについて詳しく検証します。

製品バンドル

製品の販売を促進するために、他の関連製品と共にバンドルに含めて製品を販売できます。 この目的は、販売パターン データを使用して、製品をパッケージにバンドルするための戦略を策定することです。

データ ソースは 2 つあります。

  • MongoDB の製品カタログ データ
  • Azure SQL の販売データ

両方のデータ セットは、Azure Synapse Analytics パイプラインを使用して、Azure Synapse Analytics の専用 SQL プールに移行されます。 トリガーと変更データ キャプチャを使用して、1 回限りの移行データに対してほぼリアルタイムのデータ同期を実現します。

次の Power BI グラフは、製品と販売パターンのアフィニティを示しています。 ペンとインクベースのリフィルとのアフィニティが高くなっています。 売上データは、特定の分野でペンの売上が多いことを示しています。

パイプラインのステージと、製品、年、リージョン、アフィニティ別のペンの売上を表すグラフを示す図。ペンの売上は南部で 2022 年に最も多くなっています。

この分析により、売上を増やすために 2 つの示唆が得られます。

  • ペンとインクベースのレフィルをバンドルする
  • 特定の地域でバンドルの販売を促進する

製品のプロモーション

製品の販売を促進するために、関連製品に関心のある顧客にその製品を勧めることができます。 この目的は、売上データと顧客の購入パターン データを使用して、顧客に製品を勧めるための戦略を策定することです。

Azure Synapse Analytics を使用すると、顧客に勧める製品を決定するための AI および機械学習モデルを開発できます。

次の図は、さまざまな種類のデータを使用して、勧める代替製品を決定するためのモデルを作成する方法を示しています。 このデータには、顧客の購入パターン、利益、製品アフィニティ、製品の売上、製品のカタログ パラメーターが含まれます。

パイプラインのステージと AI モデルのワークフローを示す図。データ フィールドには、顧客 ID、価格、売上、利益が含まれます。

モデルの精度が高い場合は、顧客に勧めることができる製品の一覧が示されます。

考慮事項

以降の考慮事項には、ワークロードの品質向上に使用できる一連の基本原則である Azure "Well-Architected Framework" の要素が組み込まれています。 詳細については、「Microsoft Azure Well-Architected Framework」を参照してください。

セキュリティ

セキュリティは、重要なデータやシステムの意図的な攻撃や悪用に対する保証を提供します。 詳細については、「セキュリティの重要な要素の概要」を参照してください。

ソリューション内の Azure コンポーネントのセキュリティ要件と制御の詳細については、各製品のドキュメントのセキュリティに関するセクションを参照してください。

コストの最適化

コストの最適化とは、不要な費用を削減し、運用効率を向上させる方法を検討することです。 詳しくは、 コスト最適化の柱の概要に関する記事をご覧ください。

  • Azure 製品の構成とコストの見積もりには Azure 料金計算ツールを使用してください。
  • Azure では、ニーズに合った適切なリソースの数を特定し、時間の経過に伴う支出を分析し、超過出費のないようビジネス ニーズに合わせてスケーリングして、不要なコストを抑えることができます。 たとえば、読み込みがないと想定される場合は、専用 SQL プールを一時停止できます。 これらは後で再開できます。
  • App Service は Azure Functions に置き換えることができます。 Azure Synapse Analytics パイプライン内で関数を調整することで、コストを削減できます。
  • Spark クラスターのコストを削減するには、適切な種類のデータ フロー コンピューティングを選択します。 一般およびメモリ最適化のオプションを使用できます。 また、コア数と有効期間 (TTL) にも適切な値を選択します。
  • 主要なソリューション コンポーネントのコスト管理の詳細については、次のリソースを参照してください。

パフォーマンス効率

パフォーマンス効率とは、ユーザーからの要求に合わせて効率的な方法でワークロードをスケーリングできることです。 詳細については、「パフォーマンス効率の柱の概要」を参照してください。

大量の変更がある場合、コレクション内のすべての変更に対して Azure Synapse Analytics で何千ものパイプラインを実行すると、キューに登録されたパイプラインのバックログが発生するおそれがあります。 このシナリオでのパフォーマンスを向上するために、次の方法を検討してください。

  • ストレージ ベースの App Service コードを使用します。これによって、変更された JSON ドキュメントが Data Lake Storage に書き込まれます。 ストレージ ベースのトリガーはパイプラインにリンクしないでください。 代わりに、2 分や 5 分おきといった、短い間隔でスケジュールされたトリガーを使用します。 スケジュールされたトリガーが実行されると、指定された Data Lake Storage ディレクトリ内のすべてのファイルが取得され、そのそれぞれに対して専用 SQL プールが更新されます。
  • Event Grid App Service コードを変更します。 ファイル名を含むメタデータがある新しいトピックをイベントに追加する前に、BLOB ストレージに約 100 件の変更のマイクロバッチを追加するようにプログラムします。 この変更で、100 件の変更を含む 1 つの BLOB に対して 1 つのパイプラインのみがトリガーされます。 マイクロバッチ サイズは、シナリオに合わせて調整できます。 リアルタイムに近い形で更新するには、小さなマイクロバッチを高い頻度で使用します。 または、更新を遅延してオーバーヘッドを削減するには、大きなマイクロバッチを低い頻度で使用します。

Azure Synapse Analytics パイプラインの Copy アクティビティのパフォーマンスとスケーラビリティの向上に関する詳細については、「Copy アクティビティのパフォーマンスとスケーラビリティに関するガイド」を参照してください。

このシナリオのデプロイ

このソリューションの実装の詳細については、「MongoDB Atlas と Synapse の統合のリアルタイム同期ソリューション」を参照してください。

共同作成者

この記事は、Microsoft によって保守されています。 当初の寄稿者は以下のとおりです。

プリンシパルの作成者:

その他の共同作成者:

パブリックでない LinkedIn プロファイルを表示するには、LinkedIn にサインインします。

次のステップ

このソリューションの詳細については、 partners@mongodb.comにお問い合わせください。

MongoDB の詳細については、次のリソースを参照してください。

Azure ソリューションのコンポーネントの詳細については、次のリソースを参照してください。