Compartilhar via


EventHubProducerAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubProducerAsyncClient

Implementações

public class EventHubProducerAsyncClient
implements Closeable

Um produtor assíncrono responsável pela transmissão EventData para um Hub de Eventos específico, agrupado em lotes. Dependendo do CreateBatchOptions especificado ao criar um EventDataBatch, os eventos podem ser roteados automaticamente para uma partição disponível ou específicos para uma partição. Mais informações e recomendações específicas para estratégias a serem usadas ao publicar eventos em: Distribuir eventos para partições

É recomendável permitir o roteamento automático de partições quando:

  • O envio de eventos precisa estar altamente disponível.
  • Os dados do evento devem ser distribuídos uniformemente entre todas as partições disponíveis.

Se nenhuma ID de partição for especificada, as seguintes regras serão usadas para selecionar automaticamente uma:

  1. Distribua os eventos igualmente entre todas as partições disponíveis usando uma abordagem round robin.
  2. Se uma partição ficar indisponível, o serviço hubs de eventos a detectará automaticamente e encaminhará a mensagem para outra partição disponível.

Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".

Exemplo: Construir um EventHubProducerAsyncClient

O exemplo de código a seguir demonstra a criação do cliente EventHubProducerAsyncClientassíncrono . O fullyQualifiedNamespace é o nome do host do Namespace dos Hubs de Eventos. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .buildAsyncProducerClient();

Exemplo: criar um produtor e publicar eventos em qualquer partição

O exemplo de código a seguir demonstra como enviar um conjunto de eventos para o Hub de Eventos. Os eventos são distribuídos por meio de roteamento automático porque nenhuma opção foi definida ao criar o por meio createBatch()de EventDataBatch . O uso EventDataBatch é recomendado porque vários eventos podem ser enviados pela conexão subjacente com uma única mensagem.

createBatch() e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.

// Creating a batch without options set, will allow for automatic routing of events to any partition.
 producer.createBatch().flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Exemplo: Publicar eventos na partição "1"

O exemplo de código a seguir demonstra como enviar um conjunto de eventos para o Hub de Eventos para a partição "1". EventDataBatch por meio de createBatch(CreateBatchOptions options). Os identificadores de partição podem ser obtidos usando getPartitionIds(). O uso EventDataBatch é recomendado porque vários eventos podem ser enviados pela conexão subjacente com uma única mensagem.

createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.

CreateBatchOptions options = new CreateBatchOptions().setPartitionId("1");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch to partition 1:" + error),
     () -> System.out.println("Send to partition 1 complete."));

Exemplo: publicar eventos na mesma partição, agrupados usando chave de partição

No exemplo de código abaixo, todos os eventos com a mesma chave de partição, "bread" são enviados para a mesma partição. Quando setPartitionId(String partitionId) é especificado, ele informa ao serviço hubs de eventos que esses eventos pertencem ao mesmo grupo e devem pertencer à mesma partição. Útil no caso em que os desenvolvedores desejam que os eventos acabem na mesma partição, mas não se importam em qual partição ela acaba.

createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.

CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");

 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("sourdough"));
     batch.tryAdd(new EventData("rye"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Exemplo: publicar eventos usando um tamanho limitado EventDataBatch

No exemplo de código abaixo, todos os lotes são criados com um tamanho máximo de 256 bytes usando setMaximumSizeInBytes(int maximumSizeInBytes) é especificado. Os eventos dentro do lote são roteados automaticamente porque nenhuma ID de partição ou chave de partição é especificada.

createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.

Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
 AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
     producer.createBatch(options).block());

 // The sample Flux contains two events, but it could be an infinite stream of telemetry events.
 Disposable publishingOperation = telemetryEvents.flatMap(event -> {
     EventDataBatch batch = currentBatch.get();

     if (batch.tryAdd(event)) {
         return Mono.empty();
     }

     // Send the current batch then create another size-limited EventDataBatch and try to fit the event into
     // this new batch.
     return producer.send(batch).then(
         producer.createBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the event that did not fit in the previous batch.
             if (!newBatch.tryAdd(event)) {
                 return Mono.error(new IllegalArgumentException(
                     "Event was too large to fit in an empty batch. Max size: "
                         + newBatch.getMaxSizeInBytes()));
             }

             return Mono.empty();
         }));
 }).subscribe(unused -> {
 }, error -> {
     System.out.println("Error occurred publishing events: " + error);
 }, () -> {
     System.out.println("Completed publishing operation.");
 });

