你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure Synapse Analytics 分析 MongoDB Atlas 上的操作数据

Azure 应用服务
Azure Data Lake Storage
Azure 事件网格
Azure Synapse Analytics
Power BI

本文介绍一个解决方案,用于从 MongoDB Atlas 操作数据派生见解。 该解决方案将 MongoDB Atlas 连接到 Azure Synapse Analytics。 进行连接之后就可以批量实时传输数据。 实时方法使 Azure Synapse Analytics 专用 SQL 池与 MongoDB Atlas 数据源中的更改保持同步。

Apache®、Apache Spark 和火焰徽标是 Apache Software Foundation 在美国和/或其他国家/地区的商标或注册商标。 使用这些标记并不暗示获得 Apache Software Foundation 的认可。

MongoDB Atlas 徽标是 MongoDB 的商标。 使用此标志并不意味着认可。

体系结构

下图显示了如何将 MongoDB Atlas 数据实时同步到 Azure Synapse Analytics。

Architecture diagram that shows data flow from MongoDB Atlas to analysis apps. Interim stages include a change stream API and Azure Synapse Analytics.

下载包含本文中所有示意图的 PowerPoint 文件

数据流

该解决方案提供两个选项来触发管道,这些管道可捕获 MongoDB Atlas 操作数据存储 (ODS) 中的实时更改并同步数据。 以下步骤概述了这两个选项。

  1. 更改发生在已存储在 MongoDB Atlas 中的操作数据和事务数据中。 Mongo Atlas 更改流 API 会将实时更改通知给已订阅的应用程序。

  2. 自定义 Azure 应用服务 Web 应用会订阅 MongoDB 更改流。 Web 应用有两个版本:事件网格版本和存储版本,对应于每个解决方案版本。 这两个应用版本都侦听 Atlas 中的插入、更新或删除操作导致的更改。 应用在检测到更改时,会将更改的文档以 Blob 的形式写入与 Azure Synapse Analytics 集成的 Azure Data Lake Storage。 当应用检测到 Atlas 中的更改时,应用的事件网格版本还会在 Azure 事件网格中创建新事件。

  3. 解决方案的两个版本都触发 Azure Synapse Analytics 管道:

    1. 在事件网格版本中,基于事件的自定义触发器在 Azure Synapse Analytics 中配置。 该触发器订阅 Web 应用将内容发布到的事件网格主题。 该主题的新事件会激活 Azure Synapse Analytics 触发器,导致 Azure Synapse Analytics 数据管道运行。
    2. 在存储版本中,基于存储的触发器在 Azure Synapse Analytics 中配置。 在集成的 Data Lake Storage 文件夹中检测到新 Blob 时,会激活该触发器,导致 Azure Synapse Analytics 数据管道运行。
  4. 在复制活动中,Azure Synapse Analytics 管道会将 Data Lake Storage Blob 中完整的已更改文档复制到专用 SQL 池。 此操作配置为对所选列执行 upsert。 如果专用 SQL 池中存在该列,则 upsert 会更新该列。 如果该列不存在,则 upsert 会插入该列。

  5. 专用 SQL 池是企业数据仓库功能,用于托管数据管道更新的表。 管道的复制数据活动使该表与其对应的 Atlas 集合保持同步。

  6. Power BI 报表和可视化效果会显示当前的准实时分析。 数据还会馈送到下游应用程序中。 MongoDB Atlas 通过 Azure Synapse Analytics 数据管道接收器连接器来充当接收器。 然后,Atlas 会为自定义应用提供实时数据。

