Поделиться через


Поддержка Spring Cloud Azure для Spring Cloud Stream

Эта статья относится к:✅ версии 4.19.0 ✅ версии 5.19.0

Spring Cloud Stream — это платформа для создания высокомасштабируемых микрослужб, управляемых событиями, подключенных к общим системам обмена сообщениями.

Платформа предоставляет гибкую модель программирования, созданную на основе уже установленных и знакомых идиом Spring idioms и рекомендаций. Эти рекомендации включают поддержку сохраняемой семантики паба или подсемантики, групп потребителей и секций с отслеживанием состояния.

К текущим реализациям привязки относятся:

Spring Cloud Stream Binder для Центров событий Azure

Основные понятия

Привязка Spring Cloud Stream для Центров событий Azure предоставляет реализацию привязки для платформы Spring Cloud Stream. Эта реализация использует адаптеры каналов Концентраторов событий Spring Integration в своей основе. С точки зрения проектирования центры событий похожи на Kafka. Кроме того, к центрам событий можно получить доступ через API Kafka. Если проект имеет жесткую зависимость от API Kafka, вы можете попробовать Концентратор событий с помощью примера API Kafka

Группа потребителей

Центры событий обеспечивают аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Пока Kafka сохраняет все зафиксированные смещения в брокере, необходимо хранить смещения сообщений Центров событий, обрабатываемых вручную. Пакет SDK центров событий предоставляет функцию для хранения таких смещения в службе хранилища Azure.

Поддержка секционирования

Центры событий предоставляют аналогичную концепцию физической секции, как Kafka. Но в отличие от автоматической перебалансировки Kafka между потребителями и секциями, Центры событий предоставляют вид предварительного режима. Учетная запись хранения выступает в качестве аренды, чтобы определить, какой потребитель владеет какой секцией. При запуске нового потребителя он пытается украсть некоторые секции из наиболее сильно загруженных потребителей для достижения баланса рабочей нагрузки.

Чтобы указать стратегию балансировки нагрузки, предоставляются свойства spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Дополнительные сведения см. в разделе Свойства потребителя.

Поддержка потребителей пакетной службы

Привязка центров событий Spring Cloud Azure Stream поддерживает компонент пакетной службы Spring Cloud Stream.

Чтобы работать с режимом пакетного потребителя, задайте для свойства spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode значение true. При включении сообщение со полезными данными списка пакетных событий получается и передается в функцию Consumer. Каждый заголовок сообщения также преобразуется в список, из которого содержимое является соответствующим значением заголовка, проанализированным из каждого события. Общие заголовки идентификаторов секций, контрольных точек и последних вложенных свойств представлены в виде одного значения, так как весь пакет событий использует одно и то же значение. Дополнительные сведения см. в разделе заголовков сообщений центров событий поддержки Spring Cloud Azure для Spring Integration.

Заметка

Заголовок контрольной точки существует только в том случае, если используется режим контрольной точки MANUAL.

Контрольная точка пакетного потребителя поддерживает два режима: BATCH и MANUAL. BATCH режим — это режим автоматической контрольной точки для проверки всего пакета событий вместе после их получения привязкой. MANUAL режиме — контрольные точки событий пользователями. При использовании Checkpointer передается в заголовок сообщения, и пользователи могут использовать его для выполнения контрольных точек.

Размер пакета можно указать, задав свойства max-size и max-wait-time с префиксом spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Необходимо свойство max-size, а свойство max-wait-time является необязательным. Дополнительные сведения см. в разделе Свойства потребителя.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Кроме того, можно использовать начальный центр событий Azure Stream Spring Cloud, как показано в следующем примере для Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Конфигурация

Привязыватель предоставляет следующие три части параметров конфигурации:

Свойства конфигурации подключения

В этом разделе содержатся параметры конфигурации, используемые для подключения к Центрам событий Azure.

Заметка

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.

Настраиваемые свойства подключения spring-cloud-azure-stream-binder-eventhubs:

