ServiceBusProcessorClient Classe
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
Implémente
public final class ServiceBusProcessorClient
implements AutoCloseable
Client processeur pour le traitement des messages Service Bus. ServiceBusProcessorClient fournit un mécanisme basé sur l’envoi (push) qui appelle le rappel de traitement des messages lorsqu’un message est reçu ou le gestionnaire d’erreurs lorsqu’une erreur se produit lors de la réception de messages. Un ServiceBusProcessorClient peut être créé pour traiter les messages d’une entité Service Bus activée ou non pour la session. Il prend en charge le règlement automatique des messages par défaut.
Exemple de code pour instancier un client processeur et recevoir en mode PeekLock
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Exemple de code pour instancier un client de processeur et recevoir en mode ReceiveAndDelete
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Créer et exécuter un processeur activé pour la session
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Résumé de la méthode
Modificateur et type | Méthode et description |
---|---|
synchronized void |
close()
Arrête le traitement des messages et ferme le processeur. |
synchronized String |
getIdentifier()
Obtient l’identificateur du instance de ServiceBusProcessorClient. |
String |
getQueueName()
Retourne le nom de file d’attente associé à cette instance de ServiceBusProcessorClient. |
String |
getSubscriptionName()
Retourne le nom d’abonnement associé à cette instance de ServiceBusProcessorClient. |
String |
getTopicName()
Retourne le nom de la rubrique associé à cette instance de ServiceBusProcessorClient. |
synchronized boolean |
isRunning()
Retourne |
synchronized void |
start()
Démarre le processeur en arrière-plan. |
synchronized void |
stop()
Arrête le traitement des messages pour ce processeur. |
Méthodes héritées de java.lang.Object
Détails de la méthode
close
public synchronized void close()
Arrête le traitement des messages et ferme le processeur. Les liens et sessions de réception sont fermés et les appels start() créent un nouveau cycle de traitement avec de nouveaux liens et de nouvelles sessions.
getIdentifier
public synchronized String getIdentifier()
Obtient l’identificateur du instance de ServiceBusProcessorClient.
Returns:
getQueueName
public String getQueueName()
Retourne le nom de file d’attente associé à cette instance de ServiceBusProcessorClient.
Returns:
null
si le instance processeur est destiné à une rubrique et un abonnement.getSubscriptionName
public String getSubscriptionName()
Retourne le nom d’abonnement associé à cette instance de ServiceBusProcessorClient.
Returns:
null
si le instance processeur est destiné à une file d’attente.getTopicName
public String getTopicName()
Retourne le nom de la rubrique associé à cette instance de ServiceBusProcessorClient.
Returns:
null
si le instance processeur est destiné à une file d’attente.isRunning
public synchronized boolean isRunning()
Retourne true
si le processeur est en cours d’exécution. Si le processeur est arrêté ou fermé, cette méthode retourne false
.
Returns:
true
si le processeur est en cours d’exécution ; false
Sinon.start
public synchronized void start()
Démarre le processeur en arrière-plan. Lorsque cette méthode est appelée, le processeur lance un récepteur de messages qui appelle le gestionnaire de messages lorsque de nouveaux messages sont disponibles. Cette méthode est idempotente (c’est-à-dire qu’appeler start()
à nouveau une fois que le processeur est déjà en cours d’exécution est une opération sans opération).
L’appel start()
après l’appel stop() reprend le traitement des messages à l’aide de la même connexion sous-jacente.
L’appel start()
après l’appel close() démarre le processeur avec une nouvelle connexion.
stop
public synchronized void stop()
Arrête le traitement des messages pour ce processeur. Les liens et les sessions de réception sont maintenus actifs et ce processeur peut reprendre le traitement des messages en appelant start() à nouveau.
S’applique à
Azure SDK for Java