你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Cosmos DB 分析存储中的变更数据捕获入门

适用对象: NoSQL MongoDB

使用 Azure Cosmos DB 分析存储中的变更数据捕获 (CDC) 作为 Azure 数据工厂Azure Synapse Analytics 的源,以捕获对数据所做的特定更改。

注意

请注意,数据流上尚不提供 Azure Cosmos DB for MongoDB API 的链接服务接口。 但是,可以将你的帐户的文档终结点与“Azure Cosmos DB for NoSQL”链接服务接口配合使用,将其用作 Mongo 链接服务受支持之前的解决方法。 在 NoSQL 链接服务上,选择“手动输入”以提供 Cosmos DB 帐户信息并使用帐户的文档终结点(例如:https://[your-database-account-uri].documents.azure.com:443/,而不是 MongoDB 终结点,例如:mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/

先决条件

启用分析存储

首先,在帐户级别启用 Azure Synapse Link,然后为适合工作负载的容器启用分析存储。

  1. 启用 Azure Synapse Link:为 Azure Cosmos DB 帐户启用 Azure Synapse Link

  2. 为容器启用分析存储:

    选项 指南
    为特定的新容器启用 为新容器启用 Azure Synapse Link
    为特定的现有容器启用 为现有容器启用 Azure Synapse Link

使用数据流创建目标 Azure 资源

分析存储的变更数据捕获功能可通过 Azure 数据工厂Azure Synapse Analytics 的数据流功能获得。 对于本指南,请使用 Azure 数据工厂。

重要

也可以使用 Azure Synapse Analytics。 首先,创建 Azure Synapse 工作区(如果还没有)。 在新创建的工作区中,依次选择“开发”选项卡、“添加新资源”和“数据流”。

  1. 创建 Azure 数据工厂(如果还没有)。

    提示

    如果可能,请在 Azure Cosmos DB 帐户所在的同一区域中创建数据工厂。

  2. 启动新创建的数据工厂。

  3. 在数据工厂中,选择“数据流”选项卡,然后选择“新建数据流”。

  4. 为新创建的数据流指定唯一名称。 在此示例中,数据流命名为 cosmoscdc

    名为 cosmoscdc 的新数据流的屏幕截图。

配置分析存储容器的源设置

现在,创建并配置一个源以从 Azure Cosmos DB 帐户的分析存储中传输数据。

  1. 选择“添加源”。

    “添加源”菜单选项的屏幕截图。

  2. 在“输出流名称”字段中,输入 cosmos

    将新创建的源命名为 cosmos 的屏幕截图。

  3. 在“源类型”部分,选择“内联”。

    选择内联源类型的屏幕截图。

  4. 在“数据集”字段中,选择“Azure - Azure Cosmos DB for NoSQL”。

    选择 Azure Cosmos DB for NoSQL 作为数据集类型的屏幕截图。

  5. 为名为 cosmoslinkedservice 的帐户创建新的链接服务。 在“新建链接服务”弹出对话框中,选择现有的 Azure Cosmos DB for NoSQL 帐户,然后选择“确定”。 在此示例中,我们选择名为 msdocs-cosmos-source 的现存 Azure Cosmos DB for NoSQL 帐户和名为 cosmicworks 的数据库。

    “新建链接服务”对话框的屏幕截图,其中选择了 Azure Cosmos DB 帐户。

  6. 为存储类型选择“分析”。

    为链接服务选择的“分析”选项的屏幕截图。

  7. 选择“源选项”选项卡。

  8. 在“源选项”中,选择目标容器并启用“数据流调试”。 在此示例中,容器命名为 products

    源容器的屏幕截图,其中选择了命名的产品。

  9. 选择“数据流调试”。 在“启用数据流调试”弹出对话框中,保留默认选项,然后选择“确定”。

    用于启用数据流调试的切换选项的屏幕截图。

  10. 源选项”选项卡还包含你可能希望启用的其他选项。 下表描述了这些选项:

选项 说明
捕获中间更新 如果要捕获项目的更改历史记录,包括变更数据捕获读取之间的中间更改,请启用此选项。
捕获删除 启用此选项可捕获用户删除的记录并将其应用于接收器。 不能在 Azure 数据资源管理器和 Azure Cosmos DB 接收器上应用删除。
捕获事务存储 TTL 启用此选项可捕获 Azure Cosmos DB 事务存储 TTL(生存时间)删除的记录,并将其应用于接收器。 不能在 Azure 数据资源管理器和 Azure Cosmos DB 接收器上应用 TTL 删除。
批大小(以字节为单位) 此设置实际上是千兆字节 (GB)。 如果要对变更数据捕获源进行批处理,请指定大小(以 GB 为单位)
额外配置 额外的 Azure Cosmos DB 分析存储配置及其值。 (例如:spark.cosmos.allowWhiteSpaceInFieldNames -> true

使用源选项

选中任何 Capture intermediate updatesCapture DeltesCapture Transactional store TTLs 选项时,CDC 进程将在接收器中创建 __usr_opType 字段并填入以下值:

说明 选项
1 UPDATE 捕获中间更新
2 INSERT 没有插入选项,其默认处于打开状态
3 USER_DELETE 捕获删除
4 TTL_DELETE 捕获事务存储 TTL

如果必须将 TTL 删除的记录与用户或应用程序删除的文档区分开来,则要同时选中 Capture intermediate updatesCapture Transactional store TTLs 选项。 然后,必须根据业务需求调整 CDC 进程、应用程序或查询以使用 __usr_opType

提示

如果需要下游使用者在选中“捕获中间更新”选项的情况下还原更新顺序,则可以将系统时间戳 _ts 字段用作排序字段。

创建和配置用于更新和删除操作的接收器设置

首先,创建一个简单的 Azure Blob 存储接收器,然后将该接收器配置为仅筛选特定操作的数据。

  1. 创建 Azure Blob 存储帐户和容器(如果还没有)。 在接下来的示例中,我们将使用名为 msdocsblobstorage 的帐户和名为 output 的容器。

    提示

    如果可能,请在 Azure Cosmos DB 帐户所在的同一区域中创建存储帐户。

  2. 返回 Azure 数据工厂,为从 cosmos 源捕获的变更数据创建新的接收器。

    屏幕截图显示添加已连接到现有源的新接收器。

  3. 为接收器提供唯一名称。 在此示例中,接收器命名为 storage

    为新创建的接收器存储命名的屏幕截图。

  4. 在“接收器类型”部分,选择“内联”。 在“数据集”字段中,选择“增量”。

    为接收器类型选择“内联”并为数据集类型选择“增量”的屏幕截图。

  5. 使用名为 storagelinkedserviceAzure Blob 存储为帐户创建新的链接服务。 在“新建链接服务”弹出对话框中,选择现有的 Azure Blob 存储帐户,然后选择“确定”。 在此示例中,我们选择名为 msdocsblobstorage 的现存 Azure Blob 存储帐户。

    新的增量链接服务的服务类型选项的屏幕截图。

    “新建链接服务”对话框的屏幕截图,其中选择了 Azure Blob 存储帐户。

  6. 选择“设置”选项卡。

  7. 在“设置”中,将“文件夹路径”设置为 Blob 容器的名称。 在此示例中,容器的名称为 output

    将名为 output 的 Blob 容器设置为接收器目标的屏幕截图。

  8. 找到“更新方法”部分,并将选择更改为仅允许“删除”和“更新”操作。 此外,使用字段 {_rid} 作为唯一标识符,将“键列”指定为“列的列表”。

    为接收器指定的更新方法和键列的屏幕截图。

  9. 选择“验证”以确保未出现任何错误或遗漏。 然后,选择“发布”以发布数据流。

    用于验证并发布当前数据流的选项的屏幕截图。

计划变更数据捕获执行

发布数据流后,可以添加新管道来移动和转换数据。

  1. 创建新管道。 为管道指定唯一名称。 在此示例中,管道名称为 cosmoscdcpipeline

    “资源”部分中的“新建管道”选项的屏幕截图。

  2. 在“活动”部分中,展开“移动和转换”选项,然后选择“数据流”。

    “活动”部分中的数据流活动选项的屏幕截图。

  3. 为数据流活动指定唯一名称。 在此示例中,活动命名为 cosmoscdcactivity

  4. 在“设置”选项卡中,选择之前在本指南中创建的名为 cosmoscdc 的数据流。 然后,根据数据量和工作负载所需的延迟选择计算大小。

    活动的数据流和计算大小的配置设置的屏幕截图。

    提示

    对于大于 100 GB 的增量数据大小,建议使用核心数为 32(+16 个驱动程序核心)的自定义大小。

  5. 选择“添加触发器”。 计划此管道,使其按对工作负载有意义的节奏执行。 在此示例中,管道配置为每五分钟执行一次。

    新管道的“添加触发器”按钮的屏幕截图。

    基于计划的触发器配置(从 2023 年开始,每五分钟运行一次)的屏幕截图。

    注意

    执行变更数据捕获的最小定期时段为一分钟。

  6. 选择“验证”以确保未出现任何错误或遗漏。 然后,选择“发布”以发布管道。

  7. 使用 Azure Cosmos DB 分析存储变更数据捕获,观察作为数据流输出置于 Azure Blob 存储容器中的数据。

    Azure Blob 存储容器中管道的输出文件的屏幕截图。

    注意

    初始群集启动时间最长可能需要三分钟。 若要在后续的变更数据捕获执行中避免群集启动时间,请配置数据流群集的生存时间值。 有关集成运行时和 TTL 的详细信息,请参阅 Azure 数据工厂中的集成运行时

并发作业

源选项中的批大小,或者接收器引入更改流的速度较慢的情况,可能会导致同时执行多个作业。 若要避免这种情况,请在“管道”设置中将“并发”选项设置为 1,以确保在当前执行完成之前不会触发新的执行。

后续步骤