你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

更改 Azure Cosmos DB 的 API for MongoDB 中的流

适用对象: MongoDB

可通过更改流 API 在 Azure Cosmos DB 的 API for MongoDB 中获取更改源支持。 通过使用更改流 API,你的应用程序可以获取对集合或对单个分片中的项的更改。 以后,可以根据结果采取进一步的措施。 对集合中的项所做的更改将按照其修改时间的顺序进行捕获,并按分片键保证排序顺序。

注意

若要使用更改流,请使用 3.6 或更高版本的服务器创建 Azure Cosmos DB 的用于 MongoDB 的 API 帐户。 如果对较早版本运行更改流示例,可能会看到“无法识别的管道阶段名称: $changeStream”错误。

示例

下面的示例演示如何获取集合中所有项的更改流。 此示例在插入、更新或替换项时创建光标来监视项。 若要获取更改流,需要使用 $match 阶段、$project 阶段和 fullDocument 选项。 当前不支持监视使用更改流删除操作。 作为一种替代方法,你可以对要删除的项添加软标记。 例如,可以在名为“deleted”的项中添加属性。如果要删除该项,可以将“deleted”设置为 true 并在该项上设置 TTL。 由于将“deleted”更新为 true 是一项更新,此更改将显示在更改流中。

var cursor = db.coll.watch(
    [
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
    ],
    { fullDocument: "updateLookup" });

while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
        printjson(cursor.next());
    }
}

单个分片中的更改

以下示例演示如何获取对单个分片中的项所做的更改。 此示例获取项的更改,这些项的分片键等于“a”,分片键值等于“1”。 可以让不同的客户端从不同的分片并行读取更改。

var cursor = db.coll.watch(
    [
        { 
            $match: { 
                $and: [
                    { "fullDocument.a": 1 }, 
                    { "operationType": { $in: ["insert", "update", "replace"] } }
                ]
            }
        },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
    ],
    { fullDocument: "updateLookup" });

缩放更改流

大规模使用更改流时,最好均匀分布负载。 利用 GetChangeStreamTokens 自定义命令在物理分片/分区之间分布负载。

当前限制

使用更改流时,以下限制适用:

  • 输出文档中尚不支持 operationTypeupdateDescription 属性。
  • 目前支持 insertupdatereplace 操作类型。 但是,尚不支持删除操作或其他事件。

由于存在这些限制,需要按前面示例中所示指定 $match 阶段、$project 阶段和 fullDocument 选项。

与 Azure Cosmos DB 的 API for NoSQL 中的更改源不同,没有单独的更改源处理器库来使用更改流,或无需使用租用容器。 目前不支持使用 Azure Functions 触发器来处理更改流。

错误处理。

使用更改流时,支持以下错误代码和消息:

  • HTTP 错误代码 16500 - 当更改流受到限制时,它会返回一个空页。

  • NamespaceNotFound (OperationType 失效) - 如果对不存在的集合运行更改流,或删除了集合,则会返回 NamespaceNotFound 错误。 由于 operationType 属性无法在输出文档中返回,因此会返回 NamespaceNotFound 错误,而不是 operationType Invalidate 错误。

后续步骤