你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
CosmosAsyncContainer 类
- java.
lang. Object - com.
azure. cosmos. CosmosAsyncContainer
- com.
public class CosmosAsyncContainer
提供用于读取、删除和替换现有容器的方法。 提供用于与子资源交互的方法, (项、脚本、冲突)
方法摘要
方法继承自 java.lang.Object
方法详细信息
createItem
public Mono
创建一个项。
订阅后,将执行该操作。 成功完成后, Mono 将包含单个资源响应以及创建的 Cosmos 项。 如果失败,将 Mono 出错。
Parameters:
Returns:
createItem
public Mono
创建 Cosmos 项。
Parameters:
Returns:
createItem
public Mono
创建一个项。
订阅后,将执行该操作。 成功完成后, Mono 将包含单个资源响应以及创建的 Cosmos 项。 如果失败,将 Mono 出错。
Parameters:
Returns:
deleteItem
public Mono
删除该项。
订阅后,将执行该操作。 成功完成后, Mono 将包含已删除项的单个 Cosmos 项响应。
Parameters:
Returns:
patchItem
public Mono
运行部分更新,以修改项的特定属性或字段,而不替换整个项。
CosmosPatchOperations cosmosPatchOperations = CosmosPatchOperations.create();
cosmosPatchOperations
.add("/departure", "SEA")
.increment("/trips", 1);
cosmosAsyncContainer.patchItem(
passenger.getId(),
new PartitionKey(passenger.getId()),
cosmosPatchOperations,
Passenger.class)
.subscribe(response -> {
System.out.println(response);
}, throwable -> {
throwable.printStackTrace();
});
订阅后,将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 项响应以及已修补的项。
Parameters:
Returns:
patchItem
public Mono
运行部分更新,以修改项的特定属性或字段,而不替换整个项。
CosmosPatchOperations cosmosPatchOperations = CosmosPatchOperations.create();
cosmosPatchOperations
.add("/departure", "SEA")
.increment("/trips", 1);
cosmosAsyncContainer.patchItem(
passenger.getId(),
new PartitionKey(passenger.getId()),
cosmosPatchOperations,
Passenger.class)
.subscribe(response -> {
System.out.println(response);
}, throwable -> {
throwable.printStackTrace();
});
订阅后,将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 项响应以及已修补的项。
Parameters:
Returns:
queryChangeFeed
public CosmosPagedFlux
使用 查询当前容器的更改源中的 CosmosChangeFeedRequestOptions项。
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
cosmosAsyncContainer.queryChangeFeed(options, Passenger.class)
.byPage()
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 Flux 包含获取的项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
queryItems
public CosmosPagedFlux
使用 SqlQuerySpec 和 CosmosQueryRequestOptions查询当前容器中的项。
订阅后,将执行该操作。 将 Flux 包含获取的项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
queryItems
public CosmosPagedFlux
使用 SqlQuerySpec查询当前容器中的项。
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
String query = "SELECT * FROM Passenger p WHERE (p.departure = @departure)";
List<SqlParameter> parameters = Collections.singletonList(new SqlParameter("@departure", "SEA"));
SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(query, parameters);
cosmosAsyncContainer.queryItems(sqlQuerySpec, options, Passenger.class)
.byPage()
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 CosmosPagedFlux<T> 包含获取的项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
queryItems
public CosmosPagedFlux
使用字符串查询当前容器中的项。
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
String query = "SELECT * FROM Passenger WHERE Passenger.departure IN ('SEA', 'IND')";
cosmosAsyncContainer.queryItems(query, options, Passenger.class)
.byPage()
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 CosmosPagedFlux<T> 包含获取的项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
queryItems
public CosmosPagedFlux
查询当前容器中的项。
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
String query = "SELECT * FROM Passenger WHERE Passenger.departure IN ('SEA', 'IND')";
cosmosAsyncContainer.queryItems(query, options, Passenger.class)
.byPage()
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 CosmosPagedFlux<T> 包含获取的项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
readAllItems
public CosmosPagedFlux
读取逻辑分区的所有项
cosmosAsyncContainer
.readAllItems(new PartitionKey(partitionKey), Passenger.class)
.byPage(100)
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 CosmosPagedFlux<T> 包含读取 Cosmos 项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
readAllItems
public CosmosPagedFlux
读取逻辑分区的所有项
cosmosAsyncContainer
.readAllItems(new PartitionKey(partitionKey), Passenger.class)
.byPage(100)
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Flux.empty();
})
.subscribe();
订阅后,将执行该操作。 将 CosmosPagedFlux<T> 包含读取 Cosmos 项的一个或多个源响应。 如果失败,将 CosmosPagedFlux<T> 出错。
Parameters:
Returns:
readItem
public Mono
使用配置的 CosmosItemRequestOptions按 itemId 读取项。 此操作用于根据容器的唯一标识符 (ID) 和分区键从容器中检索单个项。 readItem 操作使用特定项的唯一标识符提供对特定项的直接访问,该标识符由项的 ID 和分区键值组成。 此操作对于按已知项的 ID 和分区键检索已知项非常有效,无需进行复杂的查询。
订阅后,将执行该操作。 成功完成后, Mono 将包含包含读取项的 Cosmos 项响应。
Parameters:
Returns:
readItem
public Mono
按 itemId 读取项。 此操作用于根据容器的唯一标识符 (ID) 和分区键从容器中检索单个项。 readItem 操作使用特定项的唯一标识符提供对特定项的直接访问,该标识符由项的 ID 和分区键值组成。 此操作对于按已知项的 ID 和分区键检索已知项非常有效,无需进行复杂的查询。
订阅后,将执行该操作。 成功完成时, Mono 将包含包含读取项的项响应。
// Read an item
cosmosAsyncContainer.readItem(passenger.getId(), new PartitionKey(passenger.getId()), Passenger.class)
.flatMap(response -> Mono.just(response.getItem()))
.subscribe(passengerItem -> System.out.println(passengerItem), throwable -> {
CosmosException cosmosException = (CosmosException) throwable;
cosmosException.printStackTrace();
});
// ...
Parameters:
Returns:
readMany
public Mono
读取许多文档。 可用于在单个请求中读取具有特定 ID 和分区键的许多文档。 如果列表中缺少任何文档,则不会引发异常。
List<CosmosItemIdentity> itemIdentityList = new ArrayList<>();
itemIdentityList.add(new CosmosItemIdentity(new PartitionKey(passenger1Id), passenger1Id));
itemIdentityList.add(new CosmosItemIdentity(new PartitionKey(passenger2Id), passenger2Id));
cosmosAsyncContainer.readMany(itemIdentityList, Passenger.class)
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Mono.empty();
})
.subscribe();
Parameters:
Returns:
readMany
public Mono
读取许多文档。 可用于在单个请求中读取具有特定 ID 和分区键的许多文档。 如果列表中缺少任何文档,则不会引发异常。
List<CosmosItemIdentity> itemIdentityList = new ArrayList<>();
itemIdentityList.add(new CosmosItemIdentity(new PartitionKey(passenger1Id), passenger1Id));
itemIdentityList.add(new CosmosItemIdentity(new PartitionKey(passenger2Id), passenger2Id));
cosmosAsyncContainer.readMany(itemIdentityList, Passenger.class)
.flatMap(passengerFeedResponse -> {
for (Passenger passenger : passengerFeedResponse.getResults()) {
System.out.println(passenger);
}
return Mono.empty();
})
.subscribe();
Parameters:
Returns:
replaceItem
public Mono
将容器中的现有项替换为新项。 它执行项的完全替换,将其所有属性替换为新项的属性
cosmosAsyncContainer.replaceItem(
newPassenger,
oldPassenger.getId(),
new PartitionKey(oldPassenger.getId()),
new CosmosItemRequestOptions())
.subscribe(response -> {
System.out.println(response);
}, throwable -> {
throwable.printStackTrace();
});
订阅后,将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 项响应以及替换的项。
Parameters:
Returns:
replaceItem
public Mono
将容器中的现有项替换为新项。 它执行项的完全替换,将其所有属性替换为新项的属性
cosmosAsyncContainer.replaceItem(
newPassenger,
oldPassenger.getId(),
new PartitionKey(oldPassenger.getId()),
new CosmosItemRequestOptions())
.subscribe(response -> {
System.out.println(response);
}, throwable -> {
throwable.printStackTrace();
});
订阅后,将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 项响应以及替换的项。
Parameters:
Returns:
upsertItem
public Mono
更新插入项。
订阅后,将执行该操作。 成功完成后, Mono 将包含单个资源响应以及已插入的项。 如果失败,将 Mono 出错。
Parameters:
Returns:
upsertItem
public Mono
更新插入项。
订阅后,将执行该操作。 成功完成后, Mono 将包含单个资源响应以及已插入的项。 如果失败,将 Mono 出错。
Parameters:
Returns:
upsertItem
public Mono
更新插入项。
订阅后,将执行该操作。 成功完成后, Mono 将包含单个资源响应以及已插入的项。 如果失败,将 Mono 出错。
Parameters:
Returns:
executeBulkOperations
public Flux
批量执行操作的通量。
Parameters:
Returns:
executeBulkOperations
public Flux
批量执行操作的通量。
Parameters:
Returns:
delete
public Mono
删除当前容器。
订阅后,将执行该操作。 成功完成后, Mono 将包含已删除容器的单个 Cosmos 容器响应。 如果失败,将 Mono 出错。
Returns:
delete
public Mono
删除容器
订阅后,将执行该操作。 成功完成后, Mono 将包含已删除数据库的单个 Cosmos 容器响应。 如果失败,将 Mono 出错。
Parameters:
Returns:
deleteAllItemsByPartitionKey
public Mono
删除容器中具有指定 partitionKey 值的所有项。 启动异步 Cosmos DB 后台操作,此操作会删除容器中具有指定值的所有项。 异步 Cosmos DB 后台操作使用用户 RU 的百分比运行。
订阅后,将执行该操作。 成功完成后, Mono 将包含所有已删除项的单个 Cosmos 项响应。
Parameters:
Returns:
deleteItem
public Mono
删除项目。
订阅后,将执行该操作。 成功完成后, Mono 将包含已删除项的单个 Cosmos 项响应。
cosmosAsyncContainer.deleteItem(
passenger.getId(),
new PartitionKey(passenger.getId())
).subscribe(response -> {
System.out.println(response);
}, throwable -> {
CosmosException cosmosException = (CosmosException) throwable;
cosmosException.printStackTrace();
});
Parameters:
Returns:
deleteItem
public Mono
删除该项。
订阅后,将执行该操作。 成功完成后, Mono 将包含已删除项的单个 Cosmos 项响应。
Parameters:
Returns:
enableGlobalThroughputControlGroup
public void enableGlobalThroughputControlGroup(ThroughputControlGroupConfig groupConfig, GlobalThroughputControlConfig globalControlConfig)
使用全局控制模式启用吞吐量控制组。 定义的吞吐量限制将在不同的客户端之间共享。
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("localControlGroup")
.targetThroughputThreshold(0.1)
.build();
GlobalThroughputControlConfig globalControlConfig =
this.client.createGlobalThroughputControlConfigBuilder(database.getId(), container.getId())
.setControlItemRenewInterval(Duration.ofSeconds(5))
.setControlItemExpireInterval(Duration.ofSeconds(10))
.build();
container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
Parameters:
enableLocalThroughputControlGroup
public void enableLocalThroughputControlGroup(ThroughputControlGroupConfig groupConfig)
使用本地控制模式启用吞吐量控制组。
ThroughputControlGroupConfig groupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("localControlGroup")
.targetThroughputThreshold(0.1)
.build();
container.enableLocalThroughputControlGroup(groupConfig);
Parameters:
executeCosmosBatch
public Mono
执行事务批处理。
Parameters:
Returns:
如果事务批处理成功执行,则返回的响应时返回 CosmosBatchResponse#getStatusCode 的值将设置为 200}。
如果事务批处理中的操作在执行过程中失败,则不会提交批处理的更改,并且失败操作的状态由 CosmosBatchResponse#getStatusCode 或 异常提供。 若要获取有关在出现某些用户错误(如冲突、找不到等)时失败的操作的信息,可以枚举响应。 这将返回 CosmosBatchOperationResult 与事务批处理中每个操作对应的实例,这些实例将按照它们添加到事务批处理的顺序返回。 对于与事务批处理中的操作对应的结果,请使用 CosmosBatchOperationResult#getStatusCode 访问操作的状态。 如果操作未执行或由于事务批处理中的另一个操作失败而中止,则此字段的值将为 424;对于导致批处理中止的操作,此字段的值将指示失败的原因。
如果存在请求超时、消失、会话不可用、网络故障或服务以某种方式返回 5xx 等问题,则 Mono 将返回错误,而不是 CosmosBatchResponse。
对返回的响应使用 CosmosBatchResponse#isSuccessStatusCode 以确保事务批处理成功。
executeCosmosBatch
public Mono
执行事务批处理。
Parameters:
Returns:
如果事务批处理成功执行,则返回的响应时返回 CosmosBatchResponse#getStatusCode 的值将设置为 200}。
如果事务批处理中的操作在执行过程中失败,则不会提交批处理的更改,并且失败操作的状态由 CosmosBatchResponse#getStatusCode 或 异常提供。 若要获取有关在出现某些用户错误(如冲突、找不到等)时失败的操作的信息,可以枚举响应。 这将返回 CosmosBatchOperationResult 与事务批处理中每个操作对应的实例,这些实例将按照它们添加到事务批处理的顺序返回。 对于与事务批处理中的操作对应的结果,请使用 CosmosBatchOperationResult#getStatusCode 访问操作的状态。 如果操作未执行或由于事务批处理中的另一个操作失败而中止,则此字段的值将为 424;对于导致批处理中止的操作,此字段的值将指示失败的原因。
如果存在请求超时、消失、会话不可用、网络故障或服务以某种方式返回 5xx 等问题,则 Mono 将返回错误,而不是 CosmosBatchResponse。
对返回的响应使用 CosmosBatchResponse#isSuccessStatusCode 以确保事务批处理成功。
getConflict
public CosmosAsyncConflict getConflict(String id)
获取使用 CosmosAsyncConflict 当前容器作为上下文的对象。
Parameters:
Returns:
getDatabase
getFeedRanges
getId
getScripts
public CosmosAsyncScripts getScripts()
CosmosAsyncScripts获取使用当前容器作为上下文的 。
这可以进一步用于对 Cosmos 脚本执行各种操作。
Returns:
openConnectionsAndInitCaches
@Deprecated
public Mono
已放弃
尽力通过预热当前读取区域的缓存和连接来初始化容器。
根据容器具有的分区数,所需的总时间也会更改。 但通常可以使用以下公式来获取估计时间:如果建立连接需要 200 毫秒,并且容器中有 100 个分区,则获取地址列表后,大约需要 (100 * 4 / CPUCore) * 200 毫秒才能打开所有连接
注意:理想情况下,在任何工作负载之前,在应用程序初始化期间,只应调用此 API 一次。 如果出现任何暂时性错误,调用方应使用该错误并继续执行常规工作负荷。
Returns:
openConnectionsAndInitCaches
@Deprecated
public Mono
已放弃
尽力通过预热缓存和连接到首选区域列表中的指定编号区域来初始化容器。
根据容器具有的分区数,所需的总时间也会更改。 但通常可以使用以下公式来获取估计时间:如果建立连接需要 200 毫秒,并且容器中有 100 个分区,则大约需要 (100 * 4 / (10 * CPUCores) ) * 200 毫秒 * RegionsWithProactiveConnections 在获取地址列表后打开所有连接
注意:理想情况下,在任何工作负载之前,在应用程序初始化期间,只应调用此 API 一次。 如果出现任何暂时性错误,调用方应使用该错误并继续执行常规工作负荷。
为了尽量减少与预热缓存和打开连接相关的延迟,主动连接区域的编号不能超过 CosmosContainerProactiveInitConfigBuilder#MAX_NO_OF_PROACTIVE_CONNECTION_REGIONS。
Parameters:
Returns:
queryConflicts
public CosmosPagedFlux
查询当前容器中的所有冲突。
try {
cosmosAsyncContainer.queryConflicts(query).
byPage(100)
.subscribe(response -> {
for (CosmosConflictProperties conflictProperties : response.getResults()) {
System.out.println(conflictProperties);
}
}, throwable -> {
throwable.printStackTrace();
});
} catch (CosmosException ce) {
ce.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
Parameters:
Returns:
queryConflicts
public CosmosPagedFlux
查询当前容器中的所有冲突。
Parameters:
Returns:
read
public Mono
读取当前容器。
订阅后将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 容器响应和读取容器。 如果失败, Mono 将出错。
Returns:
read
public Mono
在指定其他选项(如 If-Match)时读取当前容器。
订阅后将执行该操作。 成功完成后, Mono 将包含单个 Cosmos 容器响应和读取容器。 如果失败, Mono 将出错。
Parameters:
Returns:
readAllConflicts
public CosmosPagedFlux
列出当前容器中的所有冲突。
try {
cosmosAsyncContainer.readAllConflicts(options).
byPage(100)
.subscribe(response -> {
for (CosmosConflictProperties conflictProperties : response.getResults()) {
System.out.println(conflictProperties);
}
}, throwable -> {
throwable.printStackTrace();
});
} catch (CosmosException ce) {
ce.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
Parameters:
Returns:
readThroughput
public Mono
读取为当前容器预配的吞吐量。
Mono<ThroughputResponse> throughputResponseMono = cosmosAsyncContainer.readThroughput();
throughputResponseMono.subscribe(throughputResponse -> {
System.out.println(throughputResponse);
}, throwable -> {
throwable.printStackTrace();
});
Returns:
replace
public Mono
替换当前容器的属性。
订阅后将执行该操作。 Mono成功完成时的 将包含单个 Cosmos 容器响应,其中包含替换的容器属性。 如果失败, Mono 将出错。
Parameters:
Returns:
replace
public Mono
使用非默认请求选项时替换当前容器属性。
订阅后将执行该操作。 Mono成功完成时的 将包含单个 Cosmos 容器响应,其中包含替换的容器属性。 如果失败, Mono 将出错。
Parameters:
Returns:
replaceThroughput
public Mono
替换吞吐量。
ThroughputProperties throughputProperties =
ThroughputProperties.createAutoscaledThroughput(1000);
cosmosAsyncContainer.replaceThroughput(throughputProperties)
.subscribe(throughputResponse -> {
System.out.println(throughputResponse);
},
throwable -> {
throwable.printStackTrace();
});
Parameters:
Returns: