Поделиться через


Поддержка Spring Cloud Azure для Spring Integration

Эта статья относится к:✅ версии 4.19.0 ✅ версии 5.19.0

Расширение Spring Integration для Azure предоставляет адаптеры Spring Integration для различных служб, предоставляемых пакетом SDK Azure для Java. Мы предоставляем поддержку Spring Integration для этих служб Azure: Центры событий, служебная шина, очередь хранилища. Ниже приведен список поддерживаемых адаптеров:

Интеграция Spring с Центрами событий Azure

Основные понятия

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Он может получать и обрабатывать миллионы событий в секунду. Данные, отправленные в концентратор событий, можно преобразовать и сохранить с помощью любого поставщика аналитики в режиме реального времени или адаптеров пакетной обработки или хранилища.

Spring Integration обеспечивает упрощенное обмен сообщениями в приложениях Spring и поддерживает интеграцию с внешними системами с помощью декларативных адаптеров. Эти адаптеры обеспечивают более высокий уровень абстракции по сравнению с поддержкой Spring для удаленного взаимодействия, обмена сообщениями и планирования. Проект расширения Spring Integration for Event Hubs предоставляет адаптеры и шлюзы для центров событий Azure для входящих и исходящих каналов.

Заметка

API-интерфейсы поддержки RxJava удаляются из версии 4.0.0. Дополнительные сведения см. в Javadoc.

Группа потребителей

Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в службе хранилища Azure.

Поддержка секционирования

Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической балансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой раздел принадлежит определенному потребителю. Когда начинается новый потребитель, он попытается украсть некоторые секции из большинства загруженных потребителей для достижения балансировки рабочей нагрузки.

Чтобы указать стратегию балансировки нагрузки, разработчики могут использовать EventHubsContainerProperties для конфигурации. Пример настройки см. в следующем разделе.

Поддержка потребителей пакетной службы

EventHubsInboundChannelAdapter поддерживает режим пакетного использования. Чтобы включить его, пользователи могут указать режим прослушивателя как ListenerMode.BATCH при создании экземпляра EventHubsInboundChannelAdapter. При включении сообщение, из которого полезные данные являются списком пакетных событий, будут получены и переданы в подчиненный канал. Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Для общих заголовков идентификатора секции, контрольных точек и последних вложенных свойств они отображаются в виде одного значения для всего пакета событий, совместного с одним и тем же. Дополнительные сведения см. в разделе Заголовки сообщений Центров событий.

Заметка

Заголовок контрольной точки существует только в том случае, если используется режим MANUAL контрольных точек.

Контрольная точка пакетного потребителя поддерживает два режима: BATCH и MANUAL. BATCH режим — это режим автоматической контрольной точки для проверки всего пакета событий вместе после их получения. MANUAL режиме — контрольные точки событий пользователями. При использовании контрольный элемент будет передан в заголовок сообщения, и пользователи могут использовать его для выполнения контрольных точек.

Политику использования пакетной службы можно указать свойствами max-size и max-wait-time, где max-size является необходимым свойством, а max-wait-time является необязательным. Чтобы указать стратегию использования пакетной службы, разработчики могут использовать EventHubsContainerProperties для конфигурации. Пример настройки см. в следующем разделе.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Конфигурация

Этот начальный элемент предоставляет следующие 3 части параметров конфигурации:

Свойства конфигурации подключения

В этом разделе содержатся параметры конфигурации, используемые для подключения к Центрам событий Azure.

Заметка

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.

Настраиваемые свойства подключения spring-cloud-azure-starter-integration-eventhubs:

Свойство Тип Описание
spring.cloud.azure.eventhubs.enabled булев Включена ли служба Центров событий Azure.
spring.cloud.azure.eventhubs.connection-string Струна Строковое значение строки подключения к пространству имен Центров событий.
spring.cloud.azure.eventhubsпространство имен Струна Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Струна Доменное имя значения пространства имен Центров событий Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Струна Адрес пользовательской конечной точки.
spring.cloud.azure.eventhubs.shared-connection Булев Используется ли базовое подключение EventProcessorClient и EventHubProducerAsyncClient. По умолчанию создается и используется новое подключение для каждого клиента Концентратора событий.

Свойства конфигурации контрольной точки

В этом разделе содержатся параметры конфигурации для службы BLOB-объектов хранилища, которая используется для сохранения сведений о владельцах секций и контрольных точках.

Заметка

В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists не включено вручную, контейнер хранилища не будет создан автоматически.

Контрольные точки настраиваемых свойств spring-cloud-azure-starter-integration-eventhubs:

Свойство Тип Описание
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Булев Разрешить ли создание контейнеров, если они отсутствуют.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Струна Имя учетной записи хранения.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Струна Ключ доступа к учетной записи хранения.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Струна Имя контейнера хранилища.

Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища контрольных точек BLOB-объектов хранилища. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure. или префикса spring.cloud.azure.eventhubs.processor.checkpoint-store.

Свойства конфигурации процессора Концентратора событий

EventHubsInboundChannelAdapter использует EventProcessorClient для использования сообщений из концентратора событий для настройки общих свойств EventProcessorClient, разработчики могут использовать EventHubsContainerProperties для настройки. См. следующем разделе о работе с EventHubsInboundChannelAdapter.

Базовое использование

Отправка сообщений в Центры событий Azure

  1. Заполните параметры конфигурации учетных данных.

    • Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  1. Создайте DefaultMessageHandler с помощью EventHubsTemplate bean для отправки сообщений в Центры событий.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Отправка сообщений с помощью шлюза.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Получение сообщений из Центров событий Azure

  1. Заполните параметры конфигурации учетных данных.

  2. Создайте боб канала сообщений в качестве входного канала.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Создайте EventHubsInboundChannelAdapter с помощью EventHubsMessageListenerContainer bean для получения сообщений из Центров событий.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Создайте привязку приемника сообщений с помощью EventHubsInboundChannelAdapter через канал сообщений, созданный ранее.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Настройка EventHubsMessageConverter для настройки objectMapper

EventHubsMessageConverter выполняется в виде настраиваемого боба, чтобы пользователи могли настраивать ObjectMapper.

Поддержка потребителей пакетной службы

Чтобы использовать сообщения из Центров событий в пакетах, аналогично приведенному выше примеру, кроме того, пользователям следует задать параметры конфигурации, связанные с пакетной службой, для EventHubsInboundChannelAdapter.

При создании EventHubsInboundChannelAdapterрежим прослушивателя должен быть задан как BATCH. При создании EventHubsMessageListenerContainerустановите режим контрольной точки как MANUAL или BATCH, а параметры пакетной службы можно настроить по мере необходимости.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Заголовки сообщений Центров событий

В следующей таблице показано, как свойства сообщений Центров событий сопоставляются с заголовками сообщений Spring. Для Центров событий Azure сообщение вызывается как event.

Сопоставление сообщений центров событий и свойств событий и заголовков spring message в режиме прослушивателя записей:

Свойства событий Центров событий Константы заголовка spring message Тип Описание
Заквеченное время EventHubsHeaders#ENQUEUED_TIME Мгновение Момент в формате UTC, когда событие было заквечено в разделе Концентратора событий.
Смещение EventHubsHeaders#OFFSET Длинный Смещение события при получении из связанного раздела Концентратора событий.
Ключ секции AzureHeaders#PARTITION_KEY Струна Хэширование секций, если оно было задано при первоначальной публикации события.
Идентификатор секции AzureHeaders#RAW_PARTITION_ID Струна Идентификатор секции концентратора событий.
Порядковый номер EventHubsHeaders#SEQUENCE_NUMBER Длинный Порядковый номер, назначенный событию, когда он был закреплен в связанном разделе Концентратора событий.
Последние свойства события, вложенные в очередь EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Свойства последнего заквеченного события в этом разделе.
NA AzureHeaders#CHECKPOINTER Контрольный контрольный пункт Заголовок для контрольной точки определенного сообщения.

Пользователи могут анализировать заголовки сообщений для связанных сведений о каждом событии. Чтобы задать заголовок сообщения для события, все настраиваемые заголовки будут помещены в качестве свойства приложения события, где заголовок задается в качестве ключа свойства. При получении событий из Центров событий все свойства приложения будут преобразованы в заголовок сообщения.

Заметка

Заголовки сообщений ключа секции, вложенное время, смещение и порядковый номер не поддерживаются вручную.

Если включен режим пакетного потребителя, определенные заголовки пакетных сообщений перечислены следующим образом, который содержит список значений из каждого события Центров событий.

Сопоставление между сообщениями центров событий и свойствами событий и заголовками spring message в режиме прослушивателя пакетной службы:

Свойства событий Центров событий Константы заголовка сообщений Spring Batch Тип Описание
Заквеченное время EventHubsHeaders#ENQUEUED_TIME Список мгновенного экземпляра Список моментов в формате UTC о том, когда каждое событие было вложено в раздел Концентратора событий.
Смещение EventHubsHeaders#OFFSET Список длинных Список смещения каждого события при получении из связанного раздела Концентратора событий.
Ключ секции AzureHeaders#PARTITION_KEY Список строк Список хэширования секций, если он был задан при первоначальной публикации каждого события.
Порядковый номер EventHubsHeaders#SEQUENCE_NUMBER Список длинных Список порядкового номера, назначенного каждому событию, когда он был закреплен в связанном разделе Концентратора событий.
Системные свойства EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Список карт Список системных свойств каждого события.
Свойства приложения EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Список карт Список свойств приложения каждого события, где размещаются все настраиваемые заголовки сообщений или свойства события.

Заметка

При публикации сообщений все указанные выше заголовки пакета будут удалены из сообщений, если они существуют.

Образцы

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.

Интеграция Spring с служебной шиной Azure

Основные понятия

Spring Integration обеспечивает упрощенное обмен сообщениями в приложениях Spring и поддерживает интеграцию с внешними системами с помощью декларативных адаптеров.

Проект расширения Spring Integration для служебной шины Azure предоставляет адаптеры для входящих и исходящих каналов для служебной шины Azure.

Заметка

Интерфейсы API-интерфейсы поддержки CompletableFuture устарели с версии 2.10.0 и заменены 2.10.0 с версии 4.0.0. Дополнительные сведения см. в Javadoc.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Конфигурация

Этот начальный элемент предоставляет следующие 2 части параметров конфигурации:

Свойства конфигурации подключения

В этом разделе содержатся параметры конфигурации, используемые для подключения к служебной шине Azure.

Заметка

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.

Настраиваемые свойства подключения spring-cloud-azure-starter-integration-servicebus:

Свойство Тип Описание
spring.cloud.azure.servicebus.enabled булев Включена ли служебная шина Azure.
spring.cloud.azure.servicebus.connection-string Струна Строковое значение строки подключения пространства имен служебной шины.
spring.cloud.azure.servicebus.custom-endpoint-address Струна Пользовательский адрес конечной точки, используемый при подключении к служебной шине.
spring.cloud.azure.servicebusпространство имен Струна Значение пространства имен служебной шины, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Струна Доменное имя значения пространства имен служебной шины Azure.

Свойства конфигурации процессора служебной шины

ServiceBusInboundChannelAdapter использует ServiceBusProcessorClient для использования сообщений, чтобы настроить общие свойства ServiceBusProcessorClient, разработчики могут использовать ServiceBusContainerProperties для настройки. См. следующем разделе о работе с ServiceBusInboundChannelAdapter.

Базовое использование

