本文介绍用于从 MongoDB Atlas 操作数据派生见解的解决方案。 该解决方案将 MongoDB Atlas 连接到 Azure Synapse Analytics。 通过连接,可以批量和实时传输数据。 实时方法使 Azure Synapse Analytics 专用 SQL 池与 MongoDB Atlas 数据源中的更改保持同步。
Apache®、Apache Spark,火焰徽标是美国和/或其他国家/地区 Apache Software Foundation 的注册商标或商标。 Apache Software Foundation 不暗示使用这些标记。
MongoDB Atlas 徽标是 MongoDB 的商标。 使用此标记不会暗示任何认可。
建筑
下图显示了如何将 MongoDB Atlas 数据实时同步到 Azure Synapse Analytics。
数据流
该解决方案提供了两个选项用于触发管道,这些管道捕获 MongoDB Atlas 操作数据存储(ODS)中的实时更改并同步数据。 以下步骤概述了这两个选项。
在 MongoDB Atlas 中存储的操作和事务数据中发生更改。 Mongo Atlas 更改流 API 实时通知订阅的应用程序更改。
自定义 Azure 应用服务 Web 应用订阅 MongoDB 更改流。 Web 应用有两个版本,事件网格 和 存储,每个版本的解决方案各有一个。 这两个应用版本都侦听 Atlas 中插入、更新或删除操作导致的更改。 当应用检测到更改时,会将更改的文档作为 Blob 写入 Azure Data Lake Storage,后者与 Azure Synapse Analytics 集成。 当应用检测到 Atlas 中的更改时,应用的事件网格版本还会在 Azure 事件网格中创建一个新事件。
解决方案的两个版本都触发 Azure Synapse Analytics 管道:
- 在事件网格版本中,Azure Synapse Analytics 中配置了基于事件的自定义触发器。 该触发器订阅 Web 应用发布到的事件网格主题。 该主题的新事件激活 Azure Synapse Analytics 触发器,这会导致 Azure Synapse Analytics 数据管道运行。
- 在存储版本中,Azure Synapse Analytics 中配置了基于存储的触发器。 当在集成的 Data Lake Storage 文件夹中检测到新 Blob 时,该触发器将激活,这会导致 Azure Synapse Analytics 数据管道运行。
在复制活动中,Azure Synapse Analytics 管道将完整更改的文档从 Data Lake Storage Blob 复制到专用 SQL 池。 此操作配置为对所选列执行 更新插入。 如果列存在于专用 SQL 池中,则更新插入会更新该列。 如果该列不存在,则 upsert 插入该列。
专用 SQL 池是托管数据管道更新表的企业数据仓库功能。 管道的复制数据活动使该表与其对应的 Atlas 集合保持同步。
Power BI 报表和可视化效果显示当前和准实时分析。 数据还会馈送下游应用程序。 MongoDB Atlas 使用 Azure Synapse Analytics 数据管道接收器连接器充当接收器。 然后,Atlas 提供自定义应用与实时数据。
组件
MongoDB Atlas 是 MongoDB 提供的数据库即服务产品/ 服务。 此多云应用程序数据平台提供事务处理、基于相关性的搜索、实时分析和移动到云数据同步。 MongoDB 还提供本地解决方案 MongoDB Enterprise Advanced。
MongoDB Atlas 中的更改流 使应用程序能够访问实时数据更改,以便应用能够立即对这些更改做出反应。 更改流为应用程序提供了一种接收有关特定集合、数据库或整个部署群集更改的通知的方法。
应用服务 及其 Web 应用、移动应用和 API 应用功能提供了用于生成、部署和缩放 Web 应用、移动应用和 REST API 的框架。 此解决方案使用在 ASP.NET 中编程的 Web 应用。 代码在 GitHub 上可用:
Azure Synapse Analytics 是此解决方案用于数据引入、处理和分析的核心服务。
Data Lake Storage 提供存储和处理数据的功能。 作为一个基于 Blob 存储构建的数据湖,Data Lake Storage 提供了一个可缩放的解决方案,用于管理来自多个异类源的大量数据。
Azure Synapse Analytics 管道 用于对数据执行提取、转换、加载(ETL)操作。 Azure 数据工厂提供类似的服务,但可以在 Synapse Studio 中创建 Azure Synapse Analytics 管道。 可以在同一管道中使用多个活动。 还可以创建依赖项终结点,以将一个活动与管道中的另一个活动连接。
映射数据流 在 Azure Synapse Analytics 中直观地设计了数据转换。 数据流为数据工程师提供了一种在不编写代码的情况下开发数据转换逻辑的方法。 可以将生成的数据流作为使用横向扩展 Apache Spark 群集的 Azure Synapse Analytics 管道中的活动运行。 可以使用现有的 Azure Synapse Analytics 计划、控制、流和监视功能将数据流活动投入使用。
专用 SQL 池 在处理和规范化数据后为数据提供数据仓库功能。 Azure Synapse Analytics 的此功能以前称为 SQL 数据仓库。 专用 SQL 池使优化的数据可供最终用户和应用程序使用。
Azure Synapse Analytics 触发器 提供了运行管道的自动化方法。 可以计划这些触发器。 还可以设置基于事件的触发器,例如 存储事件触发器 和 自定义事件触发器。 解决方案使用这两种类型的基于事件的触发器。
事件网格 是高度可缩放的无服务器事件代理。 可以使用事件网格将事件传送到订阅服务器目标。
Power BI 是显示分析信息的软件服务和应用的集合。 在此解决方案中,Power BI 提供了一种使用处理的数据执行高级分析和派生见解的方法。
方案详细信息
MongoDB Atlas 充当许多企业应用程序的运营数据层。 此云数据库从多个渠道存储来自内部应用程序、面向客户的服务和第三方 API 的数据。 通过使用 Azure Synapse Analytics 管道,可以将 MongoDB Atlas 数据与其他传统应用程序和来自源(如日志)的非结构化数据的关系数据相结合。
Batch 集成
在 Azure Synapse Analytics 中,可以将 MongoDB 本地实例和 MongoDB Atlas 无缝集成为源或接收器资源。 MongoDB 是唯一一个具有 Azure Synapse Analytics 和数据工厂的源和接收器连接器的 NoSQL 数据库。
使用历史数据,可以一次性检索所有数据。 还可以在批处理模式下使用筛选器以增量方式检索特定时间段的数据。 然后,可以使用 Azure Synapse Analytics 中的 SQL 池和 Apache Spark 池来转换和分析数据。 如果需要将分析或查询结果存储在分析数据存储中,可以使用 Azure Synapse Analytics 中的接收器资源。
有关如何设置和配置连接器的详细信息,请参阅以下资源:
- 使用 Azure 数据工厂或 Azure Synapse Analytics 将数据从 MongoDB Atlas 复制或复制到 MongoDB Atlas
- 使用 Azure 数据工厂或 Azure Synapse Analytics 从 MongoDB 复制数据
源连接器提供了一种在 MongoDB 或 Atlas 中存储的操作数据之上运行 Azure Synapse Analytics 的便捷方法。 使用源连接器从 Atlas 检索数据后,可以将数据作为 Parquet、Avro、JSON、文本或 CSV 文件加载到 Data Lake Storage Blob 存储中。 然后,可以转换这些文件,或者将它们与其他来自多数据库、多云或混合云环境中的其他数据源的文件联接起来。
可以在以下方案中使用从 MongoDB Enterprise Advanced 或 MongoDB Atlas 检索的数据:
从一批中的 MongoDB 检索特定日期中的所有数据。 然后将数据加载到 Data Lake Storage 中。 在此处使用无服务器 SQL 池或 Spark 池进行分析,或者将数据复制到专用 SQL 池中。 检索此批后,可以按 数据流中所述,对数据应用更改。 Storage-CopyPipeline_mdb_synapse_ded_pool_RTS 示例管道 作为此解决方案的一部分提供。 可以从 GitHub 导出管道以实现此一次性加载目的。
以特定频率生成见解,例如,每日或每小时报告。 对于此方案,在运行分析管道之前,请计划管道定期检索数据。 可以使用 MongoDB 查询应用筛选条件,并仅检索特定数据子集。
实时同步
企业需要基于实时数据而不是过时数据的见解。 见解交付延迟数小时可能会阻碍决策过程,并导致竞争优势的丧失。 此解决方案通过实时将 MongoDB 事务数据库中发生的更改传播到专用 SQL 池来激发关键决策。
此解决方案包含三个部分,以下部分介绍。
捕获 MongoDB Atlas 更改
MongoDB 更改流捕获数据库中发生的更改。 更改流 API 提供有关订阅更改流的应用服务 Web 应用可用的更改的信息。 这些应用将更改写入 Data Lake Storage Blob 存储。
触发管道以将更改传播到 Azure Synapse Analytics
该解决方案提供了两个选项,用于在 Blob 写入 Data Lake Storage 后触发 Azure Synapse Analytics 管道:
基于存储的触发器。 如果需要实时分析,请使用此选项,因为在写入更改的 blob 后立即触发管道。 但是,如果数据量很大,则此选项可能不是首选方法。 Azure Synapse Analytics 限制可并发运行的管道数。 发生大量数据更改时,可能会达到该限制。
基于事件的自定义触发器。 这种类型的触发器的优势在于它不在 Azure Synapse Analytics 之外,因此更易于控制。 Web 应用的事件网格版本将更改的数据文档写入 Blob 存储。 同时,应用会创建新的事件网格事件。 事件中的数据包含 blob 的文件名。 事件触发的管道接收文件名作为参数,然后使用该文件更新专用 SQL 池。
将更改传播到专用 SQL 池
Azure Synapse Analytics 管道将更改传播到专用 SQL 池。 该解决方案在 GitHub 上提供了一个 CopyPipeline_mdb_synapse_ded_pool_RTS 管道,用于将 Blob 中的更改从 Data Lake Storage 复制到专用 SQL 池。 此管道由存储或事件网格触发器触发。
潜在的用例
此解决方案的用例跨越许多行业和领域:
零售
- 将智能构建到产品捆绑和产品推广中
- 优化使用 IoT 流式传输的冷存储
- 优化库存补充
- 向全渠道分发添加值
银行和金融
- 自定义客户金融服务
- 检测潜在的欺诈易
电信
- 优化下一代网络
- 最大化边缘网络的价值
汽车
- 优化联网车辆的参数化
- 检测联网车辆中 IoT 通信中的异常情况
制造业
- 为机械提供预测性维护
- 优化存储和库存管理
下面是两个具体示例:
- 如本文前面 Batch 集成中所述,可以在批处理中检索 MongoDB 数据,然后在发生更改时更新数据。 此功能使实时决策和结论能够进行实时见解。 此功能可用于分析敏感和关键信息,例如财务交易和欺诈检测数据。
- 如 Batch 集成 所述,可以计划管道定期检索 MongoDB 数据。 此功能适用于零售方案,例如使用每日销售数据更新库存级别。 在这种情况下,分析报表和仪表板并不重要,实时分析不值得努力。
以下部分更详细地了解了两个零售行业用例。
产品捆绑
若要推广产品销售,可以将产品作为捆绑包的一部分与其他相关产品一起销售。 目标是使用销售模式数据来制定将产品捆绑到包的策略。
有两个数据源:
- MongoDB 中的产品目录数据
- Azure SQL 中的销售数据
这两组数据都使用 Azure Synapse Analytics 管道迁移到 Azure Synapse Analytics 专用 SQL 池。 触发器和更改数据捕获用于在一次性迁移数据的基础上实现准实时数据同步。
以下 Power BI 图表显示了产品与销售模式之间的相关性。 笔和基于墨迹的重新填充的相关性很高。 销售数据显示笔在指定区域中的销售量较高。
分析提出了两个建议,用于产生更好的销售:
- 捆绑笔和基于墨迹的重新填充
- 在某些区域中推广捆绑包
产品促销
若要推广产品销售,可以将产品推荐给对相关产品感兴趣的客户。 目标是使用销售数据和客户购买模式数据来制定向客户推荐产品的策略。
通过使用 Azure Synapse Analytics,可以开发 AI 和机器学习模型来确定向客户推荐哪些产品。
下图显示了如何使用各种类型的数据创建模型来确定备用产品建议。 这些数据包括客户购买模式、利润、产品相关性、产品销售量和产品目录参数。
如果模型实现高准确度,则它提供可向客户推荐的产品列表。
考虑
这些注意事项实现 Azure Well-Architected 框架的支柱,这是一组指导原则,可用于提高工作负荷的质量。 有关详细信息,请参阅 azure Well-Architected Framework
安全
安全性提供针对故意攻击和滥用宝贵数据和系统的保证。 有关详细信息,请参阅 安全支柱概述。
有关解决方案中 Azure 组件的安全要求和控制的详细信息,请参阅每个产品文档的安全部分。
成本优化
成本优化是研究减少不必要的开支和提高运营效率的方法。 有关详细信息,请参阅 成本优化支柱概述。
- 若要估算 Azure 产品和配置的成本,请使用 Azure 定价计算器。
- Azure 通过分析一段时间内的支出,以及根据业务需求进行缩放来满足业务需求而无需超支,从而帮助你避免不必要的成本。 例如,当不需要任何负载时,可以暂停专用 SQL 池。 稍后可以恢复它们。
- 可以将应用服务替换为 Azure Functions。 通过在 Azure Synapse Analytics 管道中协调函数,可以降低成本。
- 若要降低 Spark 群集成本,请选择正确的数据流计算类型。 常规选项和内存优化选项可用。 此外,请选择适当的核心计数和生存时间(TTL)值。
- 若要详细了解如何管理关键解决方案组件的成本,请参阅以下资源:
性能效率
性能效率是工作负荷的缩放能力,以满足用户以高效方式放置的需求。 有关详细信息,请参阅 性能效率支柱概述。
当发生大量更改时,在 Azure Synapse Analytics 中针对集合中的每个更改运行数千个管道可能会导致排队管道积压。 若要提高此方案中的性能,请考虑以下方法:
- 使用基于存储的应用服务代码,该代码使用对 Data Lake Storage 所做的更改编写 JSON 文档。 不要将基于存储的触发器与管道链接。 请改为按短间隔使用计划触发器,例如每两到五分钟一次。 计划触发器运行时,它会获取指定 Data Lake Storage 目录中的所有文件,并为每个文件更新专用 SQL 池。
- 修改事件网格应用服务代码。 将它编程为在将新主题添加到包含文件名的元数据的事件之前,向 Blob 存储添加大约 100 个更改的微批。 进行此修改后,只需为一个 Blob 触发一个管道,且更改了 100 个。 可以调整微批大小以适应你的方案。 以高频率使用小型微批处理来提供接近实时的更新。 或者以较低的频率使用较大的微批处理来延迟更新和降低开销。
有关提高 Azure Synapse Analytics 管道复制活动性能和可伸缩性的详细信息,请参阅 复制活动性能和可伸缩性指南。
部署此方案
有关实现此解决方案的信息,请参阅 Real-Time MongoDB Atlas 与 Synapse集成同步解决方案。
贡献
本文由Microsoft维护。 它最初由以下参与者编写。
主要作者:
- 戴安娜·安妮·杰诺什 |高级解决方案架构师
- 巴布·斯里尼瓦桑 |高级解决方案架构师
- 乌特萨夫·塔尔瓦尔 |关联解决方案架构师
其他参与者:
- Krishnakumar Rukmangathan |高级项目经理
- Sunil Sabat |首席项目经理
- Wee Hyong T |首席董事
- Paresh Saraf |技术总监
若要查看非公共LinkedIn配置文件,请登录到LinkedIn。
后续步骤
有关解决方案的详细信息,请联系 partners@mongodb.com。
有关 MongoDB 的信息,请参阅以下资源:
有关 Azure 解决方案组件的信息,请参阅以下资源:
- 什么是 Azure Synapse Analytics?
- Azure Synapse Analytics 用例
- Azure Synapse Analytics 行业特定的用例
- Azure Synapse Analytics 连接器
- 应用服务概述
- 什么是 Power BI?
- Azure Data Lake Storage Gen2 简介
- 什么是 Azure 事件网格?