Partager via


Prise en charge de Spring Cloud Azure pour Spring Cloud Stream

Cet article s’applique à :✅ version 4.19.0 ✅ version 5.19.0

Spring Cloud Stream est un framework permettant de créer des microservices hautement évolutifs basés sur des événements connectés à des systèmes de messagerie partagés.

Le framework fournit un modèle de programmation flexible basé sur des idiomes spring déjà établis et familiers et des meilleures pratiques. Ces meilleures pratiques incluent la prise en charge de la sémantique pub/sous-sémantique persistante, des groupes de consommateurs et des partitions avec état.

Les implémentations de classeur actuelles sont les suivantes :

Spring Cloud Stream Binder pour Azure Event Hubs

Concepts clés

Spring Cloud Stream Binder pour Azure Event Hubs fournit l’implémentation de liaison pour l’infrastructure Spring Cloud Stream. Cette implémentation utilise les adaptateurs de canal Spring Integration Event Hubs à sa base. Du point de vue de la conception, Event Hubs est similaire à Kafka. En outre, Event Hubs est accessible via l’API Kafka. Si votre projet a une dépendance étroite avec l’API Kafka, vous pouvez essayer Hub d’événements avec l’exemple d’API Kafka

Groupe de consommateurs

Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages dans le stockage Azure.

Prise en charge du partitionnement

Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement au rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer quel consommateur possède la partition. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions des consommateurs les plus chargés pour atteindre l’équilibre de charge de travail.

Pour spécifier la stratégie d’équilibrage de charge, les propriétés de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* sont fournies. Pour plus d’informations, consultez la section Propriétés du consommateur.

Prise en charge des consommateurs Batch

Spring Cloud Azure Stream Event Hubs prend en charge fonctionnalité consommateur Spring Cloud Stream Batch.

Pour utiliser le mode consommateur par lots, définissez la propriété spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode sur true. Lorsqu’il est activé, un message avec une charge utile d’une liste d’événements par lots est reçu et transmis à la fonction Consumer. Chaque en-tête de message est également converti en liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Les en-têtes communs de l’ID de partition, du point de contrôle et des dernières propriétés mises en file d’attente sont présentés sous forme de valeur unique, car le lot entier d’événements partage la même valeur. Pour plus d’informations, consultez la section des en-têtes de message Event Hubs de prise en charge d’Azure Spring Cloud pour Spring Integration.

Note

L’en-tête de point de contrôle existe uniquement lorsque le mode de point de contrôle MANUAL est utilisé.

Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH et MANUAL. BATCH mode est un mode de point de contrôle automatique pour contrôler l’ensemble du lot d’événements une fois que le classeur les reçoit. MANUAL mode consiste à contrôler les événements par les utilisateurs. Lorsqu’elle est utilisée, la Checkpointer est passée dans l’en-tête de message, et les utilisateurs peuvent l’utiliser pour effectuer des points de contrôle.

Vous pouvez spécifier la taille du lot en définissant les propriétés max-size et max-wait-time qui ont un préfixe de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. La propriété max-size est nécessaire et la propriété max-wait-time est facultative. Pour plus d’informations, consultez la section Propriétés du consommateur.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Vous pouvez également utiliser Spring Cloud Azure Stream Event Hubs Starter, comme illustré dans l’exemple suivant pour Maven :

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuration

Le classeur fournit les trois parties suivantes des options de configuration :

Propriétés de configuration de connexion

Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.

Note

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

Propriétés configurables de connexion de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.enabled booléen Indique si azure Event Hubs est activé.
spring.cloud.azure.eventhubs.connection-string Corde Valeur de chaîne de connexion de l’espace de noms Event Hubs.
spring.cloud.azure.eventhubs.namespace Corde Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corde Nom de domaine d’une valeur d’espace de noms Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Corde Adresse de point de terminaison personnalisée.

Pourboire

Les options de configuration courantes du Kit de développement logiciel (SDK) Du service Azure Service sont également configurables pour le classeur Spring Cloud Azure Stream Event Hubs. Les options de configuration prises en charge sont introduites dans de configuration Azure Spring Cloud et peuvent être configurées avec le préfixe unifié spring.cloud.azure. ou le préfixe de spring.cloud.azure.eventhubs..

Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération de la chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.

Propriétés de configuration de point de contrôle

Cette section contient les options de configuration pour le service Blobs de stockage, qui est utilisé pour conserver la propriété de partition et les informations de point de contrôle.

Note

À partir de la version 4.0.0, lorsque la propriété de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists n’est pas activée manuellement, aucun conteneur de stockage ne sera créé automatiquement avec le nom de spring.cloud.stream.bindings.binding-name.destination.

Contrôle des propriétés configurables de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booléen Indique s’il faut autoriser la création de conteneurs s’il n’existe pas.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corde Nom du compte de stockage.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corde Clé d’accès au compte de stockage.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corde Nom du conteneur de stockage.

Pourboire

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour le magasin de points de contrôle d’objets blob de stockage. Les options de configuration prises en charge sont introduites dans de configuration Azure Spring Cloud et peuvent être configurées avec le préfixe unifié spring.cloud.azure. ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriétés de configuration de liaison Azure Event Hubs

Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.

Propriétés du consommateur

Ces propriétés sont exposées via EventHubsConsumerProperties.

Note

Pour éviter la répétition, depuis la version 4.19.0 et 5.19.0, Spring Cloud Azure Stream Binder Event Hubs prend en charge la définition de valeurs pour tous les canaux, au format spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Mode de point de contrôle utilisé lorsque le consommateur décide de la façon dont le message de point de contrôle
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Entier Détermine la quantité de message pour chaque partition à effectuer un point de contrôle. Prend effet uniquement lorsque PARTITION_COUNT mode de point de contrôle est utilisé.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Durée Détermine l’intervalle de temps pour effectuer un point de contrôle. Prend effet uniquement lorsque TIME mode de point de contrôle est utilisé.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Entier Nombre maximal d’événements dans un lot. Obligatoire pour le mode consommateur par lots.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Durée Durée maximale pour la consommation de lots. Prend effet uniquement lorsque le mode consommateur par lots est activé et est facultatif.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Durée Durée d’intervalle de mise à jour.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Stratégie d’équilibrage de charge.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Durée Durée après laquelle la propriété de la partition expire.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booléen Indique si le processeur d’événements doit demander des informations sur le dernier événement en file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Entier Nombre utilisé par le consommateur pour contrôler le nombre d’événements que le consommateur Event Hub reçoit activement et file d’attente localement.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapper avec la clé en tant qu’ID de partition et valeurs de StartPositionProperties Carte contenant la position d’événement à utiliser pour chaque partition si un point de contrôle pour la partition n’existe pas dans le magasin de points de contrôle. Cette carte est clé hors de l’ID de partition.

Note

La configuration initial-partition-event-position accepte une map pour spécifier la position initiale de chaque hub d’événements. Ainsi, sa clé est l’ID de partition et la valeur est de StartPositionProperties, qui inclut les propriétés de décalage, de numéro de séquence, d’heure de date mise en file d’attente et si inclusive. Par exemple, vous pouvez le définir comme

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuration avancée du consommateur

Lesde connexion ci-dessus , point de contrôleet client du kit de développement logiciel (SDK) Azure courants prennent en charge la personnalisation de la configuration pour chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe .

Propriétés du producteur

Ces propriétés sont exposées via EventHubsProducerProperties.

Note

Pour éviter la répétition, depuis la version 4.19.0 et 5.19.0, Spring Cloud Azure Stream Binder Event Hubs prend en charge la définition de valeurs pour tous les canaux, au format spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booléen Indicateur de commutateur pour la synchronisation du producteur. Si la valeur est true, le producteur attend une réponse après une opération d’envoi.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long Durée d’attente d’une réponse après une opération d’envoi. Prend effet uniquement lorsqu’un producteur de synchronisation est activé.
Configuration avancée du producteur

La connexion ci-dessus et client du Kit de développement logiciel (SDK) Azure commun prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Utilisation de base

Envoi et réception de messages depuis/vers Event Hubs

  1. Renseignez les options de configuration avec des informations d’identification.

    • Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Définir le fournisseur et le consommateur.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Prise en charge du partitionnement