Отправка сообщений в служебную шину Azure

  1. Заполните параметры конфигурации учетных данных.

    • Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  1. Создайте DefaultMessageHandler с помощью ServiceBusTemplate bean для отправки сообщений в служебную шину, задайте тип сущности для ServiceBusTemplate. В этом примере в качестве примера используется очередь служебной шины.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Отправка сообщений с помощью шлюза.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Получение сообщений из служебной шины Azure

  1. Заполните параметры конфигурации учетных данных.

  2. Создайте боб канала сообщений в качестве входного канала.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Создайте ServiceBusInboundChannelAdapter с помощью ServiceBusMessageListenerContainer bean для получения сообщений в служебную шину. В этом примере в качестве примера используется очередь служебной шины.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Создайте привязку приемника сообщений с ServiceBusInboundChannelAdapter через канал сообщений, который мы создали ранее.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Настройка ServiceBusMessageConverter для настройки objectMapper

ServiceBusMessageConverter выполняется в виде настраиваемого боба, чтобы пользователи могли настраивать ObjectMapper.

Заголовки сообщений служебной шины

Для некоторых заголовков служебной шины, которые можно сопоставить с несколькими константами заголовков Spring, указан приоритет различных заголовков Spring.

Сопоставление заголовков служебной шины и spring headers:

Заголовки и свойства сообщений служебной шины Константы заголовка сообщения Spring Тип Конфигурируемый Описание
Тип контента MessageHeaders#CONTENT_TYPE Струна Да Дескриптор типа контента RFC2045 сообщения.
Идентификатор корреляции ServiceBusMessageHeaders#CORRELATION_ID Струна Да Идентификатор корреляции сообщения
Идентификатор сообщения ServiceBusMessageHeaders#MESSAGE_ID Струна Да Идентификатор сообщения, этот заголовок имеет более высокий приоритет, чем MessageHeaders#ID.
Идентификатор сообщения MessageHeaders#ID UUID Да Идентификатор сообщения, этот заголовок имеет более низкий приоритет, чем ServiceBusMessageHeaders#MESSAGE_ID.
Ключ секции ServiceBusMessageHeaders#PARTITION_KEY Струна Да Ключ секции для отправки сообщения в секционированную сущность.
Ответить на MessageHeaders#REPLY_CHANNEL Струна Да Адрес сущности для отправки ответов.
Ответ на идентификатор сеанса ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Струна Да Значение свойства ReplyToGroupId сообщения.
Запланированное время в формате UTC ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Да Дата и время, в котором сообщение должно быть вложено в служебную шину, этот заголовок имеет более высокий приоритет, чем AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Запланированное время в формате UTC AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Целое число Да Дата и время, в котором сообщение должно быть вложено в служебную шину, этот заголовок имеет более низкий приоритет, чем ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Идентификатор сеанса ServiceBusMessageHeaders#SESSION_ID Струна Да Идентификатор сеанса для сущности, поддерживающей сеанс.
Время жить ServiceBusMessageHeaders#TIME_TO_LIVE Длительность Да Длительность времени до истечения срока действия этого сообщения.
Кому ServiceBusMessageHeaders#TO Струна Да Адрес сообщения "to", зарезервированный для дальнейшего использования в сценариях маршрутизации и в настоящее время игнорируемый самим брокером.
Тема ServiceBusMessageHeaders#SUBJECT Струна Да Тема сообщения.
Описание ошибки недоставленной буквы ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Струна Нет Описание сообщения, которое было недоставлено.
Причина недоставленных писем ServiceBusMessageHeaders#DEAD_LETTER_REASON Струна Нет Причина, по которой сообщение было недоставлено.
Источник недоставленных писем ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Струна Нет Сущность, в которой сообщение было недоставлено.
Количество доставки ServiceBusMessageHeaders#DELIVERY_COUNT длинный Нет Количество случаев доставки этого сообщения клиентам.
Заквеченный порядковый номер ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER длинный Нет Закрепленный номер последовательности, назначенный сообщению служебной шиной.
Заквеченное время ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Нет Дата, по которой это сообщение было заквечено в служебной шине.
Истекает по адресу ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Нет Дата окончания срока действия этого сообщения.
Маркер блокировки ServiceBusMessageHeaders#LOCK_TOKEN Струна Нет Маркер блокировки для текущего сообщения.
Блокировка до ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Нет Дата, в течение которого истекает блокировка этого сообщения.
Порядковый номер ServiceBusMessageHeaders#SEQUENCE_NUMBER длинный Нет Уникальный номер, назначенный сообщению служебной шиной.
Государство ServiceBusMessageHeaders#STATE ServiceBusMessageState Нет Состояние сообщения, которое может быть активным, отложенным или запланированным.

Поддержка ключа секции

Этот начальный элемент поддерживает секционирование служебной шины путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.

Рекомендуется: использовать ServiceBusMessageHeaders.PARTITION_KEY в качестве ключа заголовка.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Не рекомендуется, но в настоящее время поддерживается: AzureHeaders.PARTITION_KEY в качестве ключа заголовка.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Заметка

Если в заголовках сообщений заданы ServiceBusMessageHeaders.PARTITION_KEY и AzureHeaders.PARTITION_KEY, ServiceBusMessageHeaders.PARTITION_KEY предпочтительнее.

Поддержка сеансов

В этом примере показано, как вручную задать идентификатор сеанса сообщения в приложении.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Заметка

Если ServiceBusMessageHeaders.SESSION_ID заданы в заголовках сообщений, а другой заголовок ServiceBusMessageHeaders.PARTITION_KEY также задан, значение идентификатора сеанса в конечном итоге будет использоваться для перезаписи значения ключа секции.

Настройка свойств клиента служебной шины

Разработчики могут использовать AzureServiceClientBuilderCustomizer для настройки свойств клиента служебной шины. Следующий пример настраивает свойство sessionIdleTimeout в ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Образцы

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.

Spring Integration with Azure Storage Queue

Основные понятия

Хранилище очередей Azure — это служба для хранения большого количества сообщений. Вы обращаетесь к сообщениям из любой точки мира через прошедшие проверку подлинности вызовы с помощью HTTP или HTTPS. Сообщение очереди может быть размером до 64 КБ. Очередь может содержать миллионы сообщений до общего ограничения емкости учетной записи хранения. Очереди обычно используются для создания невыполненной работы для асинхронной обработки.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Конфигурация

Этот начальный параметр предоставляет следующие параметры конфигурации:

Свойства конфигурации подключения

В этом разделе содержатся параметры конфигурации, используемые для подключения к очереди службы хранилища Azure.

Заметка

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.

Настраиваемые свойства подключения spring-cloud-azure-starter-integration-storage-queue:

Свойство Тип Описание
spring.cloud.azure.storage.queue.enabled булев Включена ли очередь службы хранилища Azure.
spring.cloud.azure.storage.queue.connection-string Струна Строковое значение строки подключения пространства имен очереди хранилища.
spring.cloud.azure.storage.queue.accountName Струна Имя учетной записи очереди хранения.
spring.cloud.azure.storage.queue.accountKey Струна Ключ учетной записи очереди хранения.
spring.cloud.azure.storage.queue.endpoint Струна Конечная точка службы очередей хранилища.
spring.cloud.azure.storage.queue.sasToken Струна Учетные данные маркера Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion, используемый при выполнении запросов API.
spring.cloud.azure.storage.queue.messageEncoding Струна Кодировка сообщений очереди.

Базовое использование

Отправка сообщений в очередь службы хранилища Azure

  1. Заполните параметры конфигурации учетных данных.

    • Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  1. Создайте DefaultMessageHandler с помощью StorageQueueTemplate bean для отправки сообщений в очередь хранилища.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Создайте привязку шлюза сообщений с указанным выше обработчиком сообщений через канал сообщений.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Отправка сообщений с помощью шлюза.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Получение сообщений из очереди службы хранилища Azure

  1. Заполните параметры конфигурации учетных данных.

  2. Создайте боб канала сообщений в качестве входного канала.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Создайте StorageQueueMessageSource с помощью StorageQueueTemplate bean для получения сообщений в очередь хранилища.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Создайте привязку приемника сообщений с помощью StorageQueueMessageSource, созданной на последнем шаге через канал сообщений, который мы создали раньше.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Образцы

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.