Resumo do método

Modificador e tipo Método e descrição
void close()

Descarta o EventHubProducerAsyncClient.

Mono<EventDataBatch> createBatch()

Cria um EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite.

Mono<EventDataBatch> createBatch(CreateBatchOptions options)

Cria um EventDataBatch configurado com as opções especificadas.

String getEventHubName()

Obtém o nome do Hub de Eventos com o qual esse cliente interage.

Mono<EventHubProperties> getEventHubProperties()

Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.

String getFullyQualifiedNamespace()

Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada.

String getIdentifier()

Obtém o identificador do cliente.

Flux<String> getPartitionIds()

Recupera os identificadores para as partições de um Hub de Eventos.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.

Mono<Void> send(EventDataBatch batch)

Envia o lote para o Hub de Eventos associado.

Mono<Void> send(Iterable<EventData> events)

Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote.

Mono<Void> send(Iterable<EventData> events, SendOptions options)

Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote.

Métodos herdados de java.lang.Object

Detalhes do método

close

public void close()

Descarta o EventHubProducerAsyncClient. Se o cliente tiver uma conexão dedicada, a conexão subjacente também será fechada.

createBatch

public Mono createBatch()

Cria um EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite.

Returns:

Um novo EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite.

createBatch

public Mono createBatch(CreateBatchOptions options)

Cria um EventDataBatch configurado com as opções especificadas.

Parameters:

options - Um conjunto de opções usado para configurar o EventDataBatch.

Returns:

Um novo EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite.

getEventHubName

public String getEventHubName()

Obtém o nome do Hub de Eventos com o qual esse cliente interage.

Returns:

O nome do Hub de Eventos com o qual esse cliente interage.

getEventHubProperties

public Mono getEventHubProperties()

Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.

Returns:

O conjunto de informações para o Hub de Eventos ao qual esse cliente está associado.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net.

Returns:

O namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada.

getIdentifier

public String getIdentifier()

Obtém o identificador do cliente.

Returns:

A cadeia de caracteres de identificador exclusiva para o cliente atual.

getPartitionIds

public Flux getPartitionIds()

Recupera os identificadores para as partições de um Hub de Eventos.

Returns:

Um Fluxo de identificadores para as partições de um Hub de Eventos.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.

Parameters:

partitionId - O identificador exclusivo de uma partição associada ao Hub de Eventos.

Returns:

O conjunto de informações para a partição solicitada no Hub de Eventos ao qual esse cliente está associado.

send

public Mono send(EventDataBatch batch)

Envia o lote para o Hub de Eventos associado.

Parameters:

batch - O lote a ser enviado para o serviço.

Returns:

Um Mono que é concluído quando o lote é enviado por push para o serviço.

send

public Mono send(Iterable events)

Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. Se o tamanho dos eventos exceder o tamanho máximo de um único lote, uma exceção será disparada e o envio falhará. Por padrão, o tamanho da mensagem é o valor máximo permitido no link.

List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
     new EventData("oak"));

 producer.send(events)
     .subscribe(unused -> {
     },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

Para obter mais informações sobre o tamanho máximo do evento permitido, consulte Cotas e limites de Hubs de Eventos do Azure.

Parameters:

events - Eventos a serem enviados para o serviço.

Returns:

Um Mono que é concluído quando todos os eventos são enviados por push para o serviço.

send

public Mono send(Iterable events, SendOptions options)

Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. Se o tamanho dos eventos exceder o tamanho máximo de um único lote, uma exceção será disparada e o envio falhará. Por padrão, o tamanho da mensagem é o valor máximo permitido no link.

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
     new EventData("New York"));

 SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
 producer.send(events, sendOptions)
     .subscribe(unused -> {
     },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

Para obter mais informações sobre o tamanho máximo do evento permitido, consulte Cotas e limites de Hubs de Eventos do Azure.

Parameters:

events - Eventos a serem enviados para o serviço.
options - O conjunto de opções a serem consideradas ao enviar esse lote.

Returns:

Um Mono que é concluído quando todos os eventos são enviados por push para o serviço.

Aplica-se a