组件

  • MongoDB Atlas 是 MongoDB 提供的数据库即服务产品。 此多云应用程序数据平台提供事务处理、基于相关性的搜索、实时分析以及从移动到云的数据同步。 MongoDB 还提供本地解决方案,即 MongoDB Enterprise Advanced。

  • MongoDB Atlas 中的更改流允许应用程序访问实时数据更改,以便应用可以立即响应这些更改。 更改流为应用程序提供了一种方法来接收有关特定集合、数据库或整个部署群集的更改的通知。

  • 应用服务及其 Web 应用、移动应用和 API 应用功能提供了一个用于生成、部署和缩放 Web 应用、移动应用和 REST API 的框架。 此解决方案使用在 ASP.NET 中对其进行编程的 Web 应用。 GitHub 上提供了代码:

  • Azure Synapse Analytics 是供此解决方案用于数据引入、处理和分析的核心服务。

  • Data Lake Storage 提供用于存储和处理数据的功能。 作为基于 Blob 存储构建的数据湖,Data Lake Storage 提供了一个可缩放的解决方案,用于管理来自多个异类源的大量数据。

  • Azure Synapse Analytics 管道用于对数据执行提取、转换和加载 (ETL) 操作。 Azure 数据工厂提供类似的服务,但你可以在 Synapse Studio 中创建 Azure Synapse Analytics 管道。 你可以在同一管道中使用多个活动。 还可以创建依赖项终结点,以便在管道中将一个活动与另一个活动连接起来。

  • 映射数据流是 Azure Synapse Analytics 中以可视方式设计的数据转换。 使用数据流,数据工程师可以开发数据转换逻辑而无需编写代码。 可以将生成的数据流作为使用横向扩展的 Apache Spark 群集的 Azure Synapse Analytics 管道中的活动运行。 可以通过现有的 Azure Synapse Analytics 计划、控制、流和监视功能,将数据流活动置于操作中。

  • 专用 SQL 池在数据经过处理和规范化后为数据提供数据仓库功能。 Azure Synapse Analytics 的此功能以前称为 SQL 数据仓库。 专用 SQL 池使优化的数据可供最终用户和应用程序使用。

  • Azure Synapse Analytics 触发器提供了一种运行管道的自动化方法。 可以对这些触发器进行计划。 还可以设置基于事件的触发器,例如存储事件触发器自定义事件触发器。 该解决方案使用这两种类型的基于事件的触发器。

  • 事件网格是高度可缩放的无服务器事件中转站。 可以使用事件网格将事件传送到订阅者目标。

  • Power BI 是显示分析信息的软件服务和应用的集合。 在该解决方案中,Power BI 提供了一种使用已处理的数据来执行高级分析和派生见解的方法。

方案详细信息

MongoDB Atlas 充当许多企业应用程序的操作数据层。 此云数据库存储来自内部应用程序、面向客户的服务和多个渠道的第三方 API 的数据。 通过使用 Azure Synapse Analytics 管道,你可以将 MongoDB Atlas 数据与来自其他传统应用程序的关系数据以及来自日志之类的源的非结构化数据组合在一起使用。

Batch 集成

在 Azure Synapse Analytics 中,可以将 MongoDB 本地实例和 MongoDB Atlas 无缝集成为源或接收器资源。 MongoDB 是唯一一个具有源连接器和接收器连接器(适用于 Azure Synapse Analytics 和数据工厂)的 NoSQL 数据库。

使用历史数据时,可以一次检索所有数据。 还可以在批处理模式下使用筛选器以增量方式检索特定时间段的数据。 然后,可以在 Azure Synapse Analytics 中使用 SQL 池和 Apache Spark 池来转换和分析数据。 如果需要将分析或查询结果存储在分析数据存储中,则可以在 Azure Synapse Analytics 中使用接收器资源。

Architecture diagram that shows the source and sink connectors that connect data from consumers to Azure Synapse Analytics and MongoDB data storage.

若要详细了解如何设置和配置连接器,请查看以下资源:

源连接器提供了一种在存储在 MongoDB 或 Atlas 中的操作数据上运行 Azure Synapse Analytics 的便捷方法。 使用源连接器从 Atlas 检索数据后,可以将数据作为 Parquet、Avro、JSON、文本或 CSV 文件加载到 Data Lake Storage Blob 存储中。 然后可以转换这些文件,或者将它们与来自多数据库、多云或混合云环境中的其他数据源的其他文件联接在一起。

可以在以下方案中使用从 MongoDB Enterprise Advanced 或 MongoDB Atlas 检索的数据:

  • 从 MongoDB 成批检索特定日期的所有数据。 然后将数据加载到 Data Lake Storage 中。 在那里,可以使用无服务器 SQL 池或 Spark 池进行分析,也可以将数据复制到专用 SQL 池中。 检索此批后,可以按照数据流中所述,在数据更改出现时应用数据更改。 Storage-CopyPipeline_mdb_synapse_ded_pool_RTS 示例管道作为此解决方案的一部分提供。 可以从 GitHub 导出此管道以实现此一次性加载目的。

  • 以特定频率生成见解,例如,每日或每小时报表。 对于此方案,请在运行分析管道之前将管道安排为定期检索数据。 可以使用 MongoDB 查询来应用筛选条件,仅检索特定数据子集。

实时同步

企业需要基于实时数据而非过时数据的见解。 见解交付延迟数小时可能会阻碍决策过程,导致竞争优势丧失。 此解决方案通过将 MongoDB 事务数据库中发生的更改实时传播到专用 SQL 池来推动关键决策。

此解决方案有三个部分,在以下部分进行了介绍。

捕获 MongoDB Atlas 更改

MongoDB 更改流捕获数据库中发生的更改。 更改流 API 将有关更改的信息提供给订阅更改流的应用服务 Web 应用。 这些应用将更改写入 Data Lake Storage Blob 存储。

触发管道以将更改传播到 Azure Synapse Analytics

该解决方案提供了两个选项,用于在 Blob 写入 Data Lake Storage 后触发 Azure Synapse Analytics 管道:

  • 基于存储的触发器。 如果需要实时分析,请使用此选项,因为一旦写入包含更改的 Blob,就会触发管道。 但是,在有大量数据更改时,此选项可能不是首选方法。 Azure Synapse Analytics 限制可以并发运行的管道数。 有大量数据更改时,可能会达到该限制。

  • 基于事件的自定义触发器。 这种类型的触发器的优势在于它位于 Azure Synapse Analytics 之外,因此更易于控制。 Web 应用的事件网格版本会将更改的数据文档写入 Blob 存储。 同时,应用会创建新的事件网格事件。 事件中的数据包含 Blob 的文件名。 事件触发的管道接收文件名作为参数,然后使用该文件更新专用 SQL 池。

将更改传播到专用 SQL 池

Azure Synapse Analytics 管道会将更改传播到专用 SQL 池。 该解决方案在 GitHub 上提供了一个 CopyPipeline_mdb_synapse_ded_pool_RTS 管道,用于将 Blob 中的更改从 Data Lake Storage 复制到专用 SQL 池。 此管道由存储或事件网格触发器触发。

可能的用例

此解决方案的用例跨越许多行业和领域:

  • Retail

    • 将智能构建到产品捆绑和产品促销中
    • 优化使用 IoT 流式处理的冷存储
    • 优化库存补充
    • 提升全渠道配送的价值
  • 银行和财务行业

    • 自定义客户金融服务
    • 检测潜在的欺诈性交易
  • 电信

    • 优化下一代网络
    • 最大化边缘网络的价值
  • 汽车

    • 优化联网车辆的参数化
    • 检测联网车辆的 IoT 通信中的异常
  • 制造

    • 为机械提供预测性维护
    • 优化存储和库存管理

下面是两个具体示例:

  • 如本文前面介绍的 Batch 集成中所述,可以成批检索 MongoDB 数据,然后在发生更改时更新数据。 此功能使实时见解可用于实时决策和结论。 此功能可用于分析敏感且关键的信息,例如财务交易和欺诈检测数据。
  • 正如 Batch 集成还介绍的那样,你可以将管道计划为定期检索 MongoDB 数据。 此功能适用于零售方案,例如使用每日销售数据更新库存水平。 在这种情况下,分析报表和仪表板并不很重要,不值得进行实时分析。

以下部分更深入地审视了两个零售行业用例。

产品捆绑

若要促进产品销售,可以将产品作为捆绑包的一部分与其他相关产品一起销售。 目标是使用销售模式数据来制定将产品捆绑到包中的策略。

有两个数据源:

  • 产品目录数据来自 MongoDB
  • 销售数据来自 Azure SQL

这两组数据都通过 Azure Synapse Analytics 管道迁移到 Azure Synapse Analytics 专用 SQL 池。 触发器和更改数据捕获用于在一次性迁移数据的基础上实现准实时数据同步。

以下 Power BI 图表显示了产品和销售模式之间的相关性。 笔和补墨瓶的相关性很高。 销售数据显示笔在指定区域的销售量高。

