Freigeben über


Empfangen von Ereignissen mithilfe der Pullübermittlung mit Java

Dieser Artikel enthält eine schnelle schrittweise Anleitung zum Empfangen von CloudEvents mithilfe der Pullübermittlung von Event Grid. Er stellt Beispielcode zum Empfangen, Bestätigen (Löschen von Ereignissen aus Event Grid) bereit.

Voraussetzungen

Bevor Sie fortfahren, sollten die folgenden Voraussetzungen erfüllt sein:

  • Verstehen, was die Pullübermittlung ist. Weitere Informationen finden Sie unter Pullübermittlungskonzepte und Übersicht über Pullübermittlung.

  • Ein Namespace-, Themen- und Ereignisabonnement.

  • Das neueste Beta-SDK-Paket. Wenn Sie Maven verwenden, können Sie das zentrale Maven-Repository konsultieren.

    Wichtig

    Unterstützung für das Datenebenen-SDK für Pullübermittlung ist in Beta-Paketen verfügbar. Sie sollten in Ihrem Projekt das neueste Betapaket verwenden.

  • Eine IDE, die Java unterstützt, z. B. IntelliJ IDEA, Eclipse IDE oder Visual Studio Code.

  • Java JRE mit Java 8-Sprachebene.

  • Für ein Thema sollten Ereignisse verfügbar sein. Siehe Veröffentlichen von Ereignissen in Namespacethemen.

Empfangen von Ereignissen mithilfe der Pullübermittlung

Sie lesen Ereignisse aus Event Grid, indem Sie ein Namespacethema und ein Warteschlangen-Ereignisabonnement mit dem Vorgang Empfangen angeben. Das Ereignisabonnement ist die Ressource, die effektiv die Sammlung von CloudEvents definiert, die ein Consumerclient lesen kann. Dieser Beispielcode verwendet die schlüsselbasierte Authentifizierung, da sie einen schnellen und einfachen Ansatz für die Authentifizierung bietet. Für Produktionsszenarien sollten Sie die Microsoft Entra ID-Authentifizierung verwenden, da sie einen wesentlich robusteren Authentifizierungsmechanismus bietet.

package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code samples should use Java 1.8 level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSusbcriptionName>";
    public static final String ENDPOINT =  "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);

    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO  Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");
    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://learn.microsoft.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault())  // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient();   // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        System.out.println("Waiting " +  MAX_WAIT_TIME_FOR_EVENTS.toSecondsPart() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());

Bestätigen von Ereignissen

Verwenden Sie zum Bestätigen von Ereignissen denselben Code, der für den Empfang von Ereignissen verwendet wird, und fügen Sie die folgenden Zeilen hinzu, um eine bestätigende private Methode aufzurufen:

        // Acknowledge (i.e. delete from Event Grid the) events
        acknowledge(receivedCloudEventLockTokens);

Eine Beispielimplementierung einer Bestätigungsmethode zusammen mit einer Hilfsmethode zum Drucken von Informationen zu fehlgeschlagenen Sperrtoken folgt:

    private static void acknowledge(List<String> lockTokens) {
        AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
        List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("@@@ " + succeededLockTokens.size() + " events were successfully acknowledged:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Acknowledged event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not acknowledged (deleted from Event Grid)");
            writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
        }
    }

    private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
        for (FailedLockToken failedLockToken : failedLockTokens) {
            System.out.println("    Failed lock token: " + failedLockToken.getLockToken());
            System.out.println("    Error code: " + failedLockToken.getErrorCode());
            System.out.println("    Error description: " + failedLockToken.getErrorDescription());
        }
    }

Freigeben von Ereignissen

Geben Sie Ereignisse frei, um sie zur erneuten Übermittlung zur Verfügung zu stellen. Ähnlich wie bei der Bestätigung von Ereignissen können Sie die folgende statische Methode und eine Zeile hinzufügen, um sie aufzurufen, um Ereignisse freizugeben, die durch die als Argument übergebenen Sperrtoken identifiziert wurden. Zum Kompilieren dieser Methode benötigen Sie die writeFailedLockTokens-Methode.

   private static void release(List<String> lockTokens) {
        ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
        List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("^^^ " + succeededLockTokens.size() + " events were successfully released:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Released event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not released back to Event Grid.");
            writeFailedLockTokens(releaseResult.getFailedLockTokens());
        }
    }

Ablehnen von Ereignissen

Lehnen Sie Ereignisse ab, die Ihre Consumeranwendung nicht verarbeiten kann. Zu den Bedingungen, unter denen Sie ein Ereignis ablehnen, gehören ein fehlerhaftes Ereignis, das nicht geparst werden kann, oder Probleme mit der Anwendung, die die Ereignisse verarbeitet.

    private static void reject(List<String> lockTokens) {
        RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
        List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("--- " + succeededLockTokens.size() + " events were successfully rejected:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Rejected event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not rejected.");
            writeFailedLockTokens(rejectResult.getFailedLockTokens());
        }
    }

Nächste Schritte