Spring Cloud Stream с Azure Service Bus
В этой статье показано, как использовать Spring Cloud Stream Binder для отправки сообщений в служебную шину и получения сообщений из нее queues
и topics
.
Azure предоставляет платформу асинхронного обмена сообщениями, именуемую служебной шиной Azure ("служебная шина"), основанную на стандарте Advanced Message Queueing Protocol 1.0 (Расширенный протокол управления очередью сообщений "AMQP 1.0"). Служебную шину можно использовать в пределах поддерживаемых платформ Azure.
Предварительные условия
Подписка Azure — создайте бесплатную учетную запись.
Пакет средств разработки Java (JDK) версии 8 или более поздней.
Apache Maven версии 3.2 или более поздней.
cURL или подобная служебная HTTP-программа, с помощью которой можно протестировать функциональные возможности.
Очередь или тема для Служебной шины Azure. Если у вас его нет, создайте очередь Service Bus или создайте тему Service Bus.
Приложение Spring Boot. Если у вас его нет, создайте проект Maven, используя Spring Initializr. Обязательно выберите Проект Maven и в разделе Зависимости добавьте Spring Web и Поддержка Azure, а затем выберите Java версии 8 или выше.
Примечание.
Чтобы предоставить вашей учетной записи доступ к ресурсам Служебной шины Azure, назначьте роли Azure Service Bus Data Sender
и Azure Service Bus Data Receiver
учетной записи Microsoft Entra, которую вы используете. Для получения дополнительной информации о предоставлении ролей доступа см. Использование портала Microsoft Azure для назначения ролей Azure и Аутентификация и авторизация приложения с помощью Microsoft Entra ID для доступа к сущностям Служебной шины Azure.
Внимание
Для выполнения действий, описанных в этой статье, требуется spring Boot версии 2.5 или более поздней.
Отправка и получение сообщений из службы сообщений Azure Service Bus
С помощью очереди или темы для Служебной шины Azure вы можете отправлять и получать сообщения с помощью Spring Cloud Azure Stream Binder для Служебной шины.
Чтобы установить модуль Service Bus для связки потоков Spring Cloud в Azure, добавьте следующие зависимости в файл pom.xml:
Спецификация компонентов Spring Cloud Azure (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.21.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, обязательно установите
spring-cloud-azure-dependencies
значение4.19.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Артефакт Azure Stream Binder Spring Cloud служебная шина:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Написать код для приложения
Чтобы настроить приложение для использования очереди или темы Service Bus для отправки и получения сообщений, следуйте данным шагам.
Настройте учетные данные Служебной шины в файле конфигурации
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
В следующей таблице описаны поля в конфигурации:
Поле Описание spring.cloud.azure.servicebus.namespace
Укажите пространство имен, полученное в Служебной шине из портала Azure. spring.cloud.stream.bindings.consume-in-0.destination
Укажите очередь Service Bus или раздел Service Bus, использованные в этом руководстве. spring.cloud.stream.bindings.supply-out-0.destination
Укажите то же значение, которое использовалось для назначения ввода. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Укажите, следует ли автоматически урегулировать сообщения. Если задано значение false
, будет добавлен заголовок сообщенияCheckpointer
, чтобы разработчики могли вручную урегулировать сообщения.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Укажите тип сущности для выходной привязки, может быть queue
илиtopic
.spring.cloud.function.definition
Укажите, какой функциональный bean-компонент нужно привязать к внешним назначениям, предоставляемым привязками. spring.cloud.stream.poller.fixed-delay
Укажите фиксированную задержку для опроса по умолчанию в миллисекундах. Значение по умолчанию — 1000 L
. Рекомендуемое значение —60000
.spring.cloud.stream.poller.initial-delay
Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0
.Измените файл класса запуска, чтобы отобразить следующее содержимое.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Совет
В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует
DefaultAzureCredential
, предоставляемую библиотекой удостоверений Azure, чтобы помочь вам получить учетные данные без изменений в коде.DefaultAzureCredential
поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в статье о проверке подлинности Azure в средах разработки Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. См. сведения об управляемых удостоверениях для ресурсов Azure.
Запустите приложение. Сообщения, как показано в следующем примере, будут размещены в журнале приложений:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed