Поделиться через


Spring Cloud Stream с Azure Event Hubs

В этом руководстве показано, как отправлять и получать сообщения с помощью Azure Event Hubs и Spring Cloud Stream Binder для Event Hubs в приложении Spring Boot.

Предварительные условия

Примечание.

Чтобы предоставить вашей учетной записи доступ к ресурсам, в Центрах событий Azure назначьте учетной записи Microsoft Entra, которую вы используете, роли Azure Event Hubs Data Receiver и Azure Event Hubs Data Sender. Затем в учетной записи Azure Storage назначьте роль Storage Blob Data Contributor учетной записи Microsoft Entra, которую вы используете в данный момент. Дополнительные сведения о предоставлении ролей доступа см. в статье Назначение ролей Azure с помощью портала Azure и Авторизация доступа к ресурсам Центров событий с помощью идентификатора Microsoft Entra ID.

Внимание

Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.

Отправка и получение сообщений в Azure Event Hubs

С помощью учетной записи хранилища Azure и концентратора событий Azure вы можете отправлять и получать сообщения с помощью Spring Cloud Azure Stream Binder Event Hubs.

Чтобы установить модуль Центров событий Azure Stream Binder Spring Cloud, добавьте следующие зависимости в файл pom.xml :

  • Весенний облачный Azure список материалов (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.21.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 2.x, обязательно установите spring-cloud-azure-dependencies на 4.19.0. Этот счет материалов (BOM) должен быть настроен в <dependencyManagement> разделе pom.xml файла. Это гарантирует, что все зависимости Azure Spring Cloud используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".

  • Артефакт Центров событий Azure Stream Binder Spring Cloud:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Программирование приложения

Используйте следующие шаги, чтобы настроить приложение для производства и потребления сообщений с помощью Azure Event Hubs.

  1. Настройте учетные данные концентратора событий, добавив следующие свойства в файл application.properties .

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    В следующей таблице описаны поля в конфигурации:

    Поле Описание
    spring.cloud.azure.eventhubs.namespace Укажите пространство имен, полученное в концентраторе событий из портала Azure.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Укажите учетную запись хранения, созданную в этом руководстве.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Укажите контейнер учетной записи хранения.
    spring.cloud.stream.bindings.consume-in-0.destination Укажите концентратор событий, используемый в этом руководстве.
    spring.cloud.stream.bindings.consume-in-0.group Укажите группы потребителей в экземпляре Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите тот же концентратор событий, который использовался в этом руководстве.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Укажите MANUAL.
    spring.cloud.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним точкам назначения, раскрываемым привязками.
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для опросчика по умолчанию в миллисекундах. Значение по умолчанию — 1000 L.
  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredential, который библиотека Azure Identity предоставляет для получения учетных данных без каких-либо изменений кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в статье о проверке подлинности Azure в средах разработки Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. См. сведения об управляемых удостоверениях для ресурсов Azure.

  3. Запустите приложение. Такие сообщения будут размещены в журнале приложений, как показано в следующем примере выходных данных:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Развертывание в Azure Spring Apps

Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Azure Spring Apps упрощает развертывание приложений Spring Boot в Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Azure Spring Apps обеспечивает управление жизненным циклом за счет комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, выполнения сине-зеленых развертываний и прочего. Сведения о развертывании приложения в Azure Spring Apps см. в статье "Развертывание первого приложения в Azure Spring Apps".

Следующие шаги