Une PartitionSupplier avec des informations de partition fournies par l’utilisateur est créée pour configurer les informations de partition sur le message à envoyer. L’organigramme suivant montre le processus d’obtention de priorités différentes pour l’ID de partition et la clé :

Diagramme montrant un organigramme du processus de prise en charge du partitionnement.

Prise en charge des consommateurs Batch

  1. Fournissez les options de configuration par lots, comme indiqué dans l’exemple suivant :

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Définir le fournisseur et le consommateur.

    Pour le mode de point de contrôle en tant que BATCH, vous pouvez utiliser le code suivant pour envoyer des messages et les consommer par lots.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Pour le mode de point de contrôle en tant que MANUAL, vous pouvez utiliser le code suivant pour envoyer des messages et consommer/point de contrôle par lots.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Note

En mode de consommation par lots, le type de contenu par défaut du classeur Spring Cloud Stream est application/json. Assurez-vous que la charge utile du message est alignée sur le type de contenu. Par exemple, lorsque vous utilisez le type de contenu par défaut de application/json pour recevoir des messages avec String charge utile, la charge utile doit être JSON String, entourée de guillemets doubles pour le texte de String d’origine. Bien que pour text/plain type de contenu, il peut s’agir d’un objet String directement. Pour plus d’informations, consultez négociation de type de contenu Spring Cloud Stream.

Gérer les messages d’erreur

  • Gérer les messages d’erreur de liaison sortante

    Par défaut, Spring Integration crée un canal d’erreur global appelé errorChannel. Configurez le point de terminaison de message suivant pour gérer les messages d’erreur de liaison sortante.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gérer les messages d’erreur de liaison entrante

    Spring Cloud Stream Event Hubs Binder prend en charge une solution pour gérer les erreurs pour les liaisons de messages entrantes : gestionnaires d’erreurs.

    gestionnaire d’erreurs:

    Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant un Consumer qui accepte des instances ErrorMessage. Pour plus d’informations, consultez Gérer les messages d’erreur dans la documentation Spring Cloud Stream.

    • Gestionnaire d’erreurs par défaut de liaison

      Configurez un seul Consumer bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la propriété spring.cloud.stream.default.error-handler-definition sur le nom de la fonction.

    • Gestionnaire d’erreurs spécifique à la liaison

      Configurez un Consumer bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique et a une priorité plus élevée que le gestionnaire d’erreurs par défaut de liaison :

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la propriété spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition sur le nom de la fonction.

En-têtes de message Event Hubs

Pour connaître les en-têtes de message de base pris en charge, consultez la section en-têtes de message Event Hubs de prise en charge d’Azure Spring Cloud pour Spring Integration.

Prise en charge de plusieurs classeurs

La connexion à plusieurs espaces de noms Event Hubs est également prise en charge à l’aide de plusieurs classeurs. Cet exemple prend une chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge. Vous pouvez définir des propriétés associées dans les paramètres d’environnement de chaque classeur.

  1. Pour utiliser plusieurs classeurs avec Event Hubs, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Note

    Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Nous devons définir deux fournisseurs et deux consommateurs :

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Approvisionnement des ressources

Event Hubs binder prend en charge l’approvisionnement d’event hub et de groupe de consommateurs, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

Échantillons

Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.

Spring Cloud Stream Binder pour Azure Service Bus

Concepts clés

Spring Cloud Stream Binder pour Azure Service Bus fournit l’implémentation de liaison pour Spring Cloud Stream Framework. Cette implémentation utilise les adaptateurs de canal Spring Integration Service Bus à sa base.

Message planifié

Ce classeur prend en charge l’envoi de messages à une rubrique pour le traitement différé. Les utilisateurs peuvent envoyer des messages planifiés avec l’en-tête x-delay exprimer en millisecondes un délai pour le message. Le message est remis aux rubriques respectives après x-delay millisecondes.

Groupe de consommateurs

Service Bus Topic fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Ce classeur s’appuie sur Subscription d’une rubrique pour agir en tant que groupe de consommateurs.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Vous pouvez également utiliser Spring Cloud Azure Stream Service Bus Starter, comme illustré dans l’exemple suivant pour Maven :

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuration

Le classeur fournit les deux parties suivantes des options de configuration :

Propriétés de configuration de connexion

Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.

Note

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

Propriétés configurables de connexion de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Description
spring.cloud.azure.servicebus.enabled booléen Indique si Azure Service Bus est activé.
spring.cloud.azure.servicebus.connection-string Corde Valeur de chaîne de connexion de l’espace de noms Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Corde Adresse de point de terminaison personnalisée à utiliser lors de la connexion à Service Bus.
spring.cloud.azure.servicebus.namespace Corde Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corde Nom de domaine d’une valeur d’espace de noms Azure Service Bus.

Note

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service Service Service Sont configurables pour le classeur Spring Cloud Azure Stream Service Bus. Les options de configuration prises en charge sont introduites dans de configuration Azure Spring Cloud et peuvent être configurées avec le préfixe unifié spring.cloud.azure. ou le préfixe de spring.cloud.azure.servicebus..

Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération de la chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.

Propriétés de configuration de la liaison Azure Service Bus

Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.

Propriétés du consommateur

Ces propriétés sont exposées via ServiceBusConsumerProperties.

Note

Pour éviter la répétition, étant donné que la version 4.19.0 et la version 5.19.0, Spring Cloud Azure Stream Binder Service Bus prend en charge la définition de valeurs pour tous les canaux, au format de spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Faire défaut Description
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booléen faux Si les messages ayant échoué sont routés vers la DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Entier 1 Nombre maximal de messages simultanés que le client du processeur Service Bus doit traiter. Lorsque la session est activée, elle s’applique à chaque session.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Entier zéro Nombre maximal de sessions simultanées à traiter à tout moment.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booléen zéro Indique si la session est activée.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Entier 0 Nombre de prérécupérations du client de processeur Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Sous-file d’attente aucun Type de la sous-file d’attente à laquelle se connecter.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Durée 5m Durée de la poursuite du renouvellement automatique du verrou.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mode de réception du client du processeur Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booléen vrai Indique s’il faut régler automatiquement les messages. Si la valeur est false, un en-tête de message de Checkpointer est ajouté pour permettre aux développeurs de régler manuellement les messages.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes Long 1 024 Taille maximale de la file d’attente/rubrique en mégaoctets, qui correspond à la taille de la mémoire allouée pour la file d’attente/rubrique.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Durée P10675199DT2H48M5.4775807S. (10675199 jours, 2 heures, 48 minutes, 5 secondes et 477 millisecondes) Durée après laquelle le message expire, à partir de laquelle le message est envoyé à Service Bus.

Important

Lorsque vous utilisez le Azure Resource Manager (ARM), vous devez configurer la propriété spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Pour plus d’informations, consultez l’exemple servicebus-queue-binder-arm sur GitHub.

Configuration avancée du consommateur

La connexion ci-dessus et client du kit de développement logiciel (SDK) Azure commun prennent en charge la personnalisation de chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Propriétés du producteur

Ces propriétés sont exposées via ServiceBusProducerProperties.

Note

Pour éviter la répétition, étant donné que la version 4.19.0 et la version 5.19.0, Spring Cloud Azure Stream Binder Service Bus prend en charge la définition de valeurs pour tous les canaux, au format de spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Faire défaut Description
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booléen faux Indicateur de commutateur pour la synchronisation du producteur.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 Valeur de délai d’expiration pour l’envoi du producteur.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType zéro Type d’entité Service Bus du producteur, requis pour le producteur de liaison.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes Long 1 024 Taille maximale de la file d’attente/rubrique en mégaoctets, qui correspond à la taille de la mémoire allouée pour la file d’attente/rubrique.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Durée P10675199DT2H48M5.4775807S. (10675199 jours, 2 heures, 48 minutes, 5 secondes et 477 millisecondes) Durée après laquelle le message expire, à partir de laquelle le message est envoyé à Service Bus.

Important

Lors de l’utilisation du producteur de liaison, la propriété de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type doit être configurée.

Configuration avancée du producteur

La connexion ci-dessus et client du Kit de développement logiciel (SDK) Azure commun prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Utilisation de base

