Prise en charge d’Azure Spring Cloud pour Spring Integration
Cet article s’applique à :✅ version 4.19.0 ✅ version 5.20.1
Spring Integration Extension pour Azure fournit des adaptateurs Spring Integration pour les différents services fournis par le sdk Azure pour Java. Nous fournissons la prise en charge de Spring Integration pour ces services Azure : Event Hubs, Service Bus, File d’attente de stockage. Voici une liste des adaptateurs pris en charge :
-
spring-cloud-azure-starter-integration-eventhubs
- pour plus d’informations, consultez Spring Integration avec Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
- pour plus d’informations, consultez Spring Integration avec Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
- pour plus d’informations, consultez Spring Integration with Azure Storage Queue
Intégration spring à Azure Event Hubs
Concepts clés
Azure Event Hubs est une plateforme de streaming Big Data et un service d’ingestion d’événements. Il peut recevoir et traiter des millions d’événements par seconde. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide de n’importe quel fournisseur d’analyse en temps réel ou d’adaptateurs de traitement par lots/adaptateurs de stockage.
Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs. Ces adaptateurs fournissent un niveau d’abstraction supérieur sur la prise en charge de Spring pour la communication à distance, la messagerie et la planification. Le projet d’extension Spring Integration pour Event Hubs fournit des adaptateurs de canal entrants et sortants et des passerelles pour Azure Event Hubs.
Note
Les API de prise en charge rxJava sont supprimées de la version 4.0.0. Pour plus d’informations, consultez Javadoc.
Groupe de consommateurs
Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages dans le stockage Azure.
Prise en charge du partitionnement
Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement à la rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer la partition détenue par le consommateur. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions de la plupart des consommateurs chargés pour atteindre l’équilibrage de charge de travail.
Pour spécifier la stratégie d’équilibrage de charge, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration de EventHubsContainerProperties
.
Prise en charge des consommateurs Batch
Le EventHubsInboundChannelAdapter
prend en charge le mode de consommation par lots. Pour l’activer, les utilisateurs peuvent spécifier le mode d’écouteur comme ListenerMode.BATCH
lors de la construction d’une instance de EventHubsInboundChannelAdapter
.
Lorsqu’elle est activée, un message dont la charge utile est une liste d’événements par lots reçus et transmis au canal en aval. Chaque en-tête de message est également converti en tant que liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Pour les en-têtes communaux de l’ID de partition, le point de contrôle et les dernières propriétés mises en file d’attente, ils sont présentés sous la forme d’une valeur unique pour l’ensemble du lot d’événements partagent le même. Pour plus d’informations, consultez la section Des en-têtes de message Event Hubs.
Note
L’en-tête de point de contrôle existe uniquement lorsque mode manuel point de contrôle est utilisé.
Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH
et MANUAL
.
BATCH
mode est un mode de point de contrôle automatique pour contrôler l’ensemble du lot d’événements une fois qu’ils sont reçus.
MANUAL
mode consiste à contrôler les événements par les utilisateurs. Lorsqu’il est utilisé, le du point de contrôle
La stratégie de consommation de lots peut être spécifiée par les propriétés de max-size
et de max-wait-time
, où max-size
est une propriété nécessaire alors que max-wait-time
est facultatif.
Pour spécifier la stratégie de consommation par lots, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration de EventHubsContainerProperties
.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configuration
Ce démarrage fournit les 3 parties suivantes des options de configuration :
Propriétés de configuration de connexion
Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.
Note
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
Propriétés configurables de connexion de spring-cloud-azure-starter-integration-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booléen | Indique si azure Event Hubs est activé. |
spring.cloud.azure.eventhubs.connection-string | Corde | Valeur de chaîne de connexion de l’espace de noms Event Hubs. |
spring.cloud.azure.eventhubs.namespace | Corde | Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Corde | Nom de domaine d’une valeur d’espace de noms Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Corde | Adresse de point de terminaison personnalisée. |
spring.cloud.azure.eventhubs.shared-connection | Booléen | Indique si EventProcessorClient et EventHubProducerAsyncClient sous-jacents utilisent la même connexion. Par défaut, une nouvelle connexion est construite et utilisée pour chaque client Event Hub créé. |
Propriétés de configuration de point de contrôle
Cette section contient les options de configuration pour le service Blobs de stockage, qui est utilisé pour conserver la propriété de partition et les informations de point de contrôle.
Note
À partir de la version 4.0.0, lorsque la propriété de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists n’est pas activée manuellement, aucun conteneur de stockage n’est créé automatiquement.
Contrôle des propriétés configurables de spring-cloud-azure-starter-integration-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booléen | Indique s’il faut autoriser la création de conteneurs s’il n’existe pas. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Corde | Nom du compte de stockage. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Corde | Clé d’accès au compte de stockage. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Corde | Nom du conteneur de stockage. |
Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour le magasin de points de contrôle d’objets blob de stockage. Les options de configuration prises en charge sont introduites dans de configuration Azure Spring Cloud et peuvent être configurées avec le préfixe unifié spring.cloud.azure.
ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriétés de configuration du processeur Event Hub
L'EventHubsInboundChannelAdapter
utilise le EventProcessorClient
pour consommer des messages à partir d’un hub d’événements, pour configurer les propriétés globales d’un EventProcessorClient
, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante sur l’utilisation de EventHubsInboundChannelAdapter
.
Utilisation de base
Envoyer des messages à Azure Event Hubs
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Note
Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Note
Les valeurs autorisées pour tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.
Créez
DefaultMessageHandler
avec leEventHubsTemplate
bean pour envoyer des messages à Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recevoir des messages d’Azure Event Hubs
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
EventHubsInboundChannelAdapter
avec leEventHubsMessageListenerContainer
bean pour recevoir des messages d’Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Créez une liaison de récepteur de messages avec EventHubsInboundChannelAdapter via le canal de message créé précédemment.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurer EventHubsMessageConverter pour personnaliser objectMapper
EventHubsMessageConverter
est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.
Prise en charge des consommateurs Batch
Pour consommer des messages à partir d’Event Hubs par lots, il est similaire à l’exemple ci-dessus, en plus des utilisateurs doivent définir les options de configuration associées au traitement par lots pour EventHubsInboundChannelAdapter
.
Lorsque vous créez EventHubsInboundChannelAdapter
, le mode écouteur doit être défini comme BATCH
. Lorsque vous créez un haricot de EventHubsMessageListenerContainer
, définissez le mode de point de contrôle comme MANUAL
ou BATCH
, et les options de traitement par lots peuvent être configurées en fonction des besoins.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
En-têtes de message Event Hubs
Le tableau suivant illustre la façon dont les propriétés de message Event Hubs sont mappées aux en-têtes de message Spring. Pour Azure Event Hubs, le message est appelé event
.
Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode d’écoute d’enregistrement :
Propriétés d’événement Event Hubs | Constantes d’en-tête de message Spring | Type | Description |
---|---|---|---|
Heure en file d’attente | EventHubsHeaders#ENQUEUED_TIME | Instant | Instant, en UTC, de l’heure à laquelle l’événement a été mis en file d’attente dans la partition Event Hub. |
Compenser | EventHubsHeaders#OFFSET | Long | Décalage de l’événement lorsqu’il a été reçu de la partition Event Hub associée. |
Clé de partition | AzureHeaders#PARTITION_KEY | Corde | Clé de hachage de partition si elle a été définie lors de la publication initiale de l’événement. |
Partition ID | AzureHeaders#RAW_PARTITION_ID | Corde | ID de partition du hub d’événements. |
Numéro de séquence | EventHubsHeaders#SEQUENCE_NUMBER | Long | Numéro de séquence affecté à l’événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée. |
Dernières propriétés d’événement en file d’attente | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Propriétés du dernier événement mis en file d’attente dans cette partition. |
NA | AzureHeaders#CHECKPOINTER | Point de contrôle | En-tête du point de contrôle du message spécifique. |
Les utilisateurs peuvent analyser les en-têtes de message pour les informations associées de chaque événement. Pour définir un en-tête de message pour l’événement, tous les en-têtes personnalisés sont placés en tant que propriété d’application d’un événement, où l’en-tête est défini comme clé de propriété. Lorsque les événements sont reçus d’Event Hubs, toutes les propriétés de l’application sont converties en en-tête de message.
Note
Les en-têtes de message de la clé de partition, l’heure mise en file d’attente, le décalage et le numéro de séquence ne sont pas pris en charge pour être définis manuellement.
Lorsque le mode consommateur par lots est activé, les en-têtes spécifiques des messages par lots sont répertoriés ci-dessous, qui contient une liste de valeurs de chaque événement Event Hubs unique.
Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode Écouteur Batch :
Propriétés d’événement Event Hubs | Constantes d’en-tête de message Spring Batch | Type | Description |
---|---|---|---|
Heure en file d’attente | EventHubsHeaders#ENQUEUED_TIME | Liste des instantanés | Liste de l’instant, au format UTC, de l’heure à laquelle chaque événement a été mis en file d’attente dans la partition Event Hub. |
Compenser | EventHubsHeaders#OFFSET | Liste de longs | Liste du décalage de chaque événement lorsqu’il a été reçu de la partition Event Hub associée. |
Clé de partition | AzureHeaders#PARTITION_KEY | Liste de chaînes | Liste de la clé de hachage de partition si elle a été définie lors de la publication initiale de chaque événement. |
Numéro de séquence | EventHubsHeaders#SEQUENCE_NUMBER | Liste de longs | Liste du numéro de séquence affecté à chaque événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée. |
Propriétés système | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Liste des mappages | Liste des propriétés système de chaque événement. |
Propriétés de l’application | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Liste des mappages | Liste des propriétés d’application de chaque événement, où tous les en-têtes de message personnalisés ou propriétés d’événement sont placés. |
Note
Lors de la publication de messages, tous les en-têtes de lot ci-dessus sont supprimés des messages s’il existe.
Échantillons
Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.
Intégration spring à Azure Service Bus
Concepts clés
Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs.
Le projet d’extension Spring Integration pour Azure Service Bus fournit des adaptateurs de canal entrants et sortants pour Azure Service Bus.
Note
Les API de prise en charge completFuture ont été déconseillées à partir de la version 2.10.0 et sont remplacées par Reactor Core à partir de la version 4.0.0. Pour plus d’informations, consultez Javadoc.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configuration
Ce démarrage fournit les 2 parties suivantes des options de configuration :
Propriétés de configuration de connexion
Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.
Note
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
Propriétés configurables de connexion de spring-cloud-azure-starter-integration-servicebus :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.servicebus.enabled | booléen | Indique si Azure Service Bus est activé. |
spring.cloud.azure.servicebus.connection-string | Corde | Valeur de chaîne de connexion de l’espace de noms Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | Corde | Adresse de point de terminaison personnalisée à utiliser lors de la connexion à Service Bus. |
spring.cloud.azure.servicebus.namespace | Corde | Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Corde | Nom de domaine d’une valeur d’espace de noms Azure Service Bus. |
Propriétés de configuration du processeur Service Bus
Le ServiceBusInboundChannelAdapter
utilise le ServiceBusProcessorClient
pour consommer des messages, pour configurer les propriétés globales d’un ServiceBusProcessorClient
, les développeurs peuvent utiliser ServiceBusContainerProperties
pour la configuration. Consultez la section suivante sur l’utilisation de ServiceBusInboundChannelAdapter
.
Utilisation de base
Envoyer des messages à Azure Service Bus
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Note
Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
Les valeurs autorisées pour tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
Les valeurs autorisées pour tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.
Créez
DefaultMessageHandler
avec leServiceBusTemplate
bean pour envoyer des messages à Service Bus, définissez le type d’entité pour ServiceBusTemplate. Cet exemple utilise la file d’attente Service Bus comme exemple.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recevoir des messages d’Azure Service Bus
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
ServiceBusInboundChannelAdapter
avec leServiceBusMessageListenerContainer
bean pour recevoir des messages vers Service Bus. Cet exemple utilise la file d’attente Service Bus comme exemple.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Créez une liaison de récepteur de messages avec
ServiceBusInboundChannelAdapter
via le canal de message que nous avons créé précédemment.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurer ServiceBusMessageConverter pour personnaliser objectMapper
ServiceBusMessageConverter
est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper
.
En-têtes de message Service Bus
Pour certains en-têtes Service Bus qui peuvent être mappés à plusieurs constantes d’en-tête Spring, la priorité des différents en-têtes Spring est répertoriée.
Mappage entre les en-têtes Service Bus et les en-têtes Spring :
En-têtes et propriétés des messages Service Bus | Constantes d’en-tête de message Spring | Type | Paramétrable | Description |
---|---|---|---|---|
Type de contenu | MessageHeaders#CONTENT_TYPE |
Corde | Oui | Descripteur de type de contenu du message RFC2045. |
ID de corrélation | ServiceBusMessageHeaders#CORRELATION_ID |
Corde | Oui | ID de corrélation du message |
Message ID | ServiceBusMessageHeaders#MESSAGE_ID |
Corde | Oui | L’ID de message du message, cet en-tête a une priorité supérieure à MessageHeaders#ID . |
Message ID | MessageHeaders#ID |
UUID | Oui | L’ID de message du message, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#MESSAGE_ID . |
Clé de partition | ServiceBusMessageHeaders#PARTITION_KEY |
Corde | Oui | Clé de partition pour l’envoi du message à une entité partitionnée. |
Répondre | MessageHeaders#REPLY_CHANNEL |
Corde | Oui | Adresse d’une entité à laquelle envoyer des réponses. |
Répondre à l’ID de session | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Corde | Oui | Valeur de propriété ReplyToGroupId du message. |
Heure de file d’attente planifiée utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Oui | Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité supérieure à AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Heure de file d’attente planifiée utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Entier | Oui | Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Session ID | ServiceBusMessageHeaders#SESSION_ID |
Corde | Oui | IDentifier de session pour une entité prenant en charge la session. |
Durée de vie | ServiceBusMessageHeaders#TIME_TO_LIVE |
Durée | Oui | Durée avant l’expiration de ce message. |
À | ServiceBusMessageHeaders#TO |
Corde | Oui | Adresse « à » du message, réservée à une utilisation ultérieure dans les scénarios de routage et actuellement ignorée par le répartiteur lui-même. |
Objet | ServiceBusMessageHeaders#SUBJECT |
Corde | Oui | Objet du message. |
Description de l’erreur de lettre morte | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Corde | Non | Description d’un message qui a été lettre morte. |
Raison de la lettre morte | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Corde | Non | La raison pour laquelle un message a été mis en lettres mortes. |
Source de lettres mortes | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Corde | Non | Entité dans laquelle le message a été mis en lettres mortes. |
Nombre de livraisons | ServiceBusMessageHeaders#DELIVERY_COUNT |
long | Non | Nombre de fois où ce message a été remis aux clients. |
Numéro de séquence mis en file d’attente | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
long | Non | Numéro de séquence mis en file d’attente affecté à un message par Service Bus. |
Heure en file d’attente | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Non | Date à laquelle ce message a été mis en file d’attente dans Service Bus. |
Expire à | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Non | Date d’expiration de ce message. |
Jeton de verrouillage | ServiceBusMessageHeaders#LOCK_TOKEN |
Corde | Non | Jeton de verrouillage pour le message actuel. |
Verrouillé jusqu’à ce que | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Non | Date d’expiration du verrou de ce message. |
Numéro de séquence | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
long | Non | Numéro unique affecté à un message par Service Bus. |
État | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Non | État du message, qui peut être actif, différé ou planifié. |
Prise en charge des clés de partition
Ce démarrage prend en charge partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.
Recommandé : utilisez ServiceBusMessageHeaders.PARTITION_KEY
comme clé de l’en-tête.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Non recommandé mais actuellement pris en charge : AzureHeaders.PARTITION_KEY
comme clé de l’en-tête.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Note
Lorsque les ServiceBusMessageHeaders.PARTITION_KEY
et les AzureHeaders.PARTITION_KEY
sont définis dans les en-têtes de message, ServiceBusMessageHeaders.PARTITION_KEY
est préférable.
Prise en charge des sessions
Cet exemple montre comment définir manuellement l’ID de session d’un message dans l’application.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Note
Lorsque le ServiceBusMessageHeaders.SESSION_ID
est défini dans les en-têtes de message et qu’un en-tête de ServiceBusMessageHeaders.PARTITION_KEY
différent est également défini, la valeur de l’ID de session sera finalement utilisée pour remplacer la valeur de la clé de partition.
Personnaliser les propriétés du client Service Bus
Les développeurs peuvent utiliser AzureServiceClientBuilderCustomizer
pour personnaliser les propriétés du client Service Bus. L’exemple suivant personnalise la propriété sessionIdleTimeout
dans ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Échantillons
Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.
Intégration spring à la file d’attente stockage Azure
Concepts clés
Stockage File d’attente Azure est un service permettant de stocker un grand nombre de messages. Vous accédez à des messages depuis n’importe où dans le monde via des appels authentifiés à l’aide de HTTP ou HTTPS. Un message de file d’attente peut comporter jusqu’à 64 Ko de taille. Une file d’attente peut contenir des millions de messages, jusqu’à la limite totale de capacité d’un compte de stockage. Les files d’attente sont couramment utilisées pour créer un backlog de travail à traiter de manière asynchrone.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configuration
Ce démarrage fournit les options de configuration suivantes :
Propriétés de configuration de connexion
Cette section contient les options de configuration utilisées pour la connexion à la file d’attente stockage Azure.
Note
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec Microsoft Entra ID pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
Propriétés configurables de connexion de spring-cloud-azure-starter-integration-storage-queue :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booléen | Indique si une file d’attente stockage Azure est activée. |
spring.cloud.azure.storage.queue.connection-string | Corde | Valeur de la chaîne de connexion de l’espace de noms de file d’attente du stockage. |
spring.cloud.azure.storage.queue.accountName | Corde | Nom du compte file d’attente de stockage. |
spring.cloud.azure.storage.queue.accountKey | Corde | Clé de compte de file d’attente de stockage. |
spring.cloud.azure.storage.queue.endpoint | Corde | Point de terminaison du service file d’attente de stockage. |
spring.cloud.azure.storage.queue.sasToken | Corde | Informations d’identification du jeton Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion utilisée lors de l’envoi de requêtes d’API. |
spring.cloud.azure.storage.queue.messageEncoding | Corde | Encodage des messages de file d’attente. |
Utilisation de base
Envoyer des messages à la file d’attente stockage Azure
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification sous forme de chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Note
Microsoft recommande d’utiliser le flux d’authentification le plus sécurisé disponible. Le flux d'authentification décrit dans cette procédure, comme pour les bases de données, les caches, la messagerie ou les services IA, demande un degré de confiance très élevé dans l'application et comporte des risques non présents dans d'autres flux. Utilisez ce flux uniquement lorsque des options plus sécurisées, telles que les identités managées pour les connexions sans mot de passe ou sans clé, ne sont pas viables. Pour les opérations d’ordinateur local, préférez les identités utilisateur pour les connexions sans mot de passe ou sans clé.
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Note
Les valeurs autorisées pour tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Note
Les valeurs autorisées pour tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez le Utilisé le point de terminaison incorrect (comptes personnels et d’organisation) section Erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans ledu locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur Microsoft Entra ID.
Créez
DefaultMessageHandler
avec leStorageQueueTemplate
bean pour envoyer des messages à la file d’attente de stockage.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Recevoir des messages de la file d’attente stockage Azure
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
StorageQueueMessageSource
avec leStorageQueueTemplate
bean pour recevoir des messages dans la file d’attente de stockage.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Créez une liaison de récepteur de messages avec StorageQueueMessageSource créée à la dernière étape via le canal de message que nous avons créé précédemment.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Échantillons
Pour plus d’informations, consultez le dépôt azure-spring-boot-samples sur GitHub.