Поддержка Spring Cloud Azure для Spring Integration
Эта статья относится к:✅ версии 4.19.0 ✅ версии 5.19.0
Расширение Spring Integration для Azure предоставляет адаптеры Spring Integration для различных служб, предоставляемых пакетом SDK Azure для Java. Мы предоставляем поддержку Spring Integration для этих служб Azure: Центры событий, служебная шина, очередь хранилища. Ниже приведен список поддерживаемых адаптеров:
-
spring-cloud-azure-starter-integration-eventhubs
. Дополнительные сведения см. в статье Spring Integration с Центрами событий Azure -
spring-cloud-azure-starter-integration-servicebus
. Дополнительные сведения см. в статье Spring Integration с служебной шиной Azure -
spring-cloud-azure-starter-integration-storage-queue
. Дополнительные сведения см. в статье Spring Integration с очередью службы хранилища Azure
Интеграция Spring с Центрами событий Azure
Основные понятия
Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Он может получать и обрабатывать миллионы событий в секунду. Данные, отправленные в концентратор событий, можно преобразовать и сохранить с помощью любого поставщика аналитики в режиме реального времени или адаптеров пакетной обработки или хранилища.
Spring Integration обеспечивает упрощенное обмен сообщениями в приложениях Spring и поддерживает интеграцию с внешними системами с помощью декларативных адаптеров. Эти адаптеры обеспечивают более высокий уровень абстракции по сравнению с поддержкой Spring для удаленного взаимодействия, обмена сообщениями и планирования. Проект расширения Spring Integration for Event Hubs предоставляет адаптеры и шлюзы для центров событий Azure для входящих и исходящих каналов.
Заметка
API-интерфейсы поддержки RxJava удаляются из версии 4.0.0. Дополнительные сведения см. в Javadoc.
Группа потребителей
Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в службе хранилища Azure.
Поддержка секционирования
Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической балансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой раздел принадлежит определенному потребителю. Когда начинается новый потребитель, он попытается украсть некоторые секции из большинства загруженных потребителей для достижения балансировки рабочей нагрузки.
Чтобы указать стратегию балансировки нагрузки, разработчики могут использовать EventHubsContainerProperties
для конфигурации. Пример настройки
Поддержка потребителей пакетной службы
EventHubsInboundChannelAdapter
поддерживает режим пакетного использования. Чтобы включить его, пользователи могут указать режим прослушивателя как ListenerMode.BATCH
при создании экземпляра EventHubsInboundChannelAdapter
.
При включении сообщение, из которого полезные данные являются списком пакетных событий, будут получены и переданы в подчиненный канал. Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Для общих заголовков идентификатора секции, контрольных точек и последних вложенных свойств они отображаются в виде одного значения для всего пакета событий, совместного с одним и тем же. Дополнительные сведения см. в разделе Заголовки сообщений Центров событий.
Заметка
Заголовок контрольной точки существует только в том случае, если используется режим MANUAL контрольных точек.
Контрольная точка пакетного потребителя поддерживает два режима: BATCH
и MANUAL
.
BATCH
режим — это режим автоматической контрольной точки для проверки всего пакета событий вместе после их получения.
MANUAL
режиме — контрольные точки событий пользователями. При использовании контрольный элемент будет передан в заголовок сообщения, и пользователи могут использовать его для выполнения контрольных точек.
Политику использования пакетной службы можно указать свойствами max-size
и max-wait-time
, где max-size
является необходимым свойством, а max-wait-time
является необязательным.
Чтобы указать стратегию использования пакетной службы, разработчики могут использовать EventHubsContainerProperties
для конфигурации. Пример настройки
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Конфигурация
Этот начальный элемент предоставляет следующие 3 части параметров конфигурации:
Свойства конфигурации подключения
В этом разделе содержатся параметры конфигурации, используемые для подключения к Центрам событий Azure.
Заметка
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.
Настраиваемые свойства подключения spring-cloud-azure-starter-integration-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.eventhubs.enabled | булев | Включена ли служба Центров событий Azure. |
spring.cloud.azure.eventhubs.connection-string | Струна | Строковое значение строки подключения к пространству имен Центров событий. |
spring.cloud.azure.eventhubsпространство имен | Струна | Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Струна | Доменное имя значения пространства имен Центров событий Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Струна | Адрес пользовательской конечной точки. |
spring.cloud.azure.eventhubs.shared-connection | Булев | Используется ли базовое подключение EventProcessorClient и EventHubProducerAsyncClient. По умолчанию создается и используется новое подключение для каждого клиента Концентратора событий. |
Свойства конфигурации контрольной точки
В этом разделе содержатся параметры конфигурации для службы BLOB-объектов хранилища, которая используется для сохранения сведений о владельцах секций и контрольных точках.
Заметка
В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists не включено вручную, контейнер хранилища не будет создан автоматически.
Контрольные точки настраиваемых свойств spring-cloud-azure-starter-integration-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Булев | Разрешить ли создание контейнеров, если они отсутствуют. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Струна | Имя учетной записи хранения. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Струна | Ключ доступа к учетной записи хранения. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Струна | Имя контейнера хранилища. |
Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища контрольных точек BLOB-объектов хранилища. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure.
или префикса spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Свойства конфигурации процессора Концентратора событий
EventHubsInboundChannelAdapter
использует EventProcessorClient
для использования сообщений из концентратора событий для настройки общих свойств EventProcessorClient
, разработчики могут использовать EventHubsContainerProperties
для настройки. См. следующем разделе о работе с EventHubsInboundChannelAdapter
.
Базовое использование
Отправка сообщений в Центры событий Azure
Заполните параметры конфигурации учетных данных.
Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Создайте
DefaultMessageHandler
с помощьюEventHubsTemplate
bean для отправки сообщений в Центры событий.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Отправка сообщений с помощью шлюза.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Получение сообщений из Центров событий Azure
Заполните параметры конфигурации учетных данных.
Создайте боб канала сообщений в качестве входного канала.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Создайте
EventHubsInboundChannelAdapter
с помощьюEventHubsMessageListenerContainer
bean для получения сообщений из Центров событий.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Создайте привязку приемника сообщений с помощью EventHubsInboundChannelAdapter через канал сообщений, созданный ранее.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Настройка EventHubsMessageConverter для настройки objectMapper
EventHubsMessageConverter
выполняется в виде настраиваемого боба, чтобы пользователи могли настраивать ObjectMapper.
Поддержка потребителей пакетной службы
Чтобы использовать сообщения из Центров событий в пакетах, аналогично приведенному выше примеру, кроме того, пользователям следует задать параметры конфигурации, связанные с пакетной службой, для EventHubsInboundChannelAdapter
.
При создании EventHubsInboundChannelAdapter
режим прослушивателя должен быть задан как BATCH
. При создании EventHubsMessageListenerContainer
установите режим контрольной точки как MANUAL
или BATCH
, а параметры пакетной службы можно настроить по мере необходимости.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Заголовки сообщений Центров событий
В следующей таблице показано, как свойства сообщений Центров событий сопоставляются с заголовками сообщений Spring. Для Центров событий Azure сообщение вызывается как event
.
Сопоставление сообщений центров событий и свойств событий и заголовков spring message в режиме прослушивателя записей:
Свойства событий Центров событий | Константы заголовка spring message | Тип | Описание |
---|---|---|---|
Заквеченное время | EventHubsHeaders#ENQUEUED_TIME | Мгновение | Момент в формате UTC, когда событие было заквечено в разделе Концентратора событий. |
Смещение | EventHubsHeaders#OFFSET | Длинный | Смещение события при получении из связанного раздела Концентратора событий. |
Ключ секции | AzureHeaders#PARTITION_KEY | Струна | Хэширование секций, если оно было задано при первоначальной публикации события. |
Идентификатор секции | AzureHeaders#RAW_PARTITION_ID | Струна | Идентификатор секции концентратора событий. |
Порядковый номер | EventHubsHeaders#SEQUENCE_NUMBER | Длинный | Порядковый номер, назначенный событию, когда он был закреплен в связанном разделе Концентратора событий. |
Последние свойства события, вложенные в очередь | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Свойства последнего заквеченного события в этом разделе. |
NA | AzureHeaders#CHECKPOINTER | Контрольный контрольный пункт | Заголовок для контрольной точки определенного сообщения. |
Пользователи могут анализировать заголовки сообщений для связанных сведений о каждом событии. Чтобы задать заголовок сообщения для события, все настраиваемые заголовки будут помещены в качестве свойства приложения события, где заголовок задается в качестве ключа свойства. При получении событий из Центров событий все свойства приложения будут преобразованы в заголовок сообщения.
Заметка
Заголовки сообщений ключа секции, вложенное время, смещение и порядковый номер не поддерживаются вручную.
Если включен режим пакетного потребителя, определенные заголовки пакетных сообщений перечислены следующим образом, который содержит список значений из каждого события Центров событий.
Сопоставление между сообщениями центров событий и свойствами событий и заголовками spring message в режиме прослушивателя пакетной службы:
Свойства событий Центров событий | Константы заголовка сообщений Spring Batch | Тип | Описание |
---|---|---|---|
Заквеченное время | EventHubsHeaders#ENQUEUED_TIME | Список мгновенного экземпляра | Список моментов в формате UTC о том, когда каждое событие было вложено в раздел Концентратора событий. |
Смещение | EventHubsHeaders#OFFSET | Список длинных | Список смещения каждого события при получении из связанного раздела Концентратора событий. |
Ключ секции | AzureHeaders#PARTITION_KEY | Список строк | Список хэширования секций, если он был задан при первоначальной публикации каждого события. |
Порядковый номер | EventHubsHeaders#SEQUENCE_NUMBER | Список длинных | Список порядкового номера, назначенного каждому событию, когда он был закреплен в связанном разделе Концентратора событий. |
Системные свойства | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Список карт | Список системных свойств каждого события. |
Свойства приложения | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Список карт | Список свойств приложения каждого события, где размещаются все настраиваемые заголовки сообщений или свойства события. |
Заметка
При публикации сообщений все указанные выше заголовки пакета будут удалены из сообщений, если они существуют.
Образцы
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.
Интеграция Spring с служебной шиной Azure
Основные понятия
Spring Integration обеспечивает упрощенное обмен сообщениями в приложениях Spring и поддерживает интеграцию с внешними системами с помощью декларативных адаптеров.
Проект расширения Spring Integration для служебной шины Azure предоставляет адаптеры для входящих и исходящих каналов для служебной шины Azure.
Заметка
Интерфейсы API-интерфейсы поддержки CompletableFuture устарели с версии 2.10.0 и заменены 2.10.0 с версии 4.0.0. Дополнительные сведения см. в Javadoc.
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Конфигурация
Этот начальный элемент предоставляет следующие 2 части параметров конфигурации:
Свойства конфигурации подключения
В этом разделе содержатся параметры конфигурации, используемые для подключения к служебной шине Azure.
Заметка
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.
Настраиваемые свойства подключения spring-cloud-azure-starter-integration-servicebus:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.servicebus.enabled | булев | Включена ли служебная шина Azure. |
spring.cloud.azure.servicebus.connection-string | Струна | Строковое значение строки подключения пространства имен служебной шины. |
spring.cloud.azure.servicebus.custom-endpoint-address | Струна | Пользовательский адрес конечной точки, используемый при подключении к служебной шине. |
spring.cloud.azure.servicebusпространство имен | Струна | Значение пространства имен служебной шины, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Струна | Доменное имя значения пространства имен служебной шины Azure. |
Свойства конфигурации процессора служебной шины
ServiceBusInboundChannelAdapter
использует ServiceBusProcessorClient
для использования сообщений, чтобы настроить общие свойства ServiceBusProcessorClient
, разработчики могут использовать ServiceBusContainerProperties
для настройки. См. следующем разделе о работе с ServiceBusInboundChannelAdapter
.
Базовое использование
Отправка сообщений в служебную шину Azure
Заполните параметры конфигурации учетных данных.
Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Создайте
DefaultMessageHandler
с помощьюServiceBusTemplate
bean для отправки сообщений в служебную шину, задайте тип сущности для ServiceBusTemplate. В этом примере в качестве примера используется очередь служебной шины.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Отправка сообщений с помощью шлюза.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Получение сообщений из служебной шины Azure
Заполните параметры конфигурации учетных данных.
Создайте боб канала сообщений в качестве входного канала.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Создайте
ServiceBusInboundChannelAdapter
с помощьюServiceBusMessageListenerContainer
bean для получения сообщений в служебную шину. В этом примере в качестве примера используется очередь служебной шины.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Создайте привязку приемника сообщений с
ServiceBusInboundChannelAdapter
через канал сообщений, который мы создали ранее.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Настройка ServiceBusMessageConverter для настройки objectMapper
ServiceBusMessageConverter
выполняется в виде настраиваемого боба, чтобы пользователи могли настраивать ObjectMapper
.
Заголовки сообщений служебной шины
Для некоторых заголовков служебной шины, которые можно сопоставить с несколькими константами заголовков Spring, указан приоритет различных заголовков Spring.
Сопоставление заголовков служебной шины и spring headers:
Заголовки и свойства сообщений служебной шины | Константы заголовка сообщения Spring | Тип | Конфигурируемый | Описание |
---|---|---|---|---|
Тип контента | MessageHeaders#CONTENT_TYPE |
Струна | Да | Дескриптор типа контента RFC2045 сообщения. |
Идентификатор корреляции | ServiceBusMessageHeaders#CORRELATION_ID |
Струна | Да | Идентификатор корреляции сообщения |
Идентификатор сообщения | ServiceBusMessageHeaders#MESSAGE_ID |
Струна | Да | Идентификатор сообщения, этот заголовок имеет более высокий приоритет, чем MessageHeaders#ID . |
Идентификатор сообщения | MessageHeaders#ID |
UUID | Да | Идентификатор сообщения, этот заголовок имеет более низкий приоритет, чем ServiceBusMessageHeaders#MESSAGE_ID . |
Ключ секции | ServiceBusMessageHeaders#PARTITION_KEY |
Струна | Да | Ключ секции для отправки сообщения в секционированную сущность. |
Ответить на | MessageHeaders#REPLY_CHANNEL |
Струна | Да | Адрес сущности для отправки ответов. |
Ответ на идентификатор сеанса | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Струна | Да | Значение свойства ReplyToGroupId сообщения. |
Запланированное время в формате UTC | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Да | Дата и время, в котором сообщение должно быть вложено в служебную шину, этот заголовок имеет более высокий приоритет, чем AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Запланированное время в формате UTC | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Целое число | Да | Дата и время, в котором сообщение должно быть вложено в служебную шину, этот заголовок имеет более низкий приоритет, чем ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Идентификатор сеанса | ServiceBusMessageHeaders#SESSION_ID |
Струна | Да | Идентификатор сеанса для сущности, поддерживающей сеанс. |
Время жить | ServiceBusMessageHeaders#TIME_TO_LIVE |
Длительность | Да | Длительность времени до истечения срока действия этого сообщения. |
Кому | ServiceBusMessageHeaders#TO |
Струна | Да | Адрес сообщения "to", зарезервированный для дальнейшего использования в сценариях маршрутизации и в настоящее время игнорируемый самим брокером. |
Тема | ServiceBusMessageHeaders#SUBJECT |
Струна | Да | Тема сообщения. |
Описание ошибки недоставленной буквы | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Струна | Нет | Описание сообщения, которое было недоставлено. |
Причина недоставленных писем | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Струна | Нет | Причина, по которой сообщение было недоставлено. |
Источник недоставленных писем | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Струна | Нет | Сущность, в которой сообщение было недоставлено. |
Количество доставки | ServiceBusMessageHeaders#DELIVERY_COUNT |
длинный | Нет | Количество случаев доставки этого сообщения клиентам. |
Заквеченный порядковый номер | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
длинный | Нет | Закрепленный номер последовательности, назначенный сообщению служебной шиной. |
Заквеченное время | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Нет | Дата, по которой это сообщение было заквечено в служебной шине. |
Истекает по адресу | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Нет | Дата окончания срока действия этого сообщения. |
Маркер блокировки | ServiceBusMessageHeaders#LOCK_TOKEN |
Струна | Нет | Маркер блокировки для текущего сообщения. |
Блокировка до | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Нет | Дата, в течение которого истекает блокировка этого сообщения. |
Порядковый номер | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
длинный | Нет | Уникальный номер, назначенный сообщению служебной шиной. |
Государство | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Нет | Состояние сообщения, которое может быть активным, отложенным или запланированным. |
Поддержка ключа секции
Этот начальный элемент поддерживает секционирование служебной шины путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.
Рекомендуется: использовать ServiceBusMessageHeaders.PARTITION_KEY
в качестве ключа заголовка.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Не рекомендуется, но в настоящее время поддерживается: AzureHeaders.PARTITION_KEY
в качестве ключа заголовка.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Заметка
Если в заголовках сообщений заданы ServiceBusMessageHeaders.PARTITION_KEY
и AzureHeaders.PARTITION_KEY
, ServiceBusMessageHeaders.PARTITION_KEY
предпочтительнее.
Поддержка сеансов
В этом примере показано, как вручную задать идентификатор сеанса сообщения в приложении.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Заметка
Если ServiceBusMessageHeaders.SESSION_ID
заданы в заголовках сообщений, а другой заголовок ServiceBusMessageHeaders.PARTITION_KEY
также задан, значение идентификатора сеанса в конечном итоге будет использоваться для перезаписи значения ключа секции.
Настройка свойств клиента служебной шины
Разработчики могут использовать AzureServiceClientBuilderCustomizer
для настройки свойств клиента служебной шины. Следующий пример настраивает свойство sessionIdleTimeout
в ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Образцы
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.
Spring Integration with Azure Storage Queue
Основные понятия
Хранилище очередей Azure — это служба для хранения большого количества сообщений. Вы обращаетесь к сообщениям из любой точки мира через прошедшие проверку подлинности вызовы с помощью HTTP или HTTPS. Сообщение очереди может быть размером до 64 КБ. Очередь может содержать миллионы сообщений до общего ограничения емкости учетной записи хранения. Очереди обычно используются для создания невыполненной работы для асинхронной обработки.
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Конфигурация
Этот начальный параметр предоставляет следующие параметры конфигурации:
Свойства конфигурации подключения
В этом разделе содержатся параметры конфигурации, используемые для подключения к очереди службы хранилища Azure.
Заметка
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.
Настраиваемые свойства подключения spring-cloud-azure-starter-integration-storage-queue:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.storage.queue.enabled | булев | Включена ли очередь службы хранилища Azure. |
spring.cloud.azure.storage.queue.connection-string | Струна | Строковое значение строки подключения пространства имен очереди хранилища. |
spring.cloud.azure.storage.queue.accountName | Струна | Имя учетной записи очереди хранения. |
spring.cloud.azure.storage.queue.accountKey | Струна | Ключ учетной записи очереди хранения. |
spring.cloud.azure.storage.queue.endpoint | Струна | Конечная точка службы очередей хранилища. |
spring.cloud.azure.storage.queue.sasToken | Струна | Учетные данные маркера Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion, используемый при выполнении запросов API. |
spring.cloud.azure.storage.queue.messageEncoding | Струна | Кодировка сообщений очереди. |
Базовое использование
Отправка сообщений в очередь службы хранилища Azure
Заполните параметры конфигурации учетных данных.
Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Создайте
DefaultMessageHandler
с помощьюStorageQueueTemplate
bean для отправки сообщений в очередь хранилища.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Отправка сообщений с помощью шлюза.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Получение сообщений из очереди службы хранилища Azure
Заполните параметры конфигурации учетных данных.
Создайте боб канала сообщений в качестве входного канала.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Создайте
StorageQueueMessageSource
с помощьюStorageQueueTemplate
bean для получения сообщений в очередь хранилища.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Создайте привязку приемника сообщений с помощью StorageQueueMessageSource, созданной на последнем шаге через канал сообщений, который мы создали раньше.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Образцы
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.