Envoi et réception de messages depuis/vers Service Bus

  1. Renseignez les options de configuration avec des informations d’identification.

    • Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Définir le fournisseur et le consommateur.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Prise en charge des clés de partition

Le classeur prend en charge partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.

Spring Cloud Stream fournit une propriété d’expression SpEL de clé de partition spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Par exemple, la définition de cette propriété en tant que "'partitionKey-' + headers[<message-header-key>]" et l’ajout d’un en-tête appelé message-header-key. Spring Cloud Stream utilise la valeur de cet en-tête lors de l’évaluation de l’expression pour affecter une clé de partition. Le code suivant fournit un exemple de producteur :

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Prise en charge des sessions

Le classeur prend en charge sessions de messages de Service Bus. L’ID de session d’un message peut être défini via l’en-tête du message.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Note

Selon de partitionnement Service Bus, l’ID de session a une priorité supérieure à la clé de partition. Par conséquent, lorsque les en-têtes ServiceBusMessageHeaders#SESSION_ID et ServiceBusMessageHeaders#PARTITION_KEY sont définis, la valeur de l’ID de session est finalement utilisée pour remplacer la valeur de la clé de partition.

Gérer les messages d’erreur

  • Gérer les messages d’erreur de liaison sortante

    Par défaut, Spring Integration crée un canal d’erreur global appelé errorChannel. Configurez le point de terminaison de message suivant pour gérer le message d’erreur de liaison sortante.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gérer les messages d’erreur de liaison entrante

    Spring Cloud Stream Service Bus Binder prend en charge deux solutions pour gérer les erreurs pour les liaisons de messages entrantes : le gestionnaire d’erreurs de classeur et les gestionnaires.

    gestionnaire d’erreurs Binder:

    Le gestionnaire d’erreurs de classeur par défaut gère la liaison entrante. Vous utilisez ce gestionnaire pour envoyer des messages ayant échoué à la file d’attente de lettres mortes lorsque spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected est activé. Sinon, les messages ayant échoué sont abandonnés. Le gestionnaire d’erreurs du classeur s’exclue mutuellement avec d’autres gestionnaires d’erreurs fournis.

    gestionnaire d’erreurs:

    Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant un Consumer qui accepte des instances ErrorMessage. Pour plus d’informations, consultez Gérer les messages d’erreur dans la documentation Spring Cloud Stream.

    • Gestionnaire d’erreurs par défaut de liaison

      Configurez un seul Consumer bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la propriété spring.cloud.stream.default.error-handler-definition sur le nom de la fonction.

    • Gestionnaire d’erreurs spécifique à la liaison

      Configurez un Consumer bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique avec une priorité supérieure au gestionnaire d’erreurs par défaut de liaison.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la propriété spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition sur le nom de la fonction.

En-têtes de message Service Bus

Pour connaître les en-têtes de message de base pris en charge, consultez la section en-têtes de message Service Bus de prise en charge d’Azure Spring Cloud pour Spring Integration.

Note

Lorsque vous définissez la clé de partition, la priorité de l’en-tête de message est supérieure à la propriété Spring Cloud Stream. Par conséquent, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression prend effet uniquement quand aucun des ServiceBusMessageHeaders#SESSION_ID et ServiceBusMessageHeaders#PARTITION_KEY en-têtes ne sont configurés.

Prise en charge de plusieurs classeurs

La connexion à plusieurs espaces de noms Service Bus est également prise en charge à l’aide de plusieurs classeurs. Cet exemple utilise la chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge, les utilisateurs peuvent définir des propriétés associées dans les paramètres d’environnement de chaque classeur.

  1. Pour utiliser plusieurs classeurs de ServiceBus, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Note

    Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. nous devons définir deux fournisseurs et deux consommateurs

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Approvisionnement des ressources

Service Bus binder prend en charge l’approvisionnement de files d’attente, de rubriques et d’abonnements, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

Personnaliser les propriétés du client Service Bus

Les développeurs peuvent utiliser AzureServiceClientBuilderCustomizer pour personnaliser les propriétés du client Service Bus. L’exemple suivant personnalise la propriété sessionIdleTimeout dans ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Échantillons

Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.