EventProcessorClientBuilder Clase
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClientBuilder
- com.
Implementaciones
public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>
Esta clase proporciona una API fluida de Builder para ayudar a la configuración y creación de instancias de EventProcessorClient. La llamada a buildEventProcessorClient() crea una nueva instancia de EventProcessorClient.
Para crear una instancia de EventProcessorClient, se requieren los siguientes campos:
CheckpointStore - Una implementación de CheckpointStore que almacena información de propiedad de puntos de control y partición para habilitar el equilibrio de carga y los eventos procesados de puntos de control.
processEvent(Consumer<EventContext> processEvent) o processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime) bien: devolución de llamada que procesa los eventos recibidos del centro de eventos.
processError(Consumer<ErrorContext> processError) - Devolución de llamada que controla los errores que pueden producirse al ejecutar EventProcessorClient.
Credenciales para realizar operaciones con Azure Event Hubs. Se pueden establecer mediante uno de los métodos siguientes:
- connectionString(String connectionString) con una cadena de conexión a un centro de eventos específico.
- connectionString(String connectionString, String eventHubName) con una cadena de conexión del espacio de nombres del centro de eventos y el nombre del centro de eventos.
- credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) con el espacio de nombres completo, el nombre del centro de eventos y un conjunto de credenciales autorizadas para usar el centro de eventos.
- credential(TokenCredential credential), credential(AzureSasCredential credential)o credential(AzureNamedKeyCredential credential) junto con fullyQualifiedNamespace(String fullyQualifiedNamespace) y eventHubName(String eventHubName). El espacio de nombres completo, el nombre del centro de eventos y las credenciales autorizadas para usar el centro de eventos.
Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Azure Identity.
Ejemplo: Construir un EventProcessorClient
En el ejemplo de código siguiente se muestra la creación del cliente del procesador. Se recomienda el cliente del procesador para escenarios de producción, ya que puede equilibrar la carga entre varias instancias en ejecución, realizar puntos de control y volver a conectarse en errores transitorios, como interrupciones de red. En el ejemplo siguiente se usa una instancia en memoria CheckpointStore pero azure-messaging-eventhubs-checkpointstore-blob proporciona un almacén de puntos de control respaldado por Azure Blob Storage.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Resumen del campo
Modificador y tipo | Campo y descripción |
---|---|
static final Duration |
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Intervalo de actualización de equilibrio de carga predeterminado. |
static final Duration |
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Expiración de la propiedad predeterminada. |
Resumen del constructor
Constructor | Description |
---|---|
EventProcessorClientBuilder() |
Crea una nueva instancia de EventProcessorClientBuilder. |
Resumen del método
Modificador y tipo | Método y descripción |
---|---|
Event |
buildEventProcessorClient()
Esto creará un nuevo EventProcessorClient configurado con las opciones establecidas en este generador. |
Event |
checkpointStore(CheckpointStore checkpointStore)
Establece el objeto CheckpointStore que EventProcessorClient se usará para almacenar la propiedad de la partición y la información del punto de control. |
Event |
clientOptions(ClientOptions clientOptions)
Establece las opciones de cliente para el cliente del procesador. |
Event |
configuration(Configuration configuration)
Establece el almacén de configuración que se usa durante la construcción del cliente de servicio. |
Event |
connectionString(String connectionString)
Establece la información de credenciales dada una cadena de conexión a la instancia del centro de eventos. |
Event |
connectionString(String connectionString, String eventHubName)
Establece la información de credenciales dada una cadena de conexión al espacio de nombres y el nombre de Event Hubs en una instancia específica del centro de eventos. |
Event |
consumerGroup(String consumerGroup)
Establece el nombre del grupo de consumidores desde el EventProcessorClient que debe consumir eventos. |
Event |
credential(AzureNamedKeyCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla. |
Event |
credential(AzureSasCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla. |
Event |
credential(TokenCredential credential)
Establece el TokenCredential objeto utilizado para autorizar las solicitudes enviadas al servicio. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla. |
Event |
customEndpointAddress(String customEndpointAddress)
Establece una dirección de punto de conexión personalizada al conectarse al servicio Event Hubs. |
Event |
eventHubName(String eventHubName)
Establece el nombre del centro de eventos al que se va a conectar el cliente. |
Event |
fullyQualifiedNamespace(String fullyQualifiedNamespace)
Establece el nombre completo del espacio de nombres de Event Hubs. |
Event |
initialPartitionEventPosition(Map<String,EventPosition> initialPartitionEventPosition)
Establece el mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en CheckpointStore. |
Event |
initialPartitionEventPosition(Function<String,EventPosition> initialEventPositionProvider)
Establece la posición inicial predeterminada para cada partición si no existe un punto de control para esa partición en .CheckpointStore |
Event |
loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
el LoadBalancingStrategy objeto EventProcessorClient usará para reclamar la propiedad de la partición. |
Event |
loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
Intervalo de tiempo entre ciclos de actualización de equilibrio de carga. |
Event |
partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
La duración del tiempo después de la cual expira la propiedad de la partición si la instancia del procesador propietaria no la renueva. |
Event |
prefetchCount(int prefetchCount)
Establece el recuento utilizado por los receptores para controlar el número de eventos que cada consumidor recibirá y pondrá en cola localmente sin tener en cuenta si una operación de recepción está activa actualmente. |
Event |
processError(Consumer<ErrorContext> processError)
Función a la que se llama cuando se produce un error al procesar eventos. |
Event |
processEvent(Consumer<EventContext> processEvent)
Función a la que se llama para cada evento recibido por este .EventProcessorClient |
Event |
processEvent(Consumer<EventContext> processEvent, Duration maxWaitTime)
Función a la que se llama para cada evento recibido por este .EventProcessorClient |
Event |
processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize)
Función a la que se llama para cada evento recibido por este .EventProcessorClient |
Event |
processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime)
Función a la que se llama para cada evento recibido por este .EventProcessorClient |
Event |
processPartitionClose(Consumer<CloseContext> closePartition)
Función a la que se llama cuando se detiene un procesamiento para una partición. |
Event |
processPartitionInitialization(Consumer<InitializationContext> initializePartition)
Función a la que se llama antes de que se inicie el procesamiento para una partición. |
Event |
proxyOptions(ProxyOptions proxyOptions)
Establece la configuración de proxy que se va a usar para EventHubAsyncClient. |
Event |
retry(AmqpRetryOptions retryOptions)
Obsoleto
Reemplazado por retryOptions(AmqpRetryOptions retryOptions).
Establece la directiva de reintento para EventHubAsyncClient. |
Event |
retryOptions(AmqpRetryOptions retryOptions)
Establece la directiva de reintento para EventHubAsyncClient. |
Event |
trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Establece si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. |
Event |
transportType(AmqpTransportType transport)
Establece el tipo de transporte por el que se produce toda la comunicación con Azure Event Hubs. |
Métodos heredados de java.lang.Object
Detalles del campo
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Intervalo de actualización de equilibrio de carga predeterminado. El intervalo de equilibrio debe tener en cuenta la latencia entre el cliente y la cuenta de almacenamiento.
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Expiración de la propiedad predeterminada.
Detalles del constructor
EventProcessorClientBuilder
public EventProcessorClientBuilder()
Crea una nueva instancia de EventProcessorClientBuilder.
Detalles del método
buildEventProcessorClient
public EventProcessorClient buildEventProcessorClient()
Esto creará un nuevo EventProcessorClient configurado con las opciones establecidas en este generador. Cada llamada a este método devolverá una nueva instancia de EventProcessorClient.
Todas las particiones procesadas por esto EventProcessorClient comenzarán a procesarse desde earliest() el evento disponible en las particiones respectivas.
Returns:
checkpointStore
public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
Establece el objeto CheckpointStore que EventProcessorClient se usará para almacenar la propiedad de la partición y la información del punto de control.
Los usuarios pueden, opcionalmente, proporcionar su propia implementación de que almacenará la información de propiedad y punto de CheckpointStore control.
Parameters:
Returns:
clientOptions
public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
Establece las opciones de cliente para el cliente del procesador. El identificador de aplicación establecido en las opciones de cliente se usará para el seguimiento. Los encabezados establecidos en ClientOptions
no se usan actualmente, pero se pueden usar en versiones posteriores para agregar al mensaje AMQP.
Parameters:
Returns:
configuration
public EventProcessorClientBuilder configuration(Configuration configuration)
Establece el almacén de configuración que se usa durante la construcción del cliente de servicio. Si no se especifica, el almacén de configuración predeterminado se usa para configurar .EventHubAsyncClient Use NONE para omitir el uso de opciones de configuración durante la construcción.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString)
Establece la información de credenciales dada una cadena de conexión a la instancia del centro de eventos.
Si la cadena de conexión se copia del espacio de nombres de Event Hubs, es probable que no contenga el nombre en el centro de eventos deseado, que es necesario. En este caso, el nombre se puede agregar manualmente agregando "EntityPath=EVENT_HUB_NAME" al final de la cadena de conexión. Por ejemplo, "EntityPath=telemetry-hub".
Si ha definido una directiva de acceso compartido directamente en el propio centro de eventos, copiar la cadena de conexión de ese centro de eventos dará lugar a una cadena de conexión que contenga el nombre.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
Establece la información de credenciales dada una cadena de conexión al espacio de nombres y el nombre de Event Hubs en una instancia específica del centro de eventos.
Parameters:
Returns:
consumerGroup
public EventProcessorClientBuilder consumerGroup(String consumerGroup)
Establece el nombre del grupo de consumidores desde el EventProcessorClient que debe consumir eventos.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureSasCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(TokenCredential credential)
Establece el TokenCredential objeto utilizado para autorizar las solicitudes enviadas al servicio. Consulte la documentación de autenticación e identidad de Azure SDK para Java para más información sobre el uso adecuado del TokenCredential tipo.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
Establece la información de credenciales a la que se conectará la instancia del centro de eventos y cómo autorizarla.
Parameters:
Returns:
customEndpointAddress
public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
Establece una dirección de punto de conexión personalizada al conectarse al servicio Event Hubs. Esto puede ser útil cuando la red no permite conectarse a la dirección del punto de conexión de Azure Event Hubs estándar, pero permite conectarse a través de un intermediario. Por ejemplo: https://my.custom.endpoint.com:55300.
Si no se especifica ningún puerto, se usa el puerto predeterminado para .transportType(AmqpTransportType transport)
Parameters:
Returns:
eventHubName
public EventProcessorClientBuilder eventHubName(String eventHubName)
Establece el nombre del centro de eventos al que se va a conectar el cliente.
Parameters:
Returns:
fullyQualifiedNamespace
public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Establece el nombre completo del espacio de nombres de Event Hubs.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Map
Establece el mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en CheckpointStore. Esta asignación se asigna a la clave del identificador de partición.
Solo se debe usar una sobrecarga de initialPartitionEventPosition
al construir un EventProcessorClient.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Function
Establece la posición inicial predeterminada para cada partición si no existe un punto de control para esa partición en .CheckpointStore
Solo se debe usar una sobrecarga de initialPartitionEventPosition
al construir un EventProcessorClient.
Parameters:
partitionId
a un objeto EventPosition.
Returns:
loadBalancingStrategy
public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
el LoadBalancingStrategy objeto EventProcessorClient usará para reclamar la propiedad de la partición. De forma predeterminada, se usará un BALANCED enfoque.
Parameters:
Returns:
loadBalancingUpdateInterval
public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
Intervalo de tiempo entre ciclos de actualización de equilibrio de carga. Esto también suele ser el intervalo en el que se renueva la propiedad de las particiones. De forma predeterminada, este intervalo se establece en 10 segundos.
Parameters:
Returns:
partitionOwnershipExpirationInterval
public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
La duración del tiempo después de la cual expira la propiedad de la partición si la instancia del procesador propietaria no la renueva. Esta es la duración que esta instancia del procesador esperará antes de asumir la propiedad de las particiones que anteriormente eran propiedad de un procesador inactivo. De forma predeterminada, esta duración se establece en un minuto.
Parameters:
Returns:
prefetchCount
public EventProcessorClientBuilder prefetchCount(int prefetchCount)
Establece el recuento utilizado por los receptores para controlar el número de eventos que cada consumidor recibirá y pondrá en cola localmente sin tener en cuenta si una operación de recepción está activa actualmente.
Parameters:
Returns:
processError
public EventProcessorClientBuilder processError(Consumer
Función a la que se llama cuando se produce un error al procesar eventos. La entrada contiene la información de partición donde se produjo el error.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
Función a la que se llama para cada evento recibido por este .EventProcessorClient La entrada contiene el contexto de partición y los datos del evento.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
Función a la que se llama para cada evento recibido por este .EventProcessorClient La entrada contiene el contexto de partición y los datos del evento. Si se establece el tiempo de espera máximo, la recepción esperará a que se reciba un evento y, si no se recibe ningún evento, se invocará al consumidor con datos de evento NULL.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
Función a la que se llama para cada evento recibido por este .EventProcessorClient La entrada contiene el contexto de partición y los datos del evento. Si se establece el tiempo de espera máximo, la recepción esperará a que se reciba un evento y, si no se recibe ningún evento, se invocará al consumidor con datos de evento NULL.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
Función a la que se llama para cada evento recibido por este .EventProcessorClient La entrada contiene el contexto de partición y los datos del evento. Si se establece el tiempo de espera máximo, la recepción esperará a que se reciba un evento y, si no se recibe ningún evento, se invocará al consumidor con datos de evento NULL.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.checkpointStore(new SampleCheckpointStore())
.processEventBatch(eventBatchContext -> {
eventBatchContext.getEvents().forEach(eventData -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventBatchContext.getPartitionContext().getPartitionId(),
eventData.getSequenceNumber());
});
}, 50, Duration.ofSeconds(30))
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Parameters:
Returns:
processPartitionClose
public EventProcessorClientBuilder processPartitionClose(Consumer
Función a la que se llama cuando se detiene un procesamiento para una partición. La entrada contiene la información de partición junto con el motivo de detener el procesamiento de eventos para esta partición.
Parameters:
Returns:
processPartitionInitialization
public EventProcessorClientBuilder processPartitionInitialization(Consumer
Función a la que se llama antes de que se inicie el procesamiento para una partición. La entrada contiene la información de partición junto con una posición inicial predeterminada para procesar eventos que se usarán en el caso de que un punto de control no esté disponible en CheckpointStore. Los usuarios pueden actualizar esta posición si se prefiere una posición inicial diferente.
Parameters:
Returns:
proxyOptions
public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
Establece la configuración de proxy que se va a usar para EventHubAsyncClient. Cuando se configura un proxy, AMQP_WEB_SOCKETS se debe usar para el tipo de transporte.
Parameters:
Returns:
retry
@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
Obsoleto
Establece la directiva de reintento para EventHubAsyncClient. Si no se especifica, se usan las opciones de reintento predeterminadas.
Parameters:
Returns:
retryOptions
public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
Establece la directiva de reintento para EventHubAsyncClient. Si no se especifica, se usan las opciones de reintento predeterminadas.
Parameters:
Returns:
trackLastEnqueuedEventProperties
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Establece si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos.
Cuando se realiza un seguimiento de la información sobre el último evento en cola de la partición, cada evento recibido del servicio Event Hubs llevará metadatos sobre la partición que, de lo contrario, no lo haría. Esto da como resultado una pequeña cantidad de consumo de ancho de banda de red adicional que generalmente es un equilibrio favorable cuando se considera en contra de realizar periódicamente solicitudes de propiedades de partición mediante el cliente del centro de eventos.
Parameters:
true
si los eventos resultantes realizarán un seguimiento de la última información puesta en cola de esa partición; false
Lo contrario.
Returns:
transportType
public EventProcessorClientBuilder transportType(AmqpTransportType transport)
Establece el tipo de transporte por el que se produce toda la comunicación con Azure Event Hubs. El valor predeterminado es AMQP.
Parameters:
Returns:
Se aplica a
Azure SDK for Java