EventHubProducerAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. eventhubs. EventHubProducerAsyncClient
- com.
Implementações
public class EventHubProducerAsyncClient
implements Closeable
Um produtor assíncrono responsável pela transmissão EventData para um Hub de Eventos específico, agrupado em lotes. Dependendo do CreateBatchOptions especificado ao criar um EventDataBatch, os eventos podem ser roteados automaticamente para uma partição disponível ou específicos para uma partição. Mais informações e recomendações específicas para estratégias a serem usadas ao publicar eventos em: Distribuir eventos para partições
É recomendável permitir o roteamento automático de partições quando:
- O envio de eventos precisa estar altamente disponível.
- Os dados do evento devem ser distribuídos uniformemente entre todas as partições disponíveis.
Se nenhuma ID de partição for especificada, as seguintes regras serão usadas para selecionar automaticamente uma:
- Distribua os eventos igualmente entre todas as partições disponíveis usando uma abordagem round robin.
- Se uma partição ficar indisponível, o serviço hubs de eventos a detectará automaticamente e encaminhará a mensagem para outra partição disponível.
Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".
Exemplo: Construir um EventHubProducerAsyncClient
O exemplo de código a seguir demonstra a criação do cliente EventHubProducerAsyncClientassíncrono . O fullyQualifiedNamespace
é o nome do host do Namespace dos Hubs de Eventos. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildAsyncProducerClient();
Exemplo: criar um produtor e publicar eventos em qualquer partição
O exemplo de código a seguir demonstra como enviar um conjunto de eventos para o Hub de Eventos. Os eventos são distribuídos por meio de roteamento automático porque nenhuma opção foi definida ao criar o por meio createBatch()de EventDataBatch . O uso EventDataBatch é recomendado porque vários eventos podem ser enviados pela conexão subjacente com uma única mensagem.
createBatch() e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono
deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.
// Creating a batch without options set, will allow for automatic routing of events to any partition.
producer.createBatch().flatMap(batch -> {
batch.tryAdd(new EventData("test-event-1"));
batch.tryAdd(new EventData("test-event-2"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch:" + error),
() -> System.out.println("Send complete."));
Exemplo: Publicar eventos na partição "1"
O exemplo de código a seguir demonstra como enviar um conjunto de eventos para o Hub de Eventos para a partição "1". EventDataBatch por meio de createBatch(CreateBatchOptions options). Os identificadores de partição podem ser obtidos usando getPartitionIds(). O uso EventDataBatch é recomendado porque vários eventos podem ser enviados pela conexão subjacente com uma única mensagem.
createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono
deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("1");
producer.createBatch(options).flatMap(batch -> {
batch.tryAdd(new EventData("test-event-1"));
batch.tryAdd(new EventData("test-event-2"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch to partition 1:" + error),
() -> System.out.println("Send to partition 1 complete."));
Exemplo: publicar eventos na mesma partição, agrupados usando chave de partição
No exemplo de código abaixo, todos os eventos com a mesma chave de partição, "bread" são enviados para a mesma partição. Quando setPartitionId(String partitionId) é especificado, ele informa ao serviço hubs de eventos que esses eventos pertencem ao mesmo grupo e devem pertencer à mesma partição. Útil no caso em que os desenvolvedores desejam que os eventos acabem na mesma partição, mas não se importam em qual partição ela acaba.
createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono
deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.
CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
producer.createBatch(options).flatMap(batch -> {
batch.tryAdd(new EventData("sourdough"));
batch.tryAdd(new EventData("rye"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch:" + error),
() -> System.out.println("Send complete."));
Exemplo: publicar eventos usando um tamanho limitado EventDataBatch
No exemplo de código abaixo, todos os lotes são criados com um tamanho máximo de 256 bytes usando setMaximumSizeInBytes(int maximumSizeInBytes) é especificado. Os eventos dentro do lote são roteados automaticamente porque nenhuma ID de partição ou chave de partição é especificada.
createBatch(CreateBatchOptions options) e send(EventDataBatch batch) não são chamadas de bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Mono
deve ser assinado, como o exemplo abaixo, para começar a publicar o lote de eventos.
Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);
// Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block());
// The sample Flux contains two events, but it could be an infinite stream of telemetry events.
Disposable publishingOperation = telemetryEvents.flatMap(event -> {
EventDataBatch batch = currentBatch.get();
if (batch.tryAdd(event)) {
return Mono.empty();
}
// Send the current batch then create another size-limited EventDataBatch and try to fit the event into
// this new batch.
return producer.send(batch).then(
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);
// Add the event that did not fit in the previous batch.
if (!newBatch.tryAdd(event)) {
return Mono.error(new IllegalArgumentException(
"Event was too large to fit in an empty batch. Max size: "
+ newBatch.getMaxSizeInBytes()));
}
return Mono.empty();
}));
}).subscribe(unused -> {
}, error -> {
System.out.println("Error occurred publishing events: " + error);
}, () -> {
System.out.println("Completed publishing operation.");
});
Resumo do método
Modificador e tipo | Método e descrição |
---|---|
void |
close()
Descarta o EventHubProducerAsyncClient. |
Mono<Event |
createBatch()
Cria um EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite. |
Mono<Event |
createBatch(CreateBatchOptions options)
Cria um EventDataBatch configurado com as opções especificadas. |
String |
getEventHubName()
Obtém o nome do Hub de Eventos com o qual esse cliente interage. |
Mono<Event |
getEventHubProperties()
Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores. |
String |
getFullyQualifiedNamespace()
Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. |
String |
getIdentifier()
Obtém o identificador do cliente. |
Flux<String> |
getPartitionIds()
Recupera os identificadores para as partições de um Hub de Eventos. |
Mono<Partition |
getPartitionProperties(String partitionId)
Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição. |
Mono<Void> |
send(EventDataBatch batch)
Envia o lote para o Hub de Eventos associado. |
Mono<Void> |
send(Iterable<EventData> events)
Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. |
Mono<Void> |
send(Iterable<EventData> events, SendOptions options)
Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. |
Métodos herdados de java.lang.Object
Detalhes do método
close
public void close()
Descarta o EventHubProducerAsyncClient. Se o cliente tiver uma conexão dedicada, a conexão subjacente também será fechada.
createBatch
public Mono
Cria um EventDataBatch que pode se ajustar a tantos eventos quanto o transporte permite.
Returns:
createBatch
public Mono
Cria um EventDataBatch configurado com as opções especificadas.
Parameters:
Returns:
getEventHubName
public String getEventHubName()
Obtém o nome do Hub de Eventos com o qual esse cliente interage.
Returns:
getEventHubProperties
public Mono
Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtém o identificador do cliente.
Returns:
getPartitionIds
public Flux
Recupera os identificadores para as partições de um Hub de Eventos.
Returns:
getPartitionProperties
public Mono
Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.
Parameters:
Returns:
send
public Mono
Envia o lote para o Hub de Eventos associado.
Parameters:
Returns:
send
public Mono
Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. Se o tamanho dos eventos exceder o tamanho máximo de um único lote, uma exceção será disparada e o envio falhará. Por padrão, o tamanho da mensagem é o valor máximo permitido no link.
List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
new EventData("oak"));
producer.send(events)
.subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
Para obter mais informações sobre o tamanho máximo do evento permitido, consulte Cotas e limites de Hubs de Eventos do Azure.
Parameters:
Returns:
send
public Mono
Envia um conjunto de eventos para o Hub de Eventos associado usando uma abordagem em lote. Se o tamanho dos eventos exceder o tamanho máximo de um único lote, uma exceção será disparada e o envio falhará. Por padrão, o tamanho da mensagem é o valor máximo permitido no link.
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
new EventData("New York"));
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions)
.subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
Para obter mais informações sobre o tamanho máximo do evento permitido, consulte Cotas e limites de Hubs de Eventos do Azure.
Parameters:
Returns:
Aplica-se a
Azure SDK for Java