Spring Cloud Stream se službou Azure Event Hubs
Tento kurz ukazuje, jak odesílat a přijímat zprávy pomocí služby Azure Event Hubs a Spring Cloud Stream Binder EventHubs v aplikaci Spring Boot.
Požadavky
Předplatné Azure – vytvořte si ho zdarma.
Java Development Kit (JDK) verze 8 nebo vyšší.
Apache Maven verze 3.2 nebo vyšší
cURL nebo podobný nástroj HTTP pro testování funkčnosti.
Centrum událostí Azure Pokud ho nemáte, vytvořte centrum událostí pomocí webu Azure Portal.
Účet služby Azure Storage pro kontrolní body centra událostí. Pokud ho nemáte, vytvořte účet úložiště.
Aplikace Spring Boot. Pokud ho nemáte, vytvořte projekt Maven pomocí aplikace Spring Initializr. Nezapomeňte vybrat projekt Maven a v části Závislosti přidejte závislosti Spring Web a Podpory Azure a pak vyberte Javu verze 8 nebo vyšší.
Poznámka:
Pokud chcete účtu udělit přístup k prostředkům, přiřaďte Azure Event Hubs Data Receiver
ve službě Azure Event Hubs účet Microsoft Entra, Azure Event Hubs Data Sender
který aktuálně používáte. Potom v účtu Azure Storage přiřaďte Storage Blob Data Contributor
roli k účtu Microsoft Entra, který právě používáte. Další informace o udělení přístupových rolí najdete v tématu Přiřazení rolí Azure pomocí webu Azure Portal a autorizace přístupu k prostředkům služby Event Hubs pomocí ID Microsoft Entra.
Důležité
K dokončení kroků v tomto kurzu se vyžaduje Spring Boot verze 2.5 nebo vyšší.
Odesílání a příjem zpráv ze služby Azure Event Hubs
Pomocí účtu služby Azure Storage a centra událostí Azure můžete odesílat a přijímat zprávy pomocí služby Spring Cloud Azure Stream Binder Event Hubs.
Pokud chcete nainstalovat modul Spring Cloud Azure Stream Binder Event Hubs, přidejte do souboru pom.xml následující závislosti:
Kusovník materiálů (BOM) Spring Cloud v Azure:
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Poznámka:
Pokud používáte Spring Boot 2.x, nezapomeňte nastavit
spring-cloud-azure-dependencies
verzi na4.19.0
. Tato faktura materiálu (BOM) by měla být nakonfigurována v<dependencyManagement>
části vašeho pom.xml souboru. Tím se zajistí, že všechny závislosti Azure Spring Cloudu budou používat stejnou verzi. Další informace o verzi použité pro tuto kusovníku najdete v tématu Jakou verzi Spring Cloud Azure mám použít.Artefakt Spring Cloud Azure Stream Binder Event Hubs:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
Vytvoření kódu aplikace
Pomocí následujících kroků nakonfigurujte aplikaci tak, aby vytvářela a využívala zprávy pomocí služby Azure Event Hubs.
Nakonfigurujte přihlašovací údaje centra událostí přidáním následujících vlastností do souboru application.properties .
spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE} spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME} spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME} spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME} spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.initial-delay=0 spring.cloud.stream.poller.fixed-delay=1000
Následující tabulka popisuje pole v konfiguraci:
Pole Popis spring.cloud.azure.eventhubs.namespace
Zadejte obor názvů, který jste získali ve svém centru událostí z webu Azure Portal. spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name
Zadejte účet úložiště, který jste vytvořili v tomto kurzu. spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name
Zadejte kontejner vašeho účtu úložiště. spring.cloud.stream.bindings.consume-in-0.destination
Zadejte centrum událostí, které jste použili v tomto kurzu. spring.cloud.stream.bindings.consume-in-0.group
Zadejte skupiny příjemců ve vaší instanci služby Event Hubs. spring.cloud.stream.bindings.supply-out-0.destination
Zadejte stejné centrum událostí, které jste použili v tomto kurzu. spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode
Zadejte MANUAL
.spring.cloud.function.definition
Určete, která funkční bean se má svázat s externími cíli vystavenými vazbami. spring.cloud.stream.poller.initial-delay
Zadejte počáteční zpoždění pro pravidelné aktivační události. Výchozí hodnota je 0. spring.cloud.stream.poller.fixed-delay
Zadejte pevné zpoždění pro výchozí poller v milisekundách. Výchozí hodnota je 1000 L. Upravte soubor spouštěcí třídy, aby se zobrazil následující obsah.
import com.azure.spring.messaging.checkpoint.Checkpointer; import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class EventHubBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(EventHubBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued " +"time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error->LOGGER.error("Exception found", error)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to sendMessage.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tip
V tomto kurzu nejsou v konfiguracích ani kódu žádné ověřovací operace. Připojení ke službám Azure ale vyžaduje ověření. K dokončení ověřování je potřeba použít identitu Azure. Spring Cloud Azure používá
DefaultAzureCredential
, kterou poskytuje knihovna identit Azure, která vám pomůže získat přihlašovací údaje bez jakýchkoli změn kódu.DefaultAzureCredential
podporuje více metod ověřování a určuje, kterou metodu použít za běhu. Tento přístup umožňuje vaší aplikaci používat různé metody ověřování v různých prostředích (například v místních a produkčních prostředích) bez implementace kódu specifického pro prostředí. Další informace naleznete v tématu DefaultAzureCredential.K dokončení ověřování v místních vývojových prostředích můžete použít Azure CLI, Visual Studio Code, PowerShell nebo jiné metody. Další informace najdete v tématu Ověřování Azure ve vývojových prostředích Java. K dokončení ověřování v hostitelských prostředích Azure doporučujeme použít spravovanou identitu přiřazenou uživatelem. Další informace najdete v tématu Co jsou spravované identity pro prostředky Azure?
Spusťte aplikaci. Zprávy, jako je tato, se publikuje v protokolu vaší aplikace, jak je znázorněno v následujícím příkladu výstupu:
New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z Message 'Hello World!' successfully checkpointed
Nasazení do Azure Spring Apps
Teď, když máte aplikaci Spring Boot spuštěnou místně, je čas ji přesunout do produkčního prostředí. Azure Spring Apps usnadňuje nasazování aplikací Spring Boot do Azure bez jakýchkoli změn kódu. Služba spravuje infrastrukturu aplikací Spring, aby se vývojáři mohli soustředit na svůj kód. Azure Spring Apps poskytuje správu životního cyklu pomocí komplexního monitorování a diagnostiky, správy konfigurace, zjišťování služeb, integrace CI/CD, modrých zelených nasazení a dalších. Pokud chcete nasadit aplikaci do Azure Spring Apps, přečtěte si téma Nasazení první aplikace do Azure Spring Apps.