Diagram that shows pipeline stages and charts that show pen sales by product, year, region, and affinity. Pen sales are highest in 2022 in the South.

分析提出了两个用于改进销售的建议:

  • 将笔和补墨瓶捆绑销售
  • 在某些区域推广捆绑销售

产品促销

若要促进某个产品的销售,可以将该产品推荐给对相关产品感兴趣的客户。 目标是使用销售数据和客户购买模式数据来制定向客户推荐产品的策略。

通过使用 Azure Synapse Analytics,可以开发 AI 和机器学习模型来确定要向客户推荐的产品。

下图显示了如何使用各种类型的数据来创建模型以确定备用产品建议。 这些数据包括客户购买模式、利润、产品相关性、产品销售量和产品目录参数。

Diagrams that show pipeline stages and a workflow for an AI model. Data fields include the customer ID, price, sales, and profit.

如果模型达到了很高的准确度,则你可以将它提供的一系列产品推荐给客户。

注意事项

这些注意事项实施 Azure 架构良好的框架的支柱原则,即一套可用于改善工作负载质量的指导原则。 有关详细信息,请参阅 Microsoft Azure 架构良好的框架

安全性

安全性针对蓄意攻击及滥用宝贵数据和系统提供保障措施。 有关详细信息,请参阅安全性支柱概述

若要详细了解解决方案中 Azure 组件的安全要求和控制,请参阅每个产品的文档的安全性部分。

成本优化

成本优化是关于寻找减少不必要的费用和提高运营效率的方法。 有关详细信息,请参阅成本优化支柱概述

  • 若要估算 Azure 产品和配置的成本,请使用 Azure 定价计算器
  • Azure 可以确定符合你需求的正确资源数、分析一段时间内的支出,以及在不超支的情况下进行缩放来满足业务需求,让你避免不必要的成本。 例如,当你不需要任何负载时,可以暂停专用 SQL 池。 稍后可以恢复它们。
  • 可以将应用服务替换为 Azure Functions。 你可以通过在 Azure Synapse Analytics 管道中协调函数的使用来降低成本。
  • 若要降低 Spark 群集成本,请选择正确的数据流计算类型。 常规选项和内存优化选项可用。 还可以选择适当的核心计数和生存时间 (TTL) 值。
  • 若要详细了解如何管理关键解决方案组件的成本,请查看以下资源:

性能效率

性能效率是指工作负荷以高效方式缩放以满足用户对它的需求的能力。 有关详细信息,请参阅性能效率要素概述

出现大量更改时,针对集合中的每个更改在 Azure Synapse Analytics 中运行数千个管道可能会导致排队管道积压。 若要在此方案中提高性能,请考虑以下方法:

  • 使用基于存储的应用服务代码,该代码使用对 Data Lake Storage 的更改编写 JSON 文档。 不要将基于存储的触发器与管道链接, 而应以短间隔(例如每两分钟或五分钟一次)使用计划触发器。 当计划触发器运行时,它将采用指定 Data Lake Storage 目录中的所有文件,并更新每个文件的专用 SQL 池。
  • 修改事件网格应用服务代码。 在代码将新主题添加到事件(其中的元数据包含文件名)之前,请对代码进行编辑,以将一微批的更改(大约 100 项)添加到 Blob 存储。 通过此修改,只需为一个包含 100 项更改的 Blob 触发一个管道。 可以调整微批大小,使之满足你的方案。 以高频率使用小型微批来提供准实时更新。 或者以较低频率使用较大的微批进行延迟更新并降低开销。

若要详细了解如何提高 Azure Synapse Analytics 管道复制活动的性能和可伸缩性,请参阅复制活动性能和可伸缩性指南

部署此方案

若要了解如何实现此解决方案,请参阅用于将 MongoDB Atlas 与 Synapse 集成的实时同步解决方案

作者

本文由 Microsoft 维护, 它最初是由以下贡献者撰写的。

主要作者:

其他参与者:

若要查看非公开领英个人资料,请登录领英。

后续步骤

有关此解决方案的详细信息,请联系 partners@mongodb.com

有关 MongoDB 的详细信息,请参阅以下资源:

有关 Azure 解决方案组件的信息,请参阅以下资源: