Condividi tramite


Ricevere eventi usando il recapito pull con Java

Questo articolo fornisce una guida rapida e dettagliata per ricevere CloudEvents usando il recapito pull di Griglia di eventi. Fornisce codice di esempio per ricevere, confermare (eliminare gli eventi da Griglia di eventi).

Prerequisiti

I prerequisiti necessari prima di procedere sono:

Ricevere eventi con il recapito pull

Gli eventi vengono letti da Griglia di eventi specificando un argomento dello spazio dei nomi e una sottoscrizione di eventi della coda con l'operazione di ricezione. La sottoscrizione di eventi è la risorsa che definisce in modo efficace la raccolta di CloudEvents che un client consumer può leggere. Questo codice di esempio usa l'autenticazione basata su chiave perché fornisce un approccio rapido e semplice per l'autenticazione. Per gli scenari di produzione, è consigliabile usare l'autenticazione Microsoft Entra ID perché offre un meccanismo di autenticazione molto più affidabile.

package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code samples should use Java 1.8 level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSubscriptionName>";
    public static final String ENDPOINT =  "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);

    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO  Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");
    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://learn.microsoft.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault())  // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient();   // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        System.out.println("Waiting " +  MAX_WAIT_TIME_FOR_EVENTS.toSecondsPart() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());

Confermare gli eventi

Per confermare gli eventi, usare lo stesso codice usato per la ricezione di eventi e aggiungere le righe seguenti per chiamare un metodo privato di conferma:

        // Acknowledge (i.e. delete from Event Grid the) events
        acknowledge(receivedCloudEventLockTokens);

Di seguito è riportato un esempio di implementazione di un metodo di riconoscimento insieme a un metodo di utilità per stampare informazioni sui token di blocco non riusciti:

    private static void acknowledge(List<String> lockTokens) {
        AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
        List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("@@@ " + succeededLockTokens.size() + " events were successfully acknowledged:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Acknowledged event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not acknowledged (deleted from Event Grid)");
            writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
        }
    }

    private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
        for (FailedLockToken failedLockToken : failedLockTokens) {
            System.out.println("    Failed lock token: " + failedLockToken.getLockToken());
            System.out.println("    Error code: " + failedLockToken.getErrorCode());
            System.out.println("    Error description: " + failedLockToken.getErrorDescription());
        }
    }

Rilasciare eventi

Rilasciare gli eventi per renderli disponibili per essere nuovamente recapitati. Analogamente a quanto fatto per riconoscere gli eventi, è possibile aggiungere il metodo statico seguente e una riga per richiedere di rilasciare eventi identificati dai token di blocco passati come argomento. Per compilare questo metodo è necessario il metodo writeFailedLockTokens.

   private static void release(List<String> lockTokens) {
        ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
        List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("^^^ " + succeededLockTokens.size() + " events were successfully released:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Released event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not released back to Event Grid.");
            writeFailedLockTokens(releaseResult.getFailedLockTokens());
        }
    }

Rifiutare gli eventi

Rifiutare gli eventi che l'applicazione consumer non può elaborare. Le condizioni per cui si rifiuta un evento includono un evento in formato non valido che non può essere analizzato o problemi con l'applicazione che elabora gli eventi.

    private static void reject(List<String> lockTokens) {
        RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
        List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("--- " + succeededLockTokens.size() + " events were successfully rejected:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Rejected event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not rejected.");
            writeFailedLockTokens(rejectResult.getFailedLockTokens());
        }
    }

Passaggi successivi