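Receive events using pull delivery with Java
This article provides a quick, step-by-step guide to receiving CloudEvents using Event Grid's pull delivery. It provides sample code to receive events and to acknowledge them (that is, delete them from Event Grid), and also shows how to release and reject events.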
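Prerequisites
Before you proceed, you need the following:
A namespace, a topic, and an event subscription.
The latest beta SDK package. If you use Maven, you can consult the Maven Central Repository.
Important
Pull delivery data plane SDK support is available in beta packages. Use the latest beta package in your project.
An IDE that supports Java, such as IntelliJ IDEA, Eclipse IDE, or Visual Studio Code.
A Java JRE running at Java 8 language level.
Events available on the topic. See Publish events to a namespace topic.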
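Receive events using pull delivery
You read events from Event Grid by specifying a namespace topic and a queue event subscription in the receive operation. The event subscription is the resource that defines the collection of CloudEvents that a consumer client can read. This sample code uses key-based authentication because it is a quick and simple way to authenticate. For production scenarios, use Microsoft Entra ID authentication because it provides a more robust authentication mechanism.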
package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code sample should be compiled at Java 1.8 language level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSubscriptionName>";
    public static final String ENDPOINT = "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);
    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");

    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://learn.microsoft.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault()) // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient(); // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        // Duration.getSeconds() is used because Duration.toSecondsPart() is only available from Java 9.
        System.out.println("Waiting " + MAX_WAIT_TIME_FOR_EVENTS.getSeconds() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());
        // Settle the received events here by calling acknowledge, release, or reject (see the following sections).
    }
    // Add the static helper methods from the following sections here, inside the class.
}
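The sample above keeps the key in a constant only for brevity, and its own comments advise against doing so. As a minimal sketch of one alternative, the following helper reads the key from an environment variable. The variable name EVENT_GRID_NAMESPACE_KEY and the class NamespaceKeyLoader are assumptions made for this illustration, not names defined by Event Grid or the SDK. A secret store such as Azure Key Vault remains the option that the sample's comments recommend.

```java
import java.util.Map;

class NamespaceKeyLoader {
    // Hypothetical variable name chosen for this sketch; Event Grid does not define it.
    static final String KEY_VARIABLE = "EVENT_GRID_NAMESPACE_KEY";

    /** Returns the trimmed key found in the given environment map, or throws if it is missing or blank. */
    static String loadKey(Map<String, String> environment) {
        String key = environment.get(KEY_VARIABLE);
        if (key == null || key.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Set the " + KEY_VARIABLE + " environment variable before running the sample.");
        }
        return key.trim();
    }

    public static void main(String[] args) {
        try {
            String key = loadKey(System.getenv());
            // Never print the key itself; its length is enough to confirm that it was found.
            System.out.println("Loaded a namespace key of length " + key.length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The returned string could then replace the "<namespace key>" literal that the sample passes to AzureKeyCredential.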
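Acknowledge events
To acknowledge events, use the same code that you used to receive events and add the following lines to call a private acknowledge method:
        // Acknowledge (that is, delete from Event Grid) the events
        acknowledge(receivedCloudEventLockTokens);
A sample implementation of the acknowledge method, along with a utility method that prints information about failed lock tokens, follows: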
    private static void acknowledge(List<String> lockTokens) {
        AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
        List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
        // Treat a null result as empty so the loop and the size comparison below cannot throw.
        if (succeededLockTokens == null) {
            succeededLockTokens = new ArrayList<>();
        }
        if (!succeededLockTokens.isEmpty()) {
            System.out.println("@@@ " + succeededLockTokens.size() + " events were successfully acknowledged:");
            for (String lockToken : succeededLockTokens) {
                System.out.println("    Acknowledged event lock token: " + lockToken);
            }
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not acknowledged (deleted from Event Grid)");
            writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
        }
    }

    private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
        for (FailedLockToken failedLockToken : failedLockTokens) {
            System.out.println("    Failed lock token: " + failedLockToken.getLockToken());
            System.out.println("    Error code: " + failedLockToken.getErrorCode());
            System.out.println("    Error description: " + failedLockToken.getErrorDescription());
        }
    }
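Release events
Release an event to make it available for redelivery. As with acknowledging events, you can add the following static method, and a line that calls it, to release the events identified by the lock tokens passed as an argument. This method needs the writeFailedLockTokens method in order to compile.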
    private static void release(List<String> lockTokens) {
        ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
        List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
        // Treat a null result as empty so the loop and the size comparison below cannot throw.
        if (succeededLockTokens == null) {
            succeededLockTokens = new ArrayList<>();
        }
        if (!succeededLockTokens.isEmpty()) {
            System.out.println("^^^ " + succeededLockTokens.size() + " events were successfully released:");
            for (String lockToken : succeededLockTokens) {
                System.out.println("    Released event lock token: " + lockToken);
            }
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not released back to Event Grid.");
            writeFailedLockTokens(releaseResult.getFailedLockTokens());
        }
    }
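Reject events
Reject an event that your consumer application can't process. Conditions for rejecting an event include a malformed event that can't be parsed, or problems in the application that processes the events.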
    private static void reject(List<String> lockTokens) {
        RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
        List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
        // Treat a null result as empty so the loop and the size comparison below cannot throw.
        if (succeededLockTokens == null) {
            succeededLockTokens = new ArrayList<>();
        }
        if (!succeededLockTokens.isEmpty()) {
            System.out.println("--- " + succeededLockTokens.size() + " events were successfully rejected:");
            for (String lockToken : succeededLockTokens) {
                System.out.println("    Rejected event lock token: " + lockToken);
            }
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not rejected.");
            writeFailedLockTokens(rejectResult.getFailedLockTokens());
        }
    }
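A consumer settles each received event in one of the three ways described in this article: it acknowledges the event after processing it, releases the event so that it can be redelivered, or rejects the event when it can't be processed. The following sketch shows one way to sort lock tokens into those three groups before calling the methods above. The Outcome enum, the class name LockTokenTriage, and the choice to release on a transient failure are assumptions made for illustration. Your application decides what counts as each outcome.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LockTokenTriage {
    /** The three settlement operations covered in this article. */
    enum Disposition { ACKNOWLEDGE, RELEASE, REJECT }

    /** Hypothetical processing outcomes reported by the application's own event handler. */
    enum Outcome { PROCESSED, TRANSIENT_FAILURE, MALFORMED }

    /** Maps a processing outcome to a settlement operation (an assumed policy, not an SDK rule). */
    static Disposition dispositionFor(Outcome outcome) {
        switch (outcome) {
            case PROCESSED:
                return Disposition.ACKNOWLEDGE;
            case TRANSIENT_FAILURE:
                return Disposition.RELEASE;
            case MALFORMED:
            default:
                return Disposition.REJECT;
        }
    }

    /** Groups lock tokens by disposition, preserving the iteration order of the input map. */
    static Map<Disposition, List<String>> groupLockTokens(Map<String, Outcome> outcomeByLockToken) {
        Map<Disposition, List<String>> grouped = new EnumMap<>(Disposition.class);
        for (Disposition disposition : Disposition.values()) {
            grouped.put(disposition, new ArrayList<>());
        }
        for (Map.Entry<String, Outcome> entry : outcomeByLockToken.entrySet()) {
            grouped.get(dispositionFor(entry.getValue())).add(entry.getKey());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Placeholder lock tokens; real ones come from BrokerProperties.getLockToken().
        Map<String, Outcome> outcomes = new LinkedHashMap<>();
        outcomes.put("token-1", Outcome.PROCESSED);
        outcomes.put("token-2", Outcome.TRANSIENT_FAILURE);
        outcomes.put("token-3", Outcome.MALFORMED);

        Map<Disposition, List<String>> grouped = groupLockTokens(outcomes);
        System.out.println("To acknowledge: " + grouped.get(Disposition.ACKNOWLEDGE));
        System.out.println("To release: " + grouped.get(Disposition.RELEASE));
        System.out.println("To reject: " + grouped.get(Disposition.REJECT));
    }
}
```

Each resulting list could be passed to the acknowledge, release, or reject method shown in this article. Checking that a list is non-empty first avoids an unnecessary call to the service.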
Next steps
- See the Java API reference.
- To learn more about the pull delivery model, see Pull delivery overview.