Partager via


Prise en charge d’Azure Spring Cloud pour Spring Integration

Cet article s’applique à :✅ version 4.19.0 ✅ version 5.20.1

Spring Integration Extension pour Azure fournit des adaptateurs Spring Integration pour les différents services fournis par le sdk Azure pour Java. Nous fournissons la prise en charge de Spring Integration pour ces services Azure : Event Hubs, Service Bus, File d’attente de stockage. Voici une liste des adaptateurs pris en charge :

Intégration spring à Azure Event Hubs

Concepts clés

Azure Event Hubs est une plateforme de streaming Big Data et un service d’ingestion d’événements. Il peut recevoir et traiter des millions d’événements par seconde. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide de n’importe quel fournisseur d’analyse en temps réel ou d’adaptateurs de traitement par lots/adaptateurs de stockage.

Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs. Ces adaptateurs fournissent un niveau d’abstraction supérieur sur la prise en charge de Spring pour la communication à distance, la messagerie et la planification. Le projet d’extension Spring Integration pour Event Hubs fournit des adaptateurs de canal entrants et sortants et des passerelles pour Azure Event Hubs.

Note

Les API de prise en charge rxJava sont supprimées de la version 4.0.0. Pour plus d’informations, consultez Javadoc.

Groupe de consommateurs

Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages dans le stockage Azure.

Prise en charge du partitionnement

Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement à la rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer la partition détenue par le consommateur. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions de la plupart des consommateurs chargés pour atteindre l’équilibrage de charge de travail.

Pour spécifier la stratégie d’équilibrage de charge, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration de EventHubsContainerProperties.

Prise en charge des consommateurs Batch

Le EventHubsInboundChannelAdapter prend en charge le mode de consommation par lots. Pour l’activer, les utilisateurs peuvent spécifier le mode d’écouteur comme ListenerMode.BATCH lors de la construction d’une instance de EventHubsInboundChannelAdapter. Lorsqu’elle est activée, un message dont la charge utile est une liste d’événements par lots reçus et transmis au canal en aval. Chaque en-tête de message est également converti en tant que liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Pour les en-têtes communaux de l’ID de partition, le point de contrôle et les dernières propriétés mises en file d’attente, ils sont présentés sous la forme d’une valeur unique pour l’ensemble du lot d’événements partagent le même. Pour plus d’informations, consultez la section Des en-têtes de message Event Hubs.

Note

L’en-tête de point de contrôle existe uniquement lorsque mode manuel point de contrôle est utilisé.

Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH et MANUAL. BATCH mode est un mode de point de contrôle automatique pour contrôler l’ensemble du lot d’événements une fois qu’ils sont reçus. MANUAL mode consiste à contrôler les événements par les utilisateurs. Lorsqu’il est utilisé, le du point de contrôle est passé dans l’en-tête de message, et les utilisateurs peuvent l’utiliser pour effectuer des points de contrôle.

La stratégie de consommation de lots peut être spécifiée par les propriétés de max-size et de max-wait-time, où max-size est une propriété nécessaire alors que max-wait-time est facultatif. Pour spécifier la stratégie de consommation par lots, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration de EventHubsContainerProperties.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuration

Ce démarrage fournit les 3 parties suivantes des options de configuration :

Propriétés de configuration de connexion

Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.

Note

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

Propriétés configurables de connexion de spring-cloud-azure-starter-integration-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.enabled booléen Indique si azure Event Hubs est activé.
spring.cloud.azure.eventhubs.connection-string Corde Valeur de chaîne de connexion de l’espace de noms Event Hubs.
spring.cloud.azure.eventhubs.namespace Corde Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corde Nom de domaine d’une valeur d’espace de noms Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Corde Adresse de point de terminaison personnalisée.
spring.cloud.azure.eventhubs.shared-connection Booléen Indique si EventProcessorClient et EventHubProducerAsyncClient sous-jacents utilisent la même connexion. Par défaut, une nouvelle connexion est construite et utilisée pour chaque client Event Hub créé.

Propriétés de configuration de point de contrôle

Cette section contient les options de configuration pour le service Blobs de stockage, qui est utilisé pour conserver la propriété de partition et les informations de point de contrôle.

Note

À partir de la version 4.0.0, lorsque la propriété de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists n’est pas activée manuellement, aucun conteneur de stockage n’est créé automatiquement.

Contrôle des propriétés configurables de spring-cloud-azure-starter-integration-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booléen Indique s’il faut autoriser la création de conteneurs s’il n’existe pas.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corde Nom du compte de stockage.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corde Clé d’accès au compte de stockage.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corde Nom du conteneur de stockage.

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour le magasin de points de contrôle d’objets blob de stockage. Les options de configuration prises en charge sont introduites dans de configuration Azure Spring Cloud et peuvent être configurées avec le préfixe unifié spring.cloud.azure. ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriétés de configuration du processeur Event Hub

L'EventHubsInboundChannelAdapter utilise le EventProcessorClient pour consommer des messages à partir d’un hub d’événements, pour configurer les propriétés globales d’un EventProcessorClient, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante sur l’utilisation de EventHubsInboundChannelAdapter.

Utilisation de base

Envoyer des messages à Azure Event Hubs

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      

      Note

      Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.

    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  1. Créez DefaultMessageHandler avec le EventHubsTemplate bean pour envoyer des messages à Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recevoir des messages d’Azure Event Hubs

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez EventHubsInboundChannelAdapter avec le EventHubsMessageListenerContainer bean pour recevoir des messages d’Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Créez une liaison de récepteur de messages avec EventHubsInboundChannelAdapter via le canal de message créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurer EventHubsMessageConverter pour personnaliser objectMapper

EventHubsMessageConverter est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.

Prise en charge des consommateurs Batch

Pour consommer des messages à partir d’Event Hubs par lots, il est similaire à l’exemple ci-dessus, en plus des utilisateurs doivent définir les options de configuration associées au traitement par lots pour EventHubsInboundChannelAdapter.

Lorsque vous créez EventHubsInboundChannelAdapter, le mode écouteur doit être défini comme BATCH. Lorsque vous créez un haricot de EventHubsMessageListenerContainer, définissez le mode de point de contrôle comme MANUAL ou BATCH, et les options de traitement par lots peuvent être configurées en fonction des besoins.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

En-têtes de message Event Hubs

Le tableau suivant illustre la façon dont les propriétés de message Event Hubs sont mappées aux en-têtes de message Spring. Pour Azure Event Hubs, le message est appelé event.

Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode d’écoute d’enregistrement :

Propriétés d’événement Event Hubs Constantes d’en-tête de message Spring Type Description
Heure en file d’attente EventHubsHeaders#ENQUEUED_TIME Instant Instant, en UTC, de l’heure à laquelle l’événement a été mis en file d’attente dans la partition Event Hub.
Compenser EventHubsHeaders#OFFSET Long Décalage de l’événement lorsqu’il a été reçu de la partition Event Hub associée.
Clé de partition AzureHeaders#PARTITION_KEY Corde Clé de hachage de partition si elle a été définie lors de la publication initiale de l’événement.
Partition ID AzureHeaders#RAW_PARTITION_ID Corde ID de partition du hub d’événements.
Numéro de séquence EventHubsHeaders#SEQUENCE_NUMBER Long Numéro de séquence affecté à l’événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée.
Dernières propriétés d’événement en file d’attente EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Propriétés du dernier événement mis en file d’attente dans cette partition.
NA AzureHeaders#CHECKPOINTER Point de contrôle En-tête du point de contrôle du message spécifique.

Les utilisateurs peuvent analyser les en-têtes de message pour les informations associées de chaque événement. Pour définir un en-tête de message pour l’événement, tous les en-têtes personnalisés sont placés en tant que propriété d’application d’un événement, où l’en-tête est défini comme clé de propriété. Lorsque les événements sont reçus d’Event Hubs, toutes les propriétés de l’application sont converties en en-tête de message.

Note

Les en-têtes de message de la clé de partition, l’heure mise en file d’attente, le décalage et le numéro de séquence ne sont pas pris en charge pour être définis manuellement.

Lorsque le mode consommateur par lots est activé, les en-têtes spécifiques des messages par lots sont répertoriés ci-dessous, qui contient une liste de valeurs de chaque événement Event Hubs unique.

Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode Écouteur Batch :

Propriétés d’événement Event Hubs Constantes d’en-tête de message Spring Batch Type Description
Heure en file d’attente EventHubsHeaders#ENQUEUED_TIME Liste des instantanés Liste de l’instant, au format UTC, de l’heure à laquelle chaque événement a été mis en file d’attente dans la partition Event Hub.
Compenser EventHubsHeaders#OFFSET Liste de longs Liste du décalage de chaque événement lorsqu’il a été reçu de la partition Event Hub associée.
Clé de partition AzureHeaders#PARTITION_KEY Liste de chaînes Liste de la clé de hachage de partition si elle a été définie lors de la publication initiale de chaque événement.
Numéro de séquence EventHubsHeaders#SEQUENCE_NUMBER Liste de longs Liste du numéro de séquence affecté à chaque événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée.
Propriétés système EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Liste des mappages Liste des propriétés système de chaque événement.
Propriétés de l’application EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Liste des mappages Liste des propriétés d’application de chaque événement, où tous les en-têtes de message personnalisés ou propriétés d’événement sont placés.

Note

Lors de la publication de messages, tous les en-têtes de lot ci-dessus sont supprimés des messages s’il existe.

Échantillons

Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.

Intégration spring à Azure Service Bus

Concepts clés

Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs.

Le projet d’extension Spring Integration pour Azure Service Bus fournit des adaptateurs de canal entrants et sortants pour Azure Service Bus.

Note

Les API de prise en charge completFuture ont été déconseillées à partir de la version 2.10.0 et sont remplacées par Reactor Core à partir de la version 4.0.0. Pour plus d’informations, consultez Javadoc.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuration

Ce démarrage fournit les 2 parties suivantes des options de configuration :

Propriétés de configuration de connexion

Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.

Note

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

Propriétés configurables de connexion de spring-cloud-azure-starter-integration-servicebus :

Propriété Type Description
spring.cloud.azure.servicebus.enabled booléen Indique si Azure Service Bus est activé.
spring.cloud.azure.servicebus.connection-string Corde Valeur de chaîne de connexion de l’espace de noms Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Corde Adresse de point de terminaison personnalisée à utiliser lors de la connexion à Service Bus.
spring.cloud.azure.servicebus.namespace Corde Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corde Nom de domaine d’une valeur d’espace de noms Azure Service Bus.

Propriétés de configuration du processeur Service Bus

Le ServiceBusInboundChannelAdapter utilise le ServiceBusProcessorClient pour consommer des messages, pour configurer les propriétés globales d’un ServiceBusProcessorClient, les développeurs peuvent utiliser ServiceBusContainerProperties pour la configuration. Consultez la section suivante sur l’utilisation de ServiceBusInboundChannelAdapter.

Utilisation de base

Envoyer des messages à Azure Service Bus

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      

      Note

      Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.

    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  1. Créez DefaultMessageHandler avec le ServiceBusTemplate bean pour envoyer des messages à Service Bus, définissez le type d’entité pour ServiceBusTemplate. Cet exemple utilise la file d’attente Service Bus comme exemple.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recevoir des messages d’Azure Service Bus

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez ServiceBusInboundChannelAdapter avec le ServiceBusMessageListenerContainer bean pour recevoir des messages vers Service Bus. Cet exemple utilise la file d’attente Service Bus comme exemple.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Créez une liaison de récepteur de messages avec ServiceBusInboundChannelAdapter via le canal de message que nous avons créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurer ServiceBusMessageConverter pour personnaliser objectMapper

ServiceBusMessageConverter est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.

En-têtes de message Service Bus

Pour certains en-têtes Service Bus qui peuvent être mappés à plusieurs constantes d’en-tête Spring, la priorité des différents en-têtes Spring est répertoriée.

Mappage entre les en-têtes Service Bus et les en-têtes Spring :

En-têtes et propriétés des messages Service Bus Constantes d’en-tête de message Spring Type Paramétrable Description
Type de contenu MessageHeaders#CONTENT_TYPE Corde Oui Descripteur de type de contenu du message RFC2045.
ID de corrélation ServiceBusMessageHeaders#CORRELATION_ID Corde Oui ID de corrélation du message
Message ID ServiceBusMessageHeaders#MESSAGE_ID Corde Oui L’ID de message du message, cet en-tête a une priorité supérieure à MessageHeaders#ID.
Message ID MessageHeaders#ID UUID Oui L’ID de message du message, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#MESSAGE_ID.
Clé de partition ServiceBusMessageHeaders#PARTITION_KEY Corde Oui Clé de partition pour l’envoi du message à une entité partitionnée.
Répondre MessageHeaders#REPLY_CHANNEL Corde Oui Adresse d’une entité à laquelle envoyer des réponses.
Répondre à l’ID de session ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Corde Oui Valeur de propriété ReplyToGroupId du message.
Heure de file d’attente planifiée utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Oui Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité supérieure à AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Heure de file d’attente planifiée utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Entier Oui Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Session ID ServiceBusMessageHeaders#SESSION_ID Corde Oui IDentifier de session pour une entité prenant en charge la session.
Durée de vie ServiceBusMessageHeaders#TIME_TO_LIVE Durée Oui Durée avant l’expiration de ce message.
À ServiceBusMessageHeaders#TO Corde Oui Adresse « à » du message, réservée à une utilisation ultérieure dans les scénarios de routage et actuellement ignorée par le répartiteur lui-même.
Objet ServiceBusMessageHeaders#SUBJECT Corde Oui Objet du message.
Description de l’erreur de lettre morte ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Corde Non Description d’un message qui a été lettre morte.
Raison de la lettre morte ServiceBusMessageHeaders#DEAD_LETTER_REASON Corde Non La raison pour laquelle un message a été mis en lettres mortes.
Source de lettres mortes ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Corde Non Entité dans laquelle le message a été mis en lettres mortes.
Nombre de livraisons ServiceBusMessageHeaders#DELIVERY_COUNT long Non Nombre de fois où ce message a été remis aux clients.
Numéro de séquence mis en file d’attente ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long Non Numéro de séquence mis en file d’attente affecté à un message par Service Bus.
Heure en file d’attente ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Non Date à laquelle ce message a été mis en file d’attente dans Service Bus.
Expire à ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Non Date d’expiration de ce message.
Jeton de verrouillage ServiceBusMessageHeaders#LOCK_TOKEN Corde Non Jeton de verrouillage pour le message actuel.
Verrouillé jusqu’à ce que ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Non Date d’expiration du verrou de ce message.
Numéro de séquence ServiceBusMessageHeaders#SEQUENCE_NUMBER long Non Numéro unique affecté à un message par Service Bus.
État ServiceBusMessageHeaders#STATE ServiceBusMessageState Non État du message, qui peut être actif, différé ou planifié.

