Использование Spring Kafka с Центры событий Azure для API Kafka
В этом руководстве показано, как настроить Привязку Stream Binder на основе Java, чтобы использовать Центры событий Azure для Kafka для отправки и получения сообщений с Центры событий Azure. Для получения дополнительной информации см. раздел Использование центра событий Azure в приложениях Apache Kafka
В этом руководстве мы рассмотрим два метода проверки подлинности: проверку подлинности Microsoft Entra и проверку подлинности ПОДПИСАННЫХ URL-адресов (SAS). На вкладке "Безпарольная" показана проверка подлинности Microsoft Entra, а на вкладке "Строка подключения" отображается проверка подлинности SAS.
Аутентификация Microsoft Entra — это механизм подключения к Центру событий Azure для Kafka с использованием удостоверений, определенных в Microsoft Entra ID. С помощью аутентификации Microsoft Entra вы можете управлять удостоверениями пользователей базы данных и другими сервисами Майкрософт в одном центральном месте, что упрощает управление разрешениями.
Проверка подлинности SAS использует строку подключения пространства имен Центров событий Azure для делегированного доступа к Центрам событий с использованием Kafka. Если вы выбираете использовать Подписи общего доступа в качестве учетных данных, вам нужно самостоятельно управлять строкой подключения.
Предварительные условия
Подписка Azure — создайте бесплатную учетную запись.
Пакет средств разработки Java (JDK) версии 8 или более поздней.
Apache Maven версии 3.2 или более поздней.
cURL или подобная служебная HTTP-программа, с помощью которой можно протестировать функциональные возможности.
Azure Cloud Shell или Azure CLI 2.37.0 или более поздней версии.
Концентратор событий Azure. Если у вас его нет, создайте концентратор событий с помощью портал Azure.
Приложение Spring Boot. Если у вас его нет, создайте проект Maven с помощью Spring Initializr. Обязательно выберите Maven Project и в разделе Dependencies добавьте зависимости Spring Web, Spring for Apache Kafka и Cloud Stream, а затем выберите версию Java 8 или более позднюю.
Внимание
Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.
Подготовьте учетные данные
Центры событий Azure поддерживает использование идентификатора Microsoft Entra для авторизации запросов к ресурсам Центров событий. С помощью идентификатора Microsoft Entra можно использовать управление доступом на основе ролей Azure (Azure RBAC) для предоставления разрешений субъекту безопасности, который может быть пользователем или субъектом-службой приложений.
Если вы хотите выполнить этот пример локально с проверкой подлинности Microsoft Entra, убедитесь, что ваша учетная запись пользователя прошла проверку подлинности с помощью пакета Azure Toolkit для IntelliJ, плагина учетной записи Azure для Visual Studio Code или Azure CLI. Кроме того, убедитесь, что у учетной записи есть достаточные разрешения.
Примечание.
При использовании подключений без пароля необходимо предоставить вашему аккаунту доступ к ресурсам. В Azure Event Hubs назначьте роль Azure Event Hubs Data Receiver
и Azure Event Hubs Data Sender
учетной записи Microsoft Entra, которую вы используете в настоящее время. Дополнительные сведения о предоставлении ролей доступа см. в статье «Назначение ролей Azure с помощью портала Azure» и «Авторизация доступа к ресурсам Event Hubs с использованием Microsoft Entra ID».
Отправка и получение сообщений из Центра событий Azure
С помощью концентратора событий Azure можно отправлять и получать сообщения с помощью Spring Cloud Azure.
Чтобы установить модуль Spring Cloud Azure Starter, добавьте следующие зависимости в файл pom.xml :
Спецификация компонентов Spring Cloud Azure (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.21.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Примечание.
Если вы используете Spring Boot 2.x, убедитесь, что выбрана версия
spring-cloud-azure-dependencies
4.19.0
. Этот счет материалов (BOM) должен быть настроен в<dependencyManagement>
разделе pom.xml файла. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию. Дополнительные сведения о версии, используемой для этого BOM, см. в статье "Какая версия Spring Cloud Azure должна использоваться".Артефакт Spring Cloud Azure Starter
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency>
Написать код приложения
Выполните следующие действия, чтобы настроить приложение для создания и использования сообщений с помощью Центры событий Azure.
Настройте учетные данные концентратора событий, добавив следующие свойства в файл application.properties .
spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME} spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
Совет
Если используется версия
spring-cloud-azure-dependencies:4.3.0
, необходимо добавить свойствоspring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources
со значениемcom.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration
.Так как
4.4.0
это свойство будет добавлено автоматически, поэтому его не нужно добавлять вручную.В следующей таблице описаны поля в конфигурации:
Поле Описание spring.cloud.stream.kafka.binder.brokers
Указывает конечную точку Azure Event Hubs. spring.cloud.stream.bindings.consume-in-0.destination
Указывает концентратора событий ввода, который в этом учебном пособии является концентратором, созданным ранее. spring.cloud.stream.bindings.consume-in-0.group
Указывает группу потребителей из Центра событий Azure, которую можно задать $Default
, чтобы использовать группу потребителей по умолчанию, созданную при создании экземпляра Центра событий Azure.spring.cloud.stream.bindings.supply-out-0.destination
Указывает концентратор событий назначения выходных данных, который для этого руководства совпадает с назначением входных данных. Примечание.
Если вы включите автоматическое создание раздела, обязательно добавьте элемент конфигурации
spring.cloud.stream.kafka.binder.replicationFactor
с значением, равным по крайней мере1
. Дополнительные сведения см. в справочном руководстве по Kafka Binder для Spring Cloud Stream.Измените файл класса запуска, чтобы отобразить следующее содержимое.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication public class EventHubKafkaBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(EventHubKafkaBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->LOGGER.info("New message received: '{}'", message.getPayload()); } @Override public void run(String... args) { many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST); } }
Совет
В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует
DefaultAzureCredential
, которую предоставляет библиотека удостоверений Azure, чтобы помочь вам получить учетные данные без изменения кода.DefaultAzureCredential
поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в статье о проверке подлинности Azure в средах разработки Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. См. сведения об управляемых удостоверениях для ресурсов Azure.
Запустите приложение. Сообщения, как показано в следующем примере, будут размещены в журнале приложений:
Kafka version: 3.0.1 Kafka commitId: 62abe01bee039651 Kafka startTimeMs: 1622616433956 New message received: 'Hello World'
Развертывание в Azure Spring Apps
Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Azure Spring Apps упрощает развертывание приложений Spring Boot в Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Azure Spring Apps обеспечивает управление жизненным циклом за счет комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, выполнения сине-зеленых развертываний и прочего. Сведения о развертывании приложения в Azure Spring Apps см. в статье "Развертывание первого приложения в Azure Spring Apps".