EventProcessorClient Clase
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClient
- com.
public class EventProcessorClient
EventProcessorClient proporciona un mecanismo práctico para consumir eventos de todas las particiones de un centro de eventos en el contexto de un grupo de consumidores. La aplicación basada en el procesador de eventos consta de una o varias instancias de EventProcessorClient que están configuradas para consumir eventos del mismo centro de eventos, el grupo de consumidores para equilibrar la carga de trabajo entre distintas instancias y realizar un seguimiento del progreso cuando se procesan los eventos. Según el número de instancias que se ejecutan, cada EventProcessorClient puede tener cero o más particiones para equilibrar la carga de trabajo entre todas las instancias.
Ejemplo: Construir un EventProcessorClient
En el ejemplo siguiente se usa una instancia en memoria CheckpointStore pero azure-messaging-eventhubs-checkpointstore-blob proporciona un almacén de puntos de control respaldado por Azure Blob Storage. Además, fullyQualifiedNamespace
es el nombre de host del espacio de nombres de Event Hubs. Aparece en el panel "Essentials" después de navegar al espacio de nombres de Event Hubs a través de Azure Portal. La credencial usada se debe DefaultAzureCredential
a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución. Para consumerGroup
encontrar , vaya a la instancia del centro de eventos y seleccione "Grupos de consumidores" en el panel "Entidades". Se necesita consumerGroup
. La credencial usada se debe DefaultAzureCredential
a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Resumen del método
Modificador y tipo | Método y descripción |
---|---|
String |
getIdentifier()
El identificador es un nombre único que se asigna a esta instancia del procesador de eventos. |
synchronized boolean |
isRunning()
Devuelve |
synchronized void |
start()
Inicia el procesamiento de eventos para todas las particiones del centro de eventos que este procesador de eventos puede poseer, asignando un dedicado PartitionProcessor a cada partición. |
synchronized void |
stop()
Detiene el procesamiento de eventos para todas las particiones que pertenecen a este procesador de eventos. |
Métodos heredados de java.lang.Object
Detalles del método
getIdentifier
public String getIdentifier()
El identificador es un nombre único que se asigna a esta instancia del procesador de eventos.
Returns:
isRunning
public synchronized boolean isRunning()
Devuelve true
si el procesador de eventos se está ejecutando. Si el procesador de eventos ya se está ejecutando, la llamada start() a no tiene ningún efecto.
Returns:
true
si el procesador de eventos se está ejecutando.start
public synchronized void start()
Inicia el procesamiento de eventos para todas las particiones del centro de eventos que este procesador de eventos puede poseer, asignando un dedicado PartitionProcessor a cada partición. Si hay otros procesadores de eventos activos para el mismo grupo de consumidores en el centro de eventos, la responsabilidad de las particiones se compartirá entre ellos.
Las llamadas posteriores para iniciar se omitirán si este procesador de eventos ya se está ejecutando. Al llamar a start después stop() de llamar a , se reiniciará este procesador de eventos.
Iniciar el procesador para consumir eventos de todas las particiones
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
stop
public synchronized void stop()
Detiene el procesamiento de eventos para todas las particiones que pertenecen a este procesador de eventos. Todo PartitionProcessor se apagará y se cerrarán todos los recursos abiertos.
Las llamadas posteriores a stop se omitirán si el procesador de eventos no se está ejecutando.
Detener el procesador
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
Se aplica a
Azure SDK for Java