Поддержка Spring Cloud Azure для Spring Cloud Stream
Эта статья относится к:✅ версии 4.19.0 ✅ версии 5.19.0
Spring Cloud Stream — это платформа для создания высокомасштабируемых микрослужб, управляемых событиями, подключенных к общим системам обмена сообщениями.
Платформа предоставляет гибкую модель программирования, созданную на основе уже установленных и знакомых идиом Spring idioms и рекомендаций. Эти рекомендации включают поддержку сохраняемой семантики паба или подсемантики, групп потребителей и секций с отслеживанием состояния.
К текущим реализациям привязки относятся:
-
spring-cloud-azure-stream-binder-eventhubs
. Дополнительные сведения см. в Spring Cloud Stream Binder для Центров событий Azure -
spring-cloud-azure-stream-binder-servicebus
. Дополнительные сведения см. в статье Spring Cloud Stream Binder для служебной шины Azure
Spring Cloud Stream Binder для Центров событий Azure
Основные понятия
Привязка Spring Cloud Stream для Центров событий Azure предоставляет реализацию привязки для платформы Spring Cloud Stream. Эта реализация использует адаптеры каналов Концентраторов событий Spring Integration в своей основе. С точки зрения проектирования центры событий похожи на Kafka. Кроме того, к центрам событий можно получить доступ через API Kafka. Если проект имеет жесткую зависимость от API Kafka, вы можете попробовать Концентратор событий с помощью примера API Kafka
Группа потребителей
Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в службе хранилища Azure.
Поддержка секционирования
Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической перебалансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой потребитель владеет какой секцией. При запуске нового потребителя он пытается украсть некоторые секции из наиболее сильно загруженных потребителей для достижения баланса рабочей нагрузки.
Чтобы указать стратегию балансировки нагрузки, предоставляются свойства spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Дополнительные сведения см. в разделе Свойства потребителя.
Поддержка потребителей пакетной службы
Привязка центров событий Spring Cloud Azure Stream поддерживает компонент пакетной службы Spring Cloud Stream.
Чтобы работать с режимом пакетного потребителя, задайте для свойства spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
значение true
. При включении сообщение со полезными данными списка пакетных событий получается и передается в функцию Consumer
. Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Общие заголовки идентификаторов секций, контрольных точек и последних вложенных свойств представлены в виде одного значения, так как весь пакет событий использует одно и то же значение. Дополнительные сведения см. в разделе заголовков сообщений центров событий поддержки Spring Cloud Azure для Spring Integration.
Заметка
Заголовок контрольной точки существует только в том случае, если используется режим контрольной точки MANUAL
.
Контрольная точка пакетного потребителя поддерживает два режима: BATCH
и MANUAL
.
BATCH
режим — это режим автоматической контрольной точки для проверки всего пакета событий вместе после их получения привязкой.
MANUAL
режиме — контрольные точки событий пользователями. При использовании Checkpointer
передается в заголовок сообщения, и пользователи могут использовать его для выполнения контрольных точек.
Размер пакета можно указать, задав свойства max-size
и max-wait-time
с префиксом spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. Необходимо свойство max-size
, а свойство max-wait-time
является необязательным. Дополнительные сведения см. в разделе Свойства потребителя.
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Кроме того, можно использовать начальный центр событий Azure Stream Spring Cloud, как показано в следующем примере для Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Конфигурация
Привязыватель предоставляет следующие три части параметров конфигурации:
Свойства конфигурации подключения
В этом разделе содержатся параметры конфигурации, используемые для подключения к Центрам событий Azure.
Заметка
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.
Настраиваемые свойства подключения spring-cloud-azure-stream-binder-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.eventhubs.enabled | булев | Включена ли служба Центров событий Azure. |
spring.cloud.azure.eventhubs.connection-string | Струна | Строковое значение строки подключения к пространству имен Центров событий. |
spring.cloud.azure.eventhubsпространство имен | Струна | Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Струна | Доменное имя значения пространства имен Центров событий Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Струна | Адрес пользовательской конечной точки. |
Кончик
Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Центров событий Azure Stream Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure.
или префикса spring.cloud.azure.eventhubs.
.
Привязка также поддерживает Spring Spring Может Azure Resource Manager по умолчанию. Сведения о том, как получить строку подключения с субъектами безопасности, которые не предоставляются с Data
связанными ролями, см. в разделе базовыхSpring Could Azure Resource Manager.
Свойства конфигурации контрольной точки
В этом разделе содержатся параметры конфигурации для службы BLOB-объектов хранилища, которая используется для сохранения сведений о владельцах секций и контрольных точках.
Заметка
В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-not-exists не включено вручную, контейнер хранилища не будет создан автоматически с именем из spring.cloud.stream.bindings.bindings.name.destination.
Контрольные точки настраиваемых свойств spring-cloud-azure-stream-binder-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Булев | Разрешить ли создание контейнеров, если они отсутствуют. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Струна | Имя учетной записи хранения. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Струна | Ключ доступа к учетной записи хранения. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Струна | Имя контейнера хранилища. |
Кончик
Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища контрольных точек BLOB-объектов хранилища. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure.
или префикса spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Свойства конфигурации привязки Центров событий Azure
Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.
Свойства потребителя
Эти свойства предоставляются через EventHubsConsumerProperties
.
Заметка
Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, Центры событий Azure Stream Binder Spring Cloud поддерживают параметры для всех каналов в формате spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Настраиваемые свойства потребителей spring-cloud-azure-stream-binder-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | Контрольная точкаMode | Режим контрольной точки, используемый, когда потребитель решает, как отправлять сообщение о контрольной точке |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Целое число | Определяет объем сообщения для каждой секции, чтобы выполнить одну контрольную точку. Вступают в силу только в том случае, если используется режим контрольной точки PARTITION_COUNT . |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Длительность | Определяет интервал времени для выполнения одной контрольной точки. Вступают в силу только в том случае, если используется режим контрольной точки TIME . |
spring.cloud.stream.eventhubs.bindings.<размер привязки name.consumer.batch.max | Целое число | Максимальное количество событий в пакете. Требуется для режима пакетного потребителя. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Длительность | Максимальная длительность использования пакетной службы. Вступают в силу только в том случае, если включен режим пакетного потребителя и является необязательным. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.update-interval | Длительность | Длительность интервала обновления. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.strategy | LoadBalancingStrategy | Стратегия балансировки нагрузки. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.partition-ownership-interval | Длительность | Срок действия, после которого истекает срок владения секцией. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Булев | Должен ли обработчик событий запрашивать сведения о последнем заквеченном событии в связанной секции и отслеживать эти сведения по мере получения событий. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Целое число | Число, используемое потребителем для управления числом событий, которые потребитель Концентратора событий будет активно получать и очереди локально. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Сопоставление с ключом в качестве идентификатора секции и значений StartPositionProperties |
Карта, содержащая позицию события, используемую для каждой секции, если контрольная точка для секции не существует в хранилище контрольных точек. Эта карта будет ключом от идентификатора секции. |
Заметка
Конфигурация initial-partition-event-position
принимает map
, чтобы указать начальную позицию для каждого концентратора событий. Таким образом, его ключом является идентификатор секции, а значение равно StartPositionProperties
, которое включает свойства смещения, порядковый номер, заквеченное время даты и независимо от того, включено ли это значение. Например, его можно задать как
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Расширенная конфигурация потребителя
Приведенный выше подключения, контрольных точеки общей конфигурации клиента azure SDK настройки для каждого потребителя привязки, который можно настроить с помощью префикса spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Свойства производителя
Эти свойства предоставляются через EventHubsProducerProperties
.
Заметка
Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, Центры событий Azure Stream Binder Spring Cloud поддерживают параметры для всех каналов в формате spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | булев | Флаг переключателя для синхронизации производителя. Если задано значение true, производитель ожидает ответа после операции отправки. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | длинный | Время ожидания ответа после операции отправки. Вступают в силу только в том случае, если производитель синхронизации включен. |
Расширенная конфигурация производителя
Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого производителя привязки, который можно настроить с помощью префикса spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Базовое использование
Отправка и получение сообщений из центров событий
Заполните параметры конфигурации учетными данными.
Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Определение поставщика и потребителя.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Поддержка секционирования
Создается PartitionSupplier
с сведениями о секционированиях, предоставленными пользователем, для настройки сведений о секционированиях для отправки сообщения. В следующей блок-схеме показано, как получить различные приоритеты для идентификатора секции и ключа:
Поддержка потребителей пакетной службы
Укажите параметры пакетной конфигурации, как показано в следующем примере:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Определение поставщика и потребителя.
Для режима контрольной точки как
BATCH
можно использовать следующий код для отправки сообщений и использования в пакетах.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Для режима контрольных точек в виде
MANUAL
можно использовать следующий код для отправки сообщений и использования и контрольных точек в пакетах.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Заметка
В режиме пакетного использования тип содержимого привязки Spring Cloud Stream по умолчанию application/json
, поэтому убедитесь, что полезные данные сообщения соответствуют типу контента. Например, при использовании типа контента по умолчанию application/json
для получения сообщений с полезными данными String
полезные данные должны быть JSON String
, окруженные двойными кавычками для исходного текста String
. Хотя для типа контента text/plain
это может быть объект String
напрямую. Дополнительные сведения см. в разделе Согласование типов контента Spring Cloud Stream.
Обработка сообщений об ошибках
Обработка сообщений об ошибках исходящей привязки
По умолчанию Spring Integration создает глобальный канал ошибок с именем
errorChannel
. Настройте следующую конечную точку сообщения для обработки исходящих сообщений об ошибках привязки.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Обработка сообщений об ошибках входящего привязки
Spring Cloud Stream Event Hubs Binder поддерживает одно решение для обработки ошибок для привязок входящих сообщений: обработчики ошибок.
обработчик ошибок:
Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления
Consumer
, который принимает экземплярыErrorMessage
. Дополнительные сведения см. в обработке сообщений об ошибках в документации spring Cloud Stream.Обработчик ошибок по умолчанию привязки
Настройте одну
Consumer
bean для использования всех сообщений об ошибках привязки входящего трафика. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать для свойства
spring.cloud.stream.default.error-handler-definition
имя функции.Обработчик ошибок, зависящих от привязки
Настройте
Consumer
bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика и имеет более высокий приоритет, чем обработчик ошибок по умолчанию привязки:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать для свойства
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
имя функции.
Заголовки сообщений Центров событий
Основные поддерживаемые заголовки сообщений см. в разделе Заголовки сообщений Центров событий поддержки поддержки Spring Cloud Azure для Spring Integration.
Поддержка нескольких привязок
Подключение к нескольким пространствам имен Центров событий также поддерживается с помощью нескольких привязок. В этом примере в качестве примера используется строка подключения. Также поддерживаются учетные данные субъектов-служб и управляемых удостоверений. Связанные свойства можно задать в параметрах среды привязки.
Чтобы использовать несколько привязок с Центрами событий, настройте следующие свойства в файле application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Заметка
В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать конфигурацию, например
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Нам нужно определить двух поставщиков и двух потребителей:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Подготовка ресурсов
Привязка Центров событий поддерживает подготовку концентратора событий и группы потребителей, пользователи могут использовать следующие свойства для включения подготовки.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Образцы
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.
Spring Cloud Stream Binder для служебной шины Azure
Основные понятия
Spring Cloud Stream Binder для служебной шины Azure предоставляет реализацию привязки для Spring Cloud Stream Framework. Эта реализация использует адаптеры канала служебной шины Spring Integration в своей основе.
Запланированное сообщение
Эта привязка поддерживает отправку сообщений в раздел для отложенной обработки. Пользователи могут отправлять запланированные сообщения с заголовком x-delay
выражения в миллисекундах время задержки сообщения. Сообщение будет доставлено в соответствующие разделы после x-delay
миллисекундах.
Группа потребителей
Раздел служебной шины обеспечивает аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой.
Этот привязка зависит от Subscription
раздела, чтобы выступать в качестве группы потребителей.
Настройка зависимостей
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Кроме того, можно использовать начальную шину Azure Stream Spring Cloud, как показано в следующем примере для Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Конфигурация
Привязка предоставляет следующие две части параметров конфигурации:
Свойства конфигурации подключения
В этом разделе содержатся параметры конфигурации, используемые для подключения к служебной шине Azure.
Заметка
Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.
Настраиваемые свойства подключения spring-cloud-azure-stream-binder-servicebus:
Свойство | Тип | Описание |
---|---|---|
spring.cloud.azure.servicebus.enabled | булев | Включена ли служебная шина Azure. |
spring.cloud.azure.servicebus.connection-string | Струна | Строковое значение строки подключения пространства имен служебной шины. |
spring.cloud.azure.servicebus.custom-endpoint-address | Струна | Пользовательский адрес конечной точки, используемый при подключении к служебной шине. |
spring.cloud.azure.servicebusпространство имен | Струна | Значение пространства имен служебной шины, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Струна | Доменное имя значения пространства имен служебной шины Azure. |
Заметка
Общие параметры конфигурации пакета SDK для службы Azure можно настроить для привязки служебной шины Azure Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure.
или префикса spring.cloud.azure.servicebus.
.
Привязка также поддерживает Spring Spring Может Azure Resource Manager по умолчанию. Сведения о том, как получить строку подключения с субъектами безопасности, которые не предоставляются с Data
связанными ролями, см. в разделе базовыхSpring Could Azure Resource Manager.
Свойства конфигурации привязки служебной шины Azure
Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.
Свойства потребителя
Эти свойства предоставляются через ServiceBusConsumerProperties
.
Заметка
Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, служебная шина Azure Stream Binder Spring Cloud поддерживает параметры значений для всех каналов в формате spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:
Свойство | Тип | По умолчанию | Описание |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | булев | ложный | Если неудачные сообщения перенаправляются в DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-параллельные вызовы | Целое число | 1 | Максимальное число одновременных сообщений, которые должен обрабатывать клиент обработчика служебной шины. Если сеанс включен, он применяется к каждому сеансу. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-session | Целое число | недействительный | Максимальное количество одновременных сеансов для обработки в любое время. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Булев | недействительный | Включен ли сеанс. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Целое число | 0 | Число предварительных выборок клиента обработчика служебной шины. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | никакой | Тип вложенной очереди для подключения. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Длительность | 5 млн | Время для продолжения автоматического продления блокировки. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Режим получения клиента обработчика служебной шины. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Булев | истинный | Следует ли автоматически урегулировать сообщения. Если задано значение false, то будет добавлен заголовок сообщения Checkpointer , чтобы разработчики могли разрешать сообщения вручную. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-мегабайт | Длинный | 1024 | Максимальный размер очереди или раздела в мегабайтах, который является размером памяти, выделенной для очереди или раздела. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Длительность | P10675199DT2H48M5.4775807S. (10675199 дней, 2 часа, 48 минут, 5 секунд и 477 миллисекунд) | Длительность, после которой истекает срок действия сообщения, начиная с момента отправки сообщения в служебную шину. |
Важный
При использовании Azure Resource Manager (ARM) необходимо настроить свойство spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Дополнительные сведения см. в примере servicebus-queue-binder-arm на сайте GitHub.
Расширенная конфигурация потребителя
Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого потребителя привязки, который можно настроить с помощью префикса spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Свойства производителя
Эти свойства предоставляются через ServiceBusProducerProperties
.
Заметка
Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, служебная шина Azure Stream Binder Spring Cloud поддерживает параметры значений для всех каналов в формате spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:
Свойство | Тип | По умолчанию | Описание |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | булев | ложный | Переключение флага для синхронизации производителя. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | длинный | 10000 | Значение времени ожидания для отправки производителя. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | недействительный | Тип сущности служебной шины производителя, необходимый для производителя привязки. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-мегабайт | Длинный | 1024 | Максимальный размер очереди или раздела в мегабайтах, который является размером памяти, выделенной для очереди или раздела. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Длительность | P10675199DT2H48M5.4775807S. (10675199 дней, 2 часа, 48 минут, 5 секунд и 477 миллисекунд) | Длительность, после которой истекает срок действия сообщения, начиная с момента отправки сообщения в служебную шину. |
Важный
При использовании производителя привязки необходимо настроить свойство spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Расширенная конфигурация производителя
Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого производителя привязки, который можно настроить с помощью префикса spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Базовое использование
Отправка и получение сообщений из служебной шины
Заполните параметры конфигурации учетными данными.
Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Определение поставщика и потребителя.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Поддержка ключа секции
Привязка поддерживает секционирование служебной шины путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.
Spring Cloud Stream предоставляет свойство выражения spEL ключа раздела spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Например, присвойите этому свойству значение "'partitionKey-' + headers[<message-header-key>]"
и добавьте заголовок с именем message-header-key. Spring Cloud Stream использует значение этого заголовка при оценке выражения для назначения ключа секции. В следующем коде представлен пример производителя:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Поддержка сеансов
Привязка поддерживает сеансы сообщений служебной шины. Идентификатор сеанса сообщения можно задать с помощью заголовка сообщения.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Заметка
Согласно секционирование служебной шины, идентификатор сеанса имеет более высокий приоритет, чем ключ секции. Поэтому при установке обоих ServiceBusMessageHeaders#SESSION_ID
и ServiceBusMessageHeaders#PARTITION_KEY
заголовков значение идентификатора сеанса в конечном итоге используется для перезаписи значения ключа секции.
Обработка сообщений об ошибках
Обработка сообщений об ошибках исходящей привязки
По умолчанию Spring Integration создает глобальный канал ошибок с именем
errorChannel
. Настройте следующую конечную точку сообщения для обработки сообщения об ошибке исходящей привязки.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Обработка сообщений об ошибках входящего привязки
Привязка служебной шины Spring Cloud Stream поддерживает два решения для обработки ошибок для привязок входящих сообщений: обработчика ошибок и обработчиков ошибок привязки.
обработчик ошибок привязки
: Обработчик ошибок привязки по умолчанию обрабатывает входящий привязку. Этот обработчик используется для отправки неудачных сообщений в очередь недоставленных писем при включении
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
. В противном случае не удалось отказаться от сообщений. Обработчик ошибок привязки является взаимоисключающим с другими предоставленными обработчиками ошибок.обработчик ошибок
: Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления
Consumer
, который принимает экземплярыErrorMessage
. Дополнительные сведения см. в обработке сообщений об ошибках в документации spring Cloud Stream.Обработчик ошибок по умолчанию привязки
Настройте одну
Consumer
bean для использования всех сообщений об ошибках привязки входящего трафика. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать для свойства
spring.cloud.stream.default.error-handler-definition
имя функции.Обработчик ошибок, зависящих от привязки
Настройте
Consumer
bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика с более высоким приоритетом, чем обработчик ошибок по умолчанию привязки.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Кроме того, необходимо задать для свойства
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
имя функции.
Заголовки сообщений служебной шины
Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений служебной шины раздела поддержки Spring Cloud Azure для Spring Integration.
Заметка
При настройке ключа секции приоритет заголовка сообщения выше свойства Spring Cloud Stream. Поэтому spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
действует только в том случае, если ни один из ServiceBusMessageHeaders#SESSION_ID
и ServiceBusMessageHeaders#PARTITION_KEY
заголовков не настроен.
Поддержка нескольких привязок
Подключение к нескольким пространствам имен служебной шины также поддерживается с помощью нескольких привязок. Этот пример принимает строку подключения в качестве примера. Учетные данные субъектов-служб и управляемых удостоверений также поддерживаются, пользователи могут задавать связанные свойства в параметрах среды каждого привязки.
Чтобы использовать несколько привязок ServiceBus, настройте следующие свойства в файле application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Заметка
В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать конфигурацию, например
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.нам нужно определить двух поставщиков и двух потребителей
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Подготовка ресурсов
Привязка служебной шины поддерживает подготовку очередей, раздела и подписки, пользователи могут использовать следующие свойства для включения подготовки.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Заметка
Значения, допустимые для tenant-id
: common
, organizations
, consumers
или идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.
Настройка свойств клиента служебной шины
Разработчики могут использовать AzureServiceClientBuilderCustomizer
для настройки свойств клиента служебной шины. Следующий пример настраивает свойство sessionIdleTimeout
в ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Образцы
Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.