Prise en charge des clés de partition

Ce démarrage prend en charge partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.

Recommandé : utilisez ServiceBusMessageHeaders.PARTITION_KEY comme clé de l’en-tête.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Non recommandé mais actuellement pris en charge : AzureHeaders.PARTITION_KEY comme clé de l’en-tête.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Note

Lorsque les ServiceBusMessageHeaders.PARTITION_KEY et les AzureHeaders.PARTITION_KEY sont définis dans les en-têtes de message, ServiceBusMessageHeaders.PARTITION_KEY est préférable.

Prise en charge des sessions

Cet exemple montre comment définir manuellement l’ID de session d’un message dans l’application.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Note

Lorsque le ServiceBusMessageHeaders.SESSION_ID est défini dans les en-têtes de message et qu’un en-tête de ServiceBusMessageHeaders.PARTITION_KEY différent est également défini, la valeur de l’ID de session sera finalement utilisée pour remplacer la valeur de la clé de partition.

Personnaliser les propriétés du client Service Bus

Les développeurs peuvent utiliser AzureServiceClientBuilderCustomizer pour personnaliser les propriétés du client Service Bus. L’exemple suivant personnalise la propriété sessionIdleTimeout dans ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Échantillons

Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.

Intégration spring à la file d’attente stockage Azure

Concepts clés

Stockage File d’attente Azure est un service permettant de stocker un grand nombre de messages. Vous accédez à des messages depuis n’importe où dans le monde via des appels authentifiés à l’aide de HTTP ou HTTPS. Un message de file d’attente peut comporter jusqu’à 64 Ko de taille. Une file d’attente peut contenir des millions de messages, jusqu’à la limite totale de capacité d’un compte de stockage. Les files d’attente sont couramment utilisées pour créer un backlog de travail à traiter de manière asynchrone.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuration

Ce démarrage fournit les options de configuration suivantes :

Propriétés de configuration de connexion

Cette section contient les options de configuration utilisées pour la connexion à la file d’attente stockage Azure.

Note

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

Propriétés configurables de connexion de spring-cloud-azure-starter-integration-storage-queue :

Propriété Type Description
spring.cloud.azure.storage.queue.enabled booléen Indique si une file d’attente stockage Azure est activée.
spring.cloud.azure.storage.queue.connection-string Corde Valeur de la chaîne de connexion de l’espace de noms de file d’attente du stockage.
spring.cloud.azure.storage.queue.accountName Corde Nom du compte file d’attente de stockage.
spring.cloud.azure.storage.queue.accountKey Corde Clé de compte de file d’attente de stockage.
spring.cloud.azure.storage.queue.endpoint Corde Point de terminaison du service file d’attente de stockage.
spring.cloud.azure.storage.queue.sasToken Corde Informations d’identification du jeton Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion utilisée lors de l’envoi de requêtes d’API.
spring.cloud.azure.storage.queue.messageEncoding Corde Encodage des messages de file d’attente.

Utilisation de base

Envoyer des messages à la file d’attente stockage Azure

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      

      Note

      Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.

    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Note

Les valeurs autorisées pour tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.

  1. Créez DefaultMessageHandler avec le StorageQueueTemplate bean pour envoyer des messages à la file d’attente de stockage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Recevoir des messages de la file d’attente stockage Azure

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez StorageQueueMessageSource avec le StorageQueueTemplate bean pour recevoir des messages dans la file d’attente de stockage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Créez une liaison de récepteur de messages avec StorageQueueMessageSource créée à la dernière étape via le canal de message que nous avons créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Échantillons

Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.