Partager via


Spring Cloud Stream avec Azure Service Bus

Cet article explique comment utiliser Spring Cloud Stream Binder pour échanger des messages avec des queues et topics Service Bus.

Azure fournit une plateforme de messagerie asynchrone appelée Azure Service Bus (« Service Bus ») basée sur la norme Advanced Message Queuing Protocol 1.0 (« AMQP 1.0 »). Le plateforme Service Bus peut être utilisée sur toute la gamme de plateformes Azure prises en charge.

Prérequis

Remarque

Pour accorder à votre compte l'accès à vos ressources Azure Service Bus, attribuez le rôle Azure Service Bus Data Sender et Azure Service Bus Data Receiver au compte Microsoft Entra que vous utilisez actuellement. Pour plus d'informations sur l'attribution de rôles d'accès, consultez les sections Attribuer des rôles Azure à l'aide du portail Azure et Authentifier et autoriser une application avec Microsoft Entra ID à accéder aux entités Azure Service Bus.

Important

Spring Boot version 2.5 ou supérieure est nécessaire pour effectuer les étapes de cet article.

Envoyez et recevez des messages depuis Azure Service Bus

Avec une file d'attente ou une rubrique pour Azure Service Bus, vous pouvez envoyer et recevoir des messages à l'aide de Spring Cloud Azure Stream Binder Service Bus.

Pour installer le module Spring Cloud Azure Stream Binder Service Bus, ajoutez les dépendances suivantes à votre fichier pom.xml :

  • La nomenclature Spring Cloud Azure :

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.18.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Remarque

    Si vous utilisez Spring Boot 2.x, assurez-vous de définir la version spring-cloud-azure-dependencies sur 4.19.0. Cette nomenclature doit être configurée dans la section <dependencyManagement> de votre fichier pom.xml. Cela permet de s'assurer que toutes les dépendances de Spring Cloud Azure utilisent la même version. Pour plus d'informations sur la version utilisée pour cette nomenclature, consultez Quelle version de Spring Cloud Azure dois-je utiliser.

  • L'artefact Spring Cloud Azure Stream Binder Service Bus :

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Coder l’application

Suivez les étapes suivantes pour configurer votre application afin qu'elle utilise une file d'attente ou une rubrique Service Bus pour envoyer et recevoir des messages.

  1. Configurez les informations d'identification du Service Bus dans le fichier de configuration application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Le tableau suivant décrit les champs de la configuration :

    Champ Description
    spring.cloud.azure.servicebus.namespace Spécifiez l'espace de noms que vous avez obtenu dans votre bus de service à partir du portail Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Spécifiez la file d’attente ou la rubrique Service Bus que vous avez utilisées dans ce didacticiel.
    spring.cloud.stream.bindings.supply-out-0.destination Spécifiez la même valeur que celle utilisée pour la destination d’entrée.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Indiquez si les messages doivent être décomptés automatiquement. S'il est défini comme false, un en-tête de message de Checkpointer sera ajouté pour permettre aux développeurs de régler les messages manuellement.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Spécifiez le type d'entité pour la liaison de sortie, qui peut être queue ou topic.
    spring.cloud.function.definition Spécifiez le composant Bean fonctionnel à lier aux destinations externes qui sont exposées par les liaisons.
    spring.cloud.stream.poller.fixed-delay Spécifiez le délai fixe pour le poller par défaut en millisecondes. La valeur par défaut est 1000 L. La valeur recommandée est 60000.
    spring.cloud.stream.poller.initial-delay Spécifiez le délai initial pour les déclencheurs périodiques. La valeur par défaut est 0.
  2. Modifiez le fichier de classe de démarrage pour afficher le contenu suivant.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Conseil

    Dans ce tutoriel, il n'y a pas d'opérations d'authentification dans les configurations ou le code. Cependant, la connexion aux services Azure nécessite une authentification. Pour effectuer l’authentification, vous devez utiliser Identité Azure. Spring Cloud Azure utilise DefaultAzureCredential, que la bibliothèque Azure Identity fournit pour vous aider à obtenir des informations d'identification sans modifier le code.

    DefaultAzureCredential prend en charge plusieurs méthodes d’authentification et détermine quelle méthode doit être utilisée au moment de l’exécution. Cette approche permet à votre application d'utiliser différentes méthodes d'authentification dans différents environnements (tels que les environnements locaux et de production) sans implémenter de code spécifique à l'environnement. Pour plus d’informations, consultez DefaultAzureCredential.

    Pour réaliser l'authentification dans les environnements de développement locaux, vous pouvez utiliser Azure CLI, Visual Studio Code, PowerShell ou d'autres méthodes. Pour plus d'informations, consultez la section Authentification Azure dans les environnements de développement Java. Pour compléter l'authentification dans les environnements d'hébergement Azure, nous vous recommandons d'utiliser l'identité gérée attribuée à l'utilisateur. Pour plus d’informations, consultez Que sont les identités managées pour les ressources Azure ?

  3. Lancez l’application. Les messages tels que l'exemple suivant seront affichés dans le journal de votre application :

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Étapes suivantes