EventProcessorClientBuilder Classe
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClientBuilder
- com.
Implementações
public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>
Essa classe fornece uma API de construtor fluente para ajudar a ajudar na configuração e na instanciação do EventProcessorClient. Chamar buildEventProcessorClient() constrói uma nova instância do EventProcessorClient.
Para criar uma instância do EventProcessorClient, os seguintes campos são necessários:
CheckpointStore - Uma implementação do CheckpointStore que armazena informações de propriedade de ponto de verificação e partição para habilitar o balanceamento de carga e os eventos processados de ponto de verificação.
processEvent(Consumer<EventContext> processEvent) ou processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime) – um retorno de chamada que processa eventos recebidos do Hub de Eventos.
processError(Consumer<ErrorContext> processError) - Um retorno de chamada que manipula erros que podem ocorrer durante a execução do EventProcessorClient.
Credenciais para executar operações em Hubs de Eventos do Azure. Eles podem ser definidos usando um dos seguintes métodos:
- connectionString(String connectionString) com uma cadeia de conexão para um Hub de Eventos específico.
- connectionString(String connectionString, String eventHubName) com uma cadeia de conexão de namespace do Hub de Eventos e o nome do Hub de Eventos.
- credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) com o namespace totalmente qualificado, o nome do Hub de Eventos e um conjunto de credenciais autorizadas a usar o Hub de Eventos.
- credential(TokenCredential credential), credential(AzureSasCredential credential)ou credential(AzureNamedKeyCredential credential) junto com fullyQualifiedNamespace(String fullyQualifiedNamespace) e eventHubName(String eventHubName). O namespace totalmente qualificado, o nome do Hub de Eventos e as credenciais autorizadas para usar o Hub de Eventos.
Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".
Exemplo: Construir um EventProcessorClient
O exemplo de código a seguir demonstra a criação do cliente do processador. O cliente processador é recomendado para cenários de produção porque pode balancear a carga entre várias instâncias em execução, pode executar pontos de verificação e se reconecta em falhas transitórias, como interrupções de rede. O exemplo a seguir usa um azure-messaging-eventhubs-checkpointstore-blob na memória CheckpointStore fornece um repositório de ponto de verificação apoiado por Armazenamento de Blobs do Azure.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Resumo do campo
Modificador e tipo | Campo e descrição |
---|---|
static final Duration |
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Intervalo de atualização de balanceamento de carga padrão. |
static final Duration |
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Expiração de propriedade padrão. |
Resumo do Construtor
Construtor | Description |
---|---|
EventProcessorClientBuilder() |
Cria uma nova instância de EventProcessorClientBuilder. |
Resumo do método
Modificador e tipo | Método e descrição |
---|---|
Event |
buildEventProcessorClient()
Isso criará um novo EventProcessorClient configurado com as opções definidas neste construtor. |
Event |
checkpointStore(CheckpointStore checkpointStore)
Define o CheckpointStore que EventProcessorClient será usado para armazenar informações de ponto de verificação e propriedade da partição. |
Event |
clientOptions(ClientOptions clientOptions)
Define as opções de cliente para o cliente processador. |
Event |
configuration(Configuration configuration)
Define o repositório de configuração usado durante a construção do cliente de serviço. |
Event |
connectionString(String connectionString)
Define as informações de credencial dada uma cadeia de conexão para a instância do Hub de Eventos. |
Event |
connectionString(String connectionString, String eventHubName)
Define as informações de credencial fornecidas por uma cadeia de conexão para o namespace e o nome dos Hubs de Eventos como uma instância específica do Hub de Eventos. |
Event |
consumerGroup(String consumerGroup)
Define o nome do grupo de consumidores do qual o EventProcessorClient deve consumir eventos. |
Event |
credential(AzureNamedKeyCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela. |
Event |
credential(AzureSasCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela. |
Event |
credential(TokenCredential credential)
Define o TokenCredential usado para autorizar solicitações enviadas ao serviço. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela. |
Event |
credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela. |
Event |
customEndpointAddress(String customEndpointAddress)
Define um endereço de ponto de extremidade personalizado ao se conectar ao serviço de Hubs de Eventos. |
Event |
eventHubName(String eventHubName)
Define o nome do Hub de Eventos ao qual conectar o cliente. |
Event |
fullyQualifiedNamespace(String fullyQualifiedNamespace)
Define o nome totalmente qualificado para o namespace dos Hubs de Eventos. |
Event |
initialPartitionEventPosition(Map<String,EventPosition> initialPartitionEventPosition)
Define o mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir em CheckpointStore. |
Event |
initialPartitionEventPosition(Function<String,EventPosition> initialEventPositionProvider)
Define a posição inicial padrão para cada partição se um ponto de verificação para essa partição não existir no CheckpointStore. |
Event |
loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
O LoadBalancingStrategy usará EventProcessorClient para reivindicar a propriedade da partição. |
Event |
loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
O intervalo de tempo entre os ciclos de atualização de balanceamento de carga. |
Event |
partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
A duração do tempo após a qual a propriedade da partição expira se não for renovada pela instância de processador proprietária. |
Event |
prefetchCount(int prefetchCount)
Define a contagem usada pelos receptores para controlar o número de eventos que cada consumidor receberá ativamente e fará fila localmente sem considerar se uma operação de recebimento está ativa no momento. |
Event |
processError(Consumer<ErrorContext> processError)
A função que é chamada quando ocorre um erro durante o processamento de eventos. |
Event |
processEvent(Consumer<EventContext> processEvent)
A função que é chamada para cada evento recebido por este EventProcessorClient. |
Event |
processEvent(Consumer<EventContext> processEvent, Duration maxWaitTime)
A função que é chamada para cada evento recebido por este EventProcessorClient. |
Event |
processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize)
A função que é chamada para cada evento recebido por este EventProcessorClient. |
Event |
processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime)
A função que é chamada para cada evento recebido por este EventProcessorClient. |
Event |
processPartitionClose(Consumer<CloseContext> closePartition)
A função que é chamada quando um processamento de uma partição é interrompido. |
Event |
processPartitionInitialization(Consumer<InitializationContext> initializePartition)
A função que é chamada antes do início do processamento de uma partição. |
Event |
proxyOptions(ProxyOptions proxyOptions)
Define a configuração de proxy a ser usada para EventHubAsyncClient. |
Event |
retry(AmqpRetryOptions retryOptions)
Preterido
Substituído por retryOptions(AmqpRetryOptions retryOptions).
Define a política de repetição para EventHubAsyncClient. |
Event |
retryOptions(AmqpRetryOptions retryOptions)
Define a política de repetição para EventHubAsyncClient. |
Event |
trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Define se o processador de eventos deve ou não solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. |
Event |
transportType(AmqpTransportType transport)
Define o tipo de transporte pelo qual ocorre toda a comunicação com Hubs de Eventos do Azure. |
Métodos herdados de java.lang.Object
Detalhes do campo
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Intervalo de atualização de balanceamento de carga padrão. O intervalo de balanceamento deve levar em conta a latência entre o cliente e a conta de armazenamento.
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Expiração de propriedade padrão.
Detalhes do construtor
EventProcessorClientBuilder
public EventProcessorClientBuilder()
Cria uma nova instância de EventProcessorClientBuilder.
Detalhes do método
buildEventProcessorClient
public EventProcessorClient buildEventProcessorClient()
Isso criará um novo EventProcessorClient configurado com as opções definidas neste construtor. Cada chamada para esse método retornará uma nova instância de EventProcessorClient.
Todas as partições processadas por isso EventProcessorClient iniciarão o processamento do earliest() evento disponível nas respectivas partições.
Returns:
checkpointStore
public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
Define o CheckpointStore que EventProcessorClient será usado para armazenar informações de ponto de verificação e propriedade da partição.
Os usuários podem, opcionalmente, fornecer sua própria implementação da qual armazenará informações de propriedade e ponto de CheckpointStore verificação.
Parameters:
Returns:
clientOptions
public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
Define as opções de cliente para o cliente processador. A ID do aplicativo definida nas opções do cliente será usada para rastreamento. Os cabeçalhos definidos em ClientOptions
atualmente não são usados, mas podem ser usados em versões posteriores para adicionar à mensagem AMQP.
Parameters:
Returns:
configuration
public EventProcessorClientBuilder configuration(Configuration configuration)
Define o repositório de configuração usado durante a construção do cliente de serviço. Se não for especificado, o repositório de configuração padrão será usado para configurar o EventHubAsyncClient. Use NONE para ignorar o uso de definições de configuração durante a construção.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString)
Define as informações de credencial dada uma cadeia de conexão para a instância do Hub de Eventos.
Se a cadeia de conexão for copiada do namespace dos Hubs de Eventos, ela provavelmente não conterá o nome para o Hub de Eventos desejado, o que é necessário. Nesse caso, o nome pode ser adicionado manualmente adicionando "EntityPath=EVENT_HUB_NAME" ao final da cadeia de conexão. Por exemplo, "EntityPath=telemetry-hub".
Se você tiver definido uma política de acesso compartilhado diretamente no próprio Hub de Eventos, copiar a cadeia de conexão desse Hub de Eventos resultará em uma cadeia de conexão que contém o nome.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
Define as informações de credencial fornecidas por uma cadeia de conexão para o namespace e o nome dos Hubs de Eventos como uma instância específica do Hub de Eventos.
Parameters:
Returns:
consumerGroup
public EventProcessorClientBuilder consumerGroup(String consumerGroup)
Define o nome do grupo de consumidores do qual o EventProcessorClient deve consumir eventos.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureSasCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(TokenCredential credential)
Define o TokenCredential usado para autorizar solicitações enviadas ao serviço. Consulte a documentação de identidade e autenticação do SDK do Azure para Java para obter mais detalhes sobre o uso adequado do TokenCredential tipo.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
Define as informações de credencial às quais a instância do Hub de Eventos se conectar e como autorizar nela.
Parameters:
Returns:
customEndpointAddress
public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
Define um endereço de ponto de extremidade personalizado ao se conectar ao serviço de Hubs de Eventos. Isso pode ser útil quando sua rede não permite a conexão com o endereço padrão do ponto de extremidade Hubs de Eventos do Azure, mas permite a conexão por meio de um intermediário. Por exemplo: https://my.custom.endpoint.com:55300.
Se nenhuma porta for especificada, a porta padrão para o transportType(AmqpTransportType transport) será usada.
Parameters:
Returns:
eventHubName
public EventProcessorClientBuilder eventHubName(String eventHubName)
Define o nome do Hub de Eventos ao qual conectar o cliente.
Parameters:
Returns:
fullyQualifiedNamespace
public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Define o nome totalmente qualificado para o namespace dos Hubs de Eventos.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Map
Define o mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir em CheckpointStore. Esse mapa é retirado da ID da partição.
Apenas uma sobrecarga de initialPartitionEventPosition
deve ser usada ao construir um EventProcessorClient.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Function
Define a posição inicial padrão para cada partição se um ponto de verificação para essa partição não existir no CheckpointStore.
Apenas uma sobrecarga de initialPartitionEventPosition
deve ser usada ao construir um EventProcessorClient.
Parameters:
Returns:
loadBalancingStrategy
public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
O LoadBalancingStrategy usará EventProcessorClient para reivindicar a propriedade da partição. Por padrão, uma BALANCED abordagem será usada.
Parameters:
Returns:
loadBalancingUpdateInterval
public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
O intervalo de tempo entre os ciclos de atualização de balanceamento de carga. Isso também geralmente é o intervalo no qual a propriedade das partições é renovada. Por padrão, esse intervalo é definido como 10 segundos.
Parameters:
Returns:
partitionOwnershipExpirationInterval
public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
A duração do tempo após a qual a propriedade da partição expira se não for renovada pela instância de processador proprietária. Essa é a duração que essa instância do processador aguardará antes de assumir a propriedade de partições anteriormente pertencentes a um processador inativo. Por padrão, essa duração é definida como um minuto.
Parameters:
Returns:
prefetchCount
public EventProcessorClientBuilder prefetchCount(int prefetchCount)
Define a contagem usada pelos receptores para controlar o número de eventos que cada consumidor receberá ativamente e fará fila localmente sem considerar se uma operação de recebimento está ativa no momento.
Parameters:
Returns:
processError
public EventProcessorClientBuilder processError(Consumer
A função que é chamada quando ocorre um erro durante o processamento de eventos. A entrada contém as informações de partição em que o erro ocorreu.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
A função que é chamada para cada evento recebido por este EventProcessorClient. A entrada contém o contexto de partição e os dados do evento.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
A função que é chamada para cada evento recebido por este EventProcessorClient. A entrada contém o contexto de partição e os dados do evento. Se o tempo máximo de espera for definido, o recebimento aguardará que essa duração receba um evento e, se nenhum evento for recebido, o consumidor será invocado com dados de evento nulos.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
A função que é chamada para cada evento recebido por este EventProcessorClient. A entrada contém o contexto de partição e os dados do evento. Se o tempo máximo de espera for definido, o recebimento aguardará que essa duração receba um evento e, se nenhum evento for recebido, o consumidor será invocado com dados de evento nulos.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
A função que é chamada para cada evento recebido por este EventProcessorClient. A entrada contém o contexto de partição e os dados do evento. Se o tempo máximo de espera for definido, o recebimento aguardará que essa duração receba um evento e, se nenhum evento for recebido, o consumidor será invocado com dados de evento nulos.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.checkpointStore(new SampleCheckpointStore())
.processEventBatch(eventBatchContext -> {
eventBatchContext.getEvents().forEach(eventData -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventBatchContext.getPartitionContext().getPartitionId(),
eventData.getSequenceNumber());
});
}, 50, Duration.ofSeconds(30))
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Parameters:
Returns:
processPartitionClose
public EventProcessorClientBuilder processPartitionClose(Consumer
A função que é chamada quando um processamento de uma partição é interrompido. A entrada contém as informações de partição junto com o motivo para interromper o processamento de eventos para essa partição.
Parameters:
Returns:
processPartitionInitialization
public EventProcessorClientBuilder processPartitionInitialization(Consumer
A função que é chamada antes do início do processamento de uma partição. A entrada contém as informações de partição juntamente com uma posição inicial padrão para processar eventos que serão usados no caso de um ponto de verificação indisponível no CheckpointStore. Os usuários poderão atualizar essa posição se uma posição inicial diferente for preferencial.
Parameters:
Returns:
proxyOptions
public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
Define a configuração de proxy a ser usada para EventHubAsyncClient. Quando um proxy é configurado, AMQP_WEB_SOCKETS deve ser usado para o tipo de transporte.
Parameters:
Returns:
retry
@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
Preterido
Define a política de repetição para EventHubAsyncClient. Se não for especificado, as opções de repetição padrão serão usadas.
Parameters:
Returns:
retryOptions
public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
Define a política de repetição para EventHubAsyncClient. Se não for especificado, as opções de repetição padrão serão usadas.
Parameters:
Returns:
trackLastEnqueuedEventProperties
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Define se o processador de eventos deve ou não solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos.
Quando informações sobre o último evento enfileirado da partição estiverem sendo rastreadas, cada evento recebido do serviço hubs de eventos carregará metadados sobre a partição que, de outra forma, não o faria. Isso resulta em uma pequena quantidade de consumo adicional de largura de banda de rede que geralmente é uma compensação favorável quando considerada para fazer periodicamente solicitações de propriedades de partição usando o cliente do Hub de Eventos.
Parameters:
true
se os eventos resultantes acompanharem as últimas informações enfileiradas para essa partição; false
Caso contrário.
Returns:
transportType
public EventProcessorClientBuilder transportType(AmqpTransportType transport)
Define o tipo de transporte pelo qual ocorre toda a comunicação com Hubs de Eventos do Azure. O valor padrão é AMQP.
Parameters:
Returns:
Aplica-se a
Azure SDK for Java