ServiceBusSessionReceiverAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusSessionReceiverAsyncClient
- com.
Implementações
public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable
Esse cliente receptor de sessão assíncrona é usado para adquirir bloqueios de sessão de uma fila ou tópico e criar ServiceBusReceiverAsyncClient instâncias vinculadas às sessões bloqueadas. As sessões podem ser usadas como um primeiro processamento fifo (primeiro a entrar e sair) de mensagens. Filas e tópicos/assinaturas dão suporte a sessões do Barramento de Serviço, no entanto, ele deve ser habilitado no momento da criação da entidade.
Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".
Exemplo: receber mensagens de uma sessão específica
Use acceptSession(String sessionId) para adquirir o bloqueio de uma sessão se você souber a ID da sessão e PEEK_LOCKdisableAutoComplete() for altamente recomendado para que os usuários tenham controle sobre a liquidação de mensagens.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Exemplo: receber mensagens da primeira sessão disponível
Use acceptNextSession() para adquirir o bloqueio da próxima sessão disponível sem especificar a ID da sessão.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Resumo do método
Modificador e tipo | Método e descrição |
---|---|
Mono<Service |
acceptNextSession()
Adquire um bloqueio de sessão para a próxima sessão disponível e cria um ServiceBusReceiverAsyncClient para receber mensagens da sessão. |
Mono<Service |
acceptSession(String sessionId)
Adquire um bloqueio de sessão para |
void | close() |
Métodos herdados de java.lang.Object
Detalhes do método
acceptNextSession
public Mono
Adquire um bloqueio de sessão para a próxima sessão disponível e cria um ServiceBusReceiverAsyncClient para receber mensagens da sessão. Ele aguardará até que uma sessão esteja disponível se nenhuma estiver disponível imediatamente.
Returns:
acceptSession
public Mono
Adquire um bloqueio de sessão para sessionId
e crie um ServiceBusReceiverAsyncClient para receber mensagens da sessão. Se a sessão já estiver bloqueada por outro cliente, um AmqpException será gerado.
Parameters:
Returns:
close
public void close()
Aplica-se a
Azure SDK for Java