Свойство Тип Описание
spring.cloud.azure.eventhubs.enabled булев Включена ли служба Центров событий Azure.
spring.cloud.azure.eventhubs.connection-string Струна Строковое значение строки подключения к пространству имен Центров событий.
spring.cloud.azure.eventhubsпространство имен Струна Значение пространства имен Центров событий, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Струна Доменное имя значения пространства имен Центров событий Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Струна Адрес пользовательской конечной точки.

Кончик

Общие параметры конфигурации пакета SDK службы Azure можно настроить для привязки Центров событий Azure Stream Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure. или префикса spring.cloud.azure.eventhubs..

Привязка также поддерживает Spring Spring Может Azure Resource Manager по умолчанию. Сведения о том, как получить строку подключения с субъектами безопасности, которые не предоставляются с Data связанными ролями, см. в разделе базовыхSpring Could Azure Resource Manager.

Свойства конфигурации контрольной точки

В этом разделе содержатся параметры конфигурации для службы BLOB-объектов хранилища, которая используется для сохранения сведений о владельцах секций и контрольных точках.

Заметка

В версии 4.0.0, когда свойство spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-not-exists не включено вручную, контейнер хранилища не будет создан автоматически с именем из spring.cloud.stream.bindings.bindings.name.destination.

Контрольные точки настраиваемых свойств spring-cloud-azure-stream-binder-eventhubs:

Свойство Тип Описание
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Булев Разрешить ли создание контейнеров, если они отсутствуют.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Струна Имя учетной записи хранения.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Струна Ключ доступа к учетной записи хранения.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Струна Имя контейнера хранилища.

Кончик

Общие параметры конфигурации пакета SDK службы Azure можно настроить для хранилища контрольных точек BLOB-объектов хранилища. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure. или префикса spring.cloud.azure.eventhubs.processor.checkpoint-store.

Свойства конфигурации привязки Центров событий Azure

Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.

Свойства потребителя

Эти свойства предоставляются через EventHubsConsumerProperties.

Заметка

Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, Центры событий Azure Stream Binder Spring Cloud поддерживают параметры для всех каналов в формате spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Настраиваемые свойства потребителей spring-cloud-azure-stream-binder-eventhubs:

Свойство Тип Описание
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode Контрольная точкаMode Режим контрольной точки, используемый, когда потребитель решает, как отправлять сообщение о контрольной точке
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Целое число Определяет объем сообщения для каждой секции, чтобы выполнить одну контрольную точку. Вступают в силу только в том случае, если используется режим контрольной точки PARTITION_COUNT.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Длительность Определяет интервал времени для выполнения одной контрольной точки. Вступают в силу только в том случае, если используется режим контрольной точки TIME.
spring.cloud.stream.eventhubs.bindings.<размер привязки name.consumer.batch.max Целое число Максимальное количество событий в пакете. Требуется для режима пакетного потребителя.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Длительность Максимальная длительность использования пакетной службы. Вступают в силу только в том случае, если включен режим пакетного потребителя и является необязательным.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.update-interval Длительность Длительность интервала обновления.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.strategy LoadBalancingStrategy Стратегия балансировки нагрузки.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balanceing.partition-ownership-interval Длительность Срок действия, после которого истекает срок владения секцией.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Булев Должен ли обработчик событий запрашивать сведения о последнем заквеченном событии в связанной секции и отслеживать эти сведения по мере получения событий.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Целое число Число, используемое потребителем для управления числом событий, которые потребитель Концентратора событий будет активно получать и очереди локально.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Сопоставление с ключом в качестве идентификатора секции и значений StartPositionProperties Карта, содержащая позицию события, используемую для каждой секции, если контрольная точка для секции не существует в хранилище контрольных точек. Эта карта будет ключом от идентификатора секции.

Заметка

Конфигурация initial-partition-event-position принимает map, чтобы указать начальную позицию для каждого концентратора событий. Таким образом, его ключом является идентификатор секции, а значение равно StartPositionProperties, которое включает свойства смещения, порядковый номер, заквеченное время даты и независимо от того, включено ли это значение. Например, его можно задать как

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Расширенная конфигурация потребителя

Приведенный выше подключения, контрольных точеки общей конфигурации клиента azure SDK настройки для каждого потребителя привязки, который можно настроить с помощью префикса spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Свойства производителя

Эти свойства предоставляются через EventHubsProducerProperties.

Заметка

Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, Центры событий Azure Stream Binder Spring Cloud поддерживают параметры для всех каналов в формате spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Настраиваемые свойства spring-cloud-azure-stream-binder-eventhubs:

Свойство Тип Описание
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync булев Флаг переключателя для синхронизации производителя. Если задано значение true, производитель ожидает ответа после операции отправки.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout длинный Время ожидания ответа после операции отправки. Вступают в силу только в том случае, если производитель синхронизации включен.
Расширенная конфигурация производителя

Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого производителя привязки, который можно настроить с помощью префикса spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Базовое использование

Отправка и получение сообщений из центров событий

  1. Заполните параметры конфигурации учетными данными.

    • Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Определение поставщика и потребителя.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Поддержка секционирования

Создается PartitionSupplier с сведениями о секционированиях, предоставленными пользователем, для настройки сведений о секционированиях для отправки сообщения. В следующей блок-схеме показано, как получить различные приоритеты для идентификатора секции и ключа:

схема с блок-схемой процесса поддержки секционирования.

Поддержка потребителей пакетной службы

  1. Укажите параметры пакетной конфигурации, как показано в следующем примере:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Определение поставщика и потребителя.

    Для режима контрольной точки как BATCHможно использовать следующий код для отправки сообщений и использования в пакетах.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Для режима контрольных точек в виде MANUALможно использовать следующий код для отправки сообщений и использования и контрольных точек в пакетах.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Заметка

В режиме пакетного использования тип содержимого привязки Spring Cloud Stream по умолчанию application/json, поэтому убедитесь, что полезные данные сообщения соответствуют типу контента. Например, при использовании типа контента по умолчанию application/json для получения сообщений с полезными данными String полезные данные должны быть JSON String, окруженные двойными кавычками для исходного текста String. Хотя для типа контента text/plain это может быть объект String напрямую. Дополнительные сведения см. в разделе Согласование типов контента Spring Cloud Stream.

Обработка сообщений об ошибках

  • Обработка сообщений об ошибках исходящей привязки

    По умолчанию Spring Integration создает глобальный канал ошибок с именем errorChannel. Настройте следующую конечную точку сообщения для обработки исходящих сообщений об ошибках привязки.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Обработка сообщений об ошибках входящего привязки

    Spring Cloud Stream Event Hubs Binder поддерживает одно решение для обработки ошибок для привязок входящих сообщений: обработчики ошибок.

    обработчик ошибок:

    Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления Consumer, который принимает экземпляры ErrorMessage. Дополнительные сведения см. в обработке сообщений об ошибках в документации spring Cloud Stream.

    • Обработчик ошибок по умолчанию привязки

      Настройте одну Consumer bean для использования всех сообщений об ошибках привязки входящего трафика. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать для свойства spring.cloud.stream.default.error-handler-definition имя функции.

    • Обработчик ошибок, зависящих от привязки

      Настройте Consumer bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика и имеет более высокий приоритет, чем обработчик ошибок по умолчанию привязки:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать для свойства spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition имя функции.

Заголовки сообщений Центров событий

Основные поддерживаемые заголовки сообщений см. в разделе Заголовки сообщений Центров событий поддержки поддержки Spring Cloud Azure для Spring Integration.

Поддержка нескольких привязок

Подключение к нескольким пространствам имен Центров событий также поддерживается с помощью нескольких привязок. В этом примере в качестве примера используется строка подключения. Также поддерживаются учетные данные субъектов-служб и управляемых удостоверений. Связанные свойства можно задать в параметрах среды привязки.

  1. Чтобы использовать несколько привязок с Центрами событий, настройте следующие свойства в файле application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Заметка

    В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать конфигурацию, например spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Нам нужно определить двух поставщиков и двух потребителей:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Подготовка ресурсов

Привязка Центров событий поддерживает подготовку концентратора событий и группы потребителей, пользователи могут использовать следующие свойства для включения подготовки.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

Образцы

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.

Spring Cloud Stream Binder для служебной шины Azure

Основные понятия

Spring Cloud Stream Binder для служебной шины Azure предоставляет реализацию привязки для Spring Cloud Stream Framework. Эта реализация использует адаптеры канала служебной шины Spring Integration в своей основе.

Запланированное сообщение

Эта привязка поддерживает отправку сообщений в раздел для отложенной обработки. Пользователи могут отправлять запланированные сообщения с заголовком x-delay выражения в миллисекундах время задержки сообщения. Сообщение будет доставлено в соответствующие разделы после x-delay миллисекундах.

Группа потребителей

Раздел служебной шины обеспечивает аналогичную поддержку группы потребителей, как Apache Kafka, но с небольшой другой логикой. Этот привязка зависит от Subscription раздела, чтобы выступать в качестве группы потребителей.

Настройка зависимостей

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Кроме того, можно использовать начальную шину Azure Stream Spring Cloud, как показано в следующем примере для Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Конфигурация

Привязка предоставляет следующие две части параметров конфигурации:

Свойства конфигурации подключения

В этом разделе содержатся параметры конфигурации, используемые для подключения к служебной шине Azure.

Заметка

Если вы решили использовать субъект безопасности для проверки подлинности и авторизации с помощью идентификатора Microsoft Entra для доступа к ресурсу Azure, ознакомьтесь с авторизовать доступ с помощью идентификатора Microsoft Entra ID, чтобы убедиться, что субъект безопасности получил достаточное разрешение на доступ к ресурсу Azure.

Настраиваемые свойства подключения spring-cloud-azure-stream-binder-servicebus:

Свойство Тип Описание
spring.cloud.azure.servicebus.enabled булев Включена ли служебная шина Azure.
spring.cloud.azure.servicebus.connection-string Струна Строковое значение строки подключения пространства имен служебной шины.
spring.cloud.azure.servicebus.custom-endpoint-address Струна Пользовательский адрес конечной точки, используемый при подключении к служебной шине.
spring.cloud.azure.servicebusпространство имен Струна Значение пространства имен служебной шины, которое является префиксом полного доменного имени. Полное доменное имя должно состоять из NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Струна Доменное имя значения пространства имен служебной шины Azure.

Заметка

Общие параметры конфигурации пакета SDK для службы Azure можно настроить для привязки служебной шины Azure Spring Cloud. Поддерживаемые параметры конфигурации представлены в конфигурации Spring Cloud Azureи могут быть настроены с помощью единого префикса spring.cloud.azure. или префикса spring.cloud.azure.servicebus..

Привязка также поддерживает Spring Spring Может Azure Resource Manager по умолчанию. Сведения о том, как получить строку подключения с субъектами безопасности, которые не предоставляются с Data связанными ролями, см. в разделе базовыхSpring Could Azure Resource Manager.

Свойства конфигурации привязки служебной шины Azure

Следующие параметры разделены на четыре раздела: свойства потребителей, расширенные конфигурации потребителей, свойства производителя и расширенные конфигурации производителя.

Свойства потребителя

Эти свойства предоставляются через ServiceBusConsumerProperties.

Заметка

Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, служебная шина Azure Stream Binder Spring Cloud поддерживает параметры значений для всех каналов в формате spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:

Свойство Тип По умолчанию Описание
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected булев ложный Если неудачные сообщения перенаправляются в DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-параллельные вызовы Целое число 1 Максимальное число одновременных сообщений, которые должен обрабатывать клиент обработчика служебной шины. Если сеанс включен, он применяется к каждому сеансу.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-session Целое число недействительный Максимальное количество одновременных сеансов для обработки в любое время.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Булев недействительный Включен ли сеанс.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Целое число 0 Число предварительных выборок клиента обработчика служебной шины.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue никакой Тип вложенной очереди для подключения.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Длительность 5 млн Время для продолжения автоматического продления блокировки.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Режим получения клиента обработчика служебной шины.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Булев истинный Следует ли автоматически урегулировать сообщения. Если задано значение false, то будет добавлен заголовок сообщения Checkpointer, чтобы разработчики могли разрешать сообщения вручную.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-мегабайт Длинный 1024 Максимальный размер очереди или раздела в мегабайтах, который является размером памяти, выделенной для очереди или раздела.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Длительность P10675199DT2H48M5.4775807S. (10675199 дней, 2 часа, 48 минут, 5 секунд и 477 миллисекунд) Длительность, после которой истекает срок действия сообщения, начиная с момента отправки сообщения в служебную шину.

Важный

При использовании Azure Resource Manager (ARM) необходимо настроить свойство spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Дополнительные сведения см. в примере servicebus-queue-binder-arm на сайте GitHub.

Расширенная конфигурация потребителя

Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого потребителя привязки, который можно настроить с помощью префикса spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Свойства производителя

Эти свойства предоставляются через ServiceBusProducerProperties.

Заметка

Чтобы избежать повторения, начиная с версии 4.19.0 и 5.19.0, служебная шина Azure Stream Binder Spring Cloud поддерживает параметры значений для всех каналов в формате spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Настраиваемые свойства spring-cloud-azure-stream-binder-servicebus:

Свойство Тип По умолчанию Описание
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync булев ложный Переключение флага для синхронизации производителя.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout длинный 10000 Значение времени ожидания для отправки производителя.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType недействительный Тип сущности служебной шины производителя, необходимый для производителя привязки.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-мегабайт Длинный 1024 Максимальный размер очереди или раздела в мегабайтах, который является размером памяти, выделенной для очереди или раздела.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Длительность P10675199DT2H48M5.4775807S. (10675199 дней, 2 часа, 48 минут, 5 секунд и 477 миллисекунд) Длительность, после которой истекает срок действия сообщения, начиная с момента отправки сообщения в служебную шину.

Важный

При использовании производителя привязки необходимо настроить свойство spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Расширенная конфигурация производителя

Приведенные выше подключения и общей конфигурации клиента Azure SDK настройки для каждого производителя привязки, который можно настроить с помощью префикса spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Базовое использование

Отправка и получение сообщений из служебной шины

  1. Заполните параметры конфигурации учетными данными.

    • Для учетных данных в качестве строки подключения настройте следующие свойства в файле application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Для учетных данных в качестве субъекта-службы настройте следующие свойства в файле application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

  • Для учетных данных в качестве управляемых удостоверений настройте следующие свойства в файле application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Определение поставщика и потребителя.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Поддержка ключа секции

Привязка поддерживает секционирование служебной шины путем задания ключа секции и идентификатора сеанса в заголовке сообщения. В этом разделе описывается настройка ключа секции для сообщений.

Spring Cloud Stream предоставляет свойство выражения spEL ключа раздела spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Например, присвойите этому свойству значение "'partitionKey-' + headers[<message-header-key>]" и добавьте заголовок с именем message-header-key. Spring Cloud Stream использует значение этого заголовка при оценке выражения для назначения ключа секции. В следующем коде представлен пример производителя:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Поддержка сеансов

Привязка поддерживает сеансы сообщений служебной шины. Идентификатор сеанса сообщения можно задать с помощью заголовка сообщения.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Заметка

Согласно секционирование служебной шины, идентификатор сеанса имеет более высокий приоритет, чем ключ секции. Поэтому при установке обоих ServiceBusMessageHeaders#SESSION_ID и ServiceBusMessageHeaders#PARTITION_KEY заголовков значение идентификатора сеанса в конечном итоге используется для перезаписи значения ключа секции.

Обработка сообщений об ошибках

  • Обработка сообщений об ошибках исходящей привязки

    По умолчанию Spring Integration создает глобальный канал ошибок с именем errorChannel. Настройте следующую конечную точку сообщения для обработки сообщения об ошибке исходящей привязки.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Обработка сообщений об ошибках входящего привязки

    Привязка служебной шины Spring Cloud Stream поддерживает два решения для обработки ошибок для привязок входящих сообщений: обработчика ошибок и обработчиков ошибок привязки.

    обработчик ошибок привязки :

    Обработчик ошибок привязки по умолчанию обрабатывает входящий привязку. Этот обработчик используется для отправки неудачных сообщений в очередь недоставленных писем при включении spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected. В противном случае не удалось отказаться от сообщений. Обработчик ошибок привязки является взаимоисключающим с другими предоставленными обработчиками ошибок.

    обработчик ошибок :

    Spring Cloud Stream предоставляет механизм для предоставления пользовательского обработчика ошибок путем добавления Consumer, который принимает экземпляры ErrorMessage. Дополнительные сведения см. в обработке сообщений об ошибках в документации spring Cloud Stream.

    • Обработчик ошибок по умолчанию привязки

      Настройте одну Consumer bean для использования всех сообщений об ошибках привязки входящего трафика. Следующая функция по умолчанию подписывается на каждый канал ошибок входящего привязки:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать для свойства spring.cloud.stream.default.error-handler-definition имя функции.

    • Обработчик ошибок, зависящих от привязки

      Настройте Consumer bean для использования определенных сообщений об ошибках входящего привязки. Следующая функция подписывается на определенный канал ошибок входящего трафика с более высоким приоритетом, чем обработчик ошибок по умолчанию привязки.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Кроме того, необходимо задать для свойства spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition имя функции.

Заголовки сообщений служебной шины

Основные поддерживаемые заголовки сообщений см. в разделе заголовков сообщений служебной шины раздела поддержки Spring Cloud Azure для Spring Integration.

Заметка

При настройке ключа секции приоритет заголовка сообщения выше свойства Spring Cloud Stream. Поэтому spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression действует только в том случае, если ни один из ServiceBusMessageHeaders#SESSION_ID и ServiceBusMessageHeaders#PARTITION_KEY заголовков не настроен.

Поддержка нескольких привязок

Подключение к нескольким пространствам имен служебной шины также поддерживается с помощью нескольких привязок. Этот пример принимает строку подключения в качестве примера. Учетные данные субъектов-служб и управляемых удостоверений также поддерживаются, пользователи могут задавать связанные свойства в параметрах среды каждого привязки.

  1. Чтобы использовать несколько привязок ServiceBus, настройте следующие свойства в файле application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Заметка

    В предыдущем файле приложения показано, как настроить один опрашиватель по умолчанию для приложения для всех привязок. Если вы хотите настроить опрос для определенной привязки, можно использовать конфигурацию, например spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. нам нужно определить двух поставщиков и двух потребителей

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Подготовка ресурсов

Привязка служебной шины поддерживает подготовку очередей, раздела и подписки, пользователи могут использовать следующие свойства для включения подготовки.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Заметка

Значения, допустимые для tenant-id: common, organizations, consumersили идентификатор клиента. Дополнительные сведения об этих значениях см. в разделе Используется неправильная конечная точка (личные учетные записи и учетные записи организации) ошибки AADSTS50020. Учетная запись пользователя от поставщика удостоверений не существует вклиента. Сведения о преобразовании приложения с одним клиентом см. в статье Преобразование однотенантного приложения в мультитенантное приложение наидентификатора Microsoft Entra.

Настройка свойств клиента служебной шины

Разработчики могут использовать AzureServiceClientBuilderCustomizer для настройки свойств клиента служебной шины. Следующий пример настраивает свойство sessionIdleTimeout в ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Образцы

Дополнительные сведения см. в репозитории azure-spring-boot-samples на сайте GitHub.