Partager via


EventProcessorClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient fournit un mécanisme pratique pour consommer des événements à partir de toutes les partitions d’un Event Hub dans le contexte d’un groupe de consommateurs. L’application basée sur le processeur d’événements se compose d’une ou plusieurs instances d’EventProcessorClient qui sont configurées pour consommer des événements à partir du même event hub, groupe de consommateurs pour équilibrer la charge de travail entre différentes instances et suivre la progression du traitement des événements. En fonction du nombre d’instances en cours d’exécution, chaque EventProcessorClient peut posséder zéro ou plusieurs partitions pour équilibrer la charge de travail entre toutes les instances.

Exemple : Construire un EventProcessorClient

L’exemple ci-dessous utilise un en mémoireCheckpointStore, mais azure-messaging-eventhubs-checkpointstore-blob fournit un magasin de points de contrôle soutenu par Stockage Blob Azure. En outre, fullyQualifiedNamespace est le nom d’hôte de l’espace de noms Event Hubs. Il est répertorié sous le panneau « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure. Les informations d’identification utilisées sont DefaultAzureCredential dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement d’exécution. Le consumerGroup est trouvé en accédant à l’instance Event Hub et en sélectionnant « Groupes de consommateurs » sous le panneau « Entités ». L'attribut consumerGroup est requis. Les informations d’identification utilisées sont DefaultAzureCredential dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement d’exécution.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Résumé de la méthode

Modificateur et type Méthode et description
String getIdentifier()

L’identificateur est un nom unique donné à ce instance de processeur d’événements.

synchronized boolean isRunning()

Retourne true si le processeur d’événements est en cours d’exécution.

synchronized void start()

Démarre le traitement des événements pour toutes les partitions du hub d’événements que ce processeur d’événements peut posséder, en affectant un dédié PartitionProcessor à chaque partition.

synchronized void stop()

Arrête le traitement des événements pour toutes les partitions appartenant à ce processeur d’événements.

Méthodes héritées de java.lang.Object

Détails de la méthode

getIdentifier

public String getIdentifier()

L’identificateur est un nom unique donné à ce instance de processeur d’événements.

Returns:

Identificateur de ce processeur d’événements.

isRunning

public synchronized boolean isRunning()

Retourne true si le processeur d’événements est en cours d’exécution. Si le processeur d’événements est déjà en cours d’exécution, l’appel start() n’a aucun effet.

Returns:

true si le processeur d’événements est en cours d’exécution.

start

public synchronized void start()

Démarre le traitement des événements pour toutes les partitions du hub d’événements que ce processeur d’événements peut posséder, en affectant un dédié PartitionProcessor à chaque partition. Si d’autres processeurs d’événements sont actifs pour le même groupe de consommateurs sur le hub d’événements, la responsabilité des partitions est partagée entre eux.

Les appels suivants au démarrage seront ignorés si ce processeur d’événements est déjà en cours d’exécution. L’appel de start after stop() est appelé redémarre ce processeur d’événements.

Démarrage du processeur pour consommer des événements de toutes les partitions

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

Arrête le traitement des événements pour toutes les partitions appartenant à ce processeur d’événements. Toutes seront arrêtées et toutes les PartitionProcessor ressources ouvertes seront fermées.

Les appels suivants à arrêter seront ignorés si le processeur d’événements n’est pas en cours d’exécution.

Arrêt du processeur

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

S’applique à