你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure Cosmos DB for Apache Cassandra 中的更改源
适用对象: Cassandra
Azure Cosmos DB for Apache Cassandra 中的更改源支持通过 Cassandra 查询语言 (CQL) 中的查询谓词提供。 使用这些谓词条件可以查询更改源 API。 应用程序可以使用 CQL 中必需的主键(也称为分区键)来获取对表所做的更改。 然后,可以根据结果采取进一步的措施。 对表中的行所做的更改将按照其修改时间顺序捕获,而排序顺序是按分区键捕获。
以下示例演示如何使用 .NET 获取 API for Cassandra 密钥空间表中所有行上的更改源。 直接在 CQL 中使用谓词 COSMOS_CHANGEFEED_START_TIME(),以从指定的开始时间(在本例中为当前日期时间)查询更改源中的项。 可以在此处(对于 C#)和此处(对于 Java)下载完整示例。
在每个迭代中,查询将使用分页状态从上次读取更改的时间点恢复。 可以看到,新的更改不断地流式传输到密钥空间中的表。 我们将会看到对已插入或更新的行所做的更改。 目前,不支持使用 API for Cassandra 中的更改源来监视删除操作。
注意
在删除集合后重新使用令牌,然后使用相同的名称重新创建它会导致错误。 建议在创建新集合并重用集合名称时将 pageState 设置为 null。
Session cassandraSession = utils.getSession();
try {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
byte[] token=null;
System.out.println(query);
while(true)
{
SimpleStatement st=new SimpleStatement(query);
st.setFetchSize(100);
if(token!=null)
st.setPagingStateUnsafe(token);
ResultSet result=cassandraSession.execute(st) ;
token=result.getExecutionInfo().getPagingState().toBytes();
for(Row row:result)
{
System.out.println(row.getString("user_name"));
}
}
} finally {
utils.close();
LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}
若要按主键获取对单个行所做的更改,可以在查询中添加主键。 以下示例演示如何跟踪指定了“user_id = 1”的行的更改。
String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
SimpleStatement st=new SimpleStatement(query);
当前限制
使用 API for Cassandra 的更改源时,以下限制适用:
- 目前支持插入和更新操作。 尚不支持删除操作。 一种解决方法是,对正在删除的行添加一个软标记。 例如,在名为“deleted”的行中添加一个字段,并将其设置为“true”。
- 与在核心 API for NoSQL 中一样,上次更新将会保留;对实体的中间性更新将不可用。
错误处理。
使用 API for Cassandra 中的更改源时,支持以下错误代码和消息:
- HTTP 错误代码 429 - 当更改源的速率受到限制时,它会返回一个空页。