Sdílet prostřednictvím


Spring Cloud Stream se službou Azure Event Hubs

Tento kurz ukazuje, jak odesílat a přijímat zprávy pomocí služby Azure Event Hubs a Spring Cloud Stream Binder EventHubs v aplikaci Spring Boot.

Požadavky

Poznámka:

Pokud chcete účtu udělit přístup k prostředkům, přiřaďte Azure Event Hubs Data Receiver ve službě Azure Event Hubs účet Microsoft Entra, Azure Event Hubs Data Sender který aktuálně používáte. Potom v účtu Azure Storage přiřaďte Storage Blob Data Contributor roli k účtu Microsoft Entra, který právě používáte. Další informace o udělení přístupových rolí najdete v tématu Přiřazení rolí Azure pomocí webu Azure Portal a autorizace přístupu k prostředkům služby Event Hubs pomocí ID Microsoft Entra.

Důležité

K dokončení kroků v tomto kurzu se vyžaduje Spring Boot verze 2.5 nebo vyšší.

Odesílání a příjem zpráv ze služby Azure Event Hubs

Pomocí účtu služby Azure Storage a centra událostí Azure můžete odesílat a přijímat zprávy pomocí služby Spring Cloud Azure Stream Binder Event Hubs.

Pokud chcete nainstalovat modul Spring Cloud Azure Stream Binder Event Hubs, přidejte do souboru pom.xml následující závislosti:

  • Kusovník materiálů (BOM) Spring Cloud v Azure:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.19.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Poznámka:

    Pokud používáte Spring Boot 2.x, nezapomeňte nastavit spring-cloud-azure-dependencies verzi na 4.19.0. Tato faktura materiálu (BOM) by měla být nakonfigurována v <dependencyManagement> části vašeho pom.xml souboru. Tím se zajistí, že všechny závislosti Azure Spring Cloudu budou používat stejnou verzi. Další informace o verzi použité pro tuto kusovníku najdete v tématu Jakou verzi Spring Cloud Azure mám použít.

  • Artefakt Spring Cloud Azure Stream Binder Event Hubs:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Vytvoření kódu aplikace

Pomocí následujících kroků nakonfigurujte aplikaci tak, aby vytvářela a využívala zprávy pomocí služby Azure Event Hubs.

  1. Nakonfigurujte přihlašovací údaje centra událostí přidáním následujících vlastností do souboru application.properties .

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    Následující tabulka popisuje pole v konfiguraci:

    Pole Popis
    spring.cloud.azure.eventhubs.namespace Zadejte obor názvů, který jste získali ve svém centru událostí z webu Azure Portal.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Zadejte účet úložiště, který jste vytvořili v tomto kurzu.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Zadejte kontejner vašeho účtu úložiště.
    spring.cloud.stream.bindings.consume-in-0.destination Zadejte centrum událostí, které jste použili v tomto kurzu.
    spring.cloud.stream.bindings.consume-in-0.group Zadejte skupiny příjemců ve vaší instanci služby Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Zadejte stejné centrum událostí, které jste použili v tomto kurzu.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Zadejte MANUAL.
    spring.cloud.function.definition Určete, která funkční bean se má svázat s externími cíli vystavenými vazbami.
    spring.cloud.stream.poller.initial-delay Zadejte počáteční zpoždění pro pravidelné aktivační události. Výchozí hodnota je 0.
    spring.cloud.stream.poller.fixed-delay Zadejte pevné zpoždění pro výchozí poller v milisekundách. Výchozí hodnota je 1000 L.
  2. Upravte soubor spouštěcí třídy, aby se zobrazil následující obsah.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Tip

    V tomto kurzu nejsou v konfiguracích ani kódu žádné ověřovací operace. Připojení ke službám Azure ale vyžaduje ověření. K dokončení ověřování je potřeba použít identitu Azure. Spring Cloud Azure používá DefaultAzureCredential, kterou poskytuje knihovna identit Azure, která vám pomůže získat přihlašovací údaje bez jakýchkoli změn kódu.

    DefaultAzureCredential podporuje více metod ověřování a určuje, kterou metodu použít za běhu. Tento přístup umožňuje vaší aplikaci používat různé metody ověřování v různých prostředích (například v místních a produkčních prostředích) bez implementace kódu specifického pro prostředí. Další informace naleznete v tématu DefaultAzureCredential.

    K dokončení ověřování v místních vývojových prostředích můžete použít Azure CLI, Visual Studio Code, PowerShell nebo jiné metody. Další informace najdete v tématu Ověřování Azure ve vývojových prostředích Java. K dokončení ověřování v hostitelských prostředích Azure doporučujeme použít spravovanou identitu přiřazenou uživatelem. Další informace najdete v tématu Co jsou spravované identity pro prostředky Azure?

  3. Spusťte aplikaci. Zprávy, jako je tato, se publikuje v protokolu vaší aplikace, jak je znázorněno v následujícím příkladu výstupu:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Nasazení do Azure Spring Apps

Teď, když máte aplikaci Spring Boot spuštěnou místně, je čas ji přesunout do produkčního prostředí. Azure Spring Apps usnadňuje nasazování aplikací Spring Boot do Azure bez jakýchkoli změn kódu. Služba spravuje infrastrukturu aplikací Spring, aby se vývojáři mohli soustředit na svůj kód. Azure Spring Apps poskytuje správu životního cyklu pomocí komplexního monitorování a diagnostiky, správy konfigurace, zjišťování služeb, integrace CI/CD, modrých zelených nasazení a dalších. Pokud chcete nasadit aplikaci do Azure Spring Apps, přečtěte si téma Nasazení první aplikace do Azure Spring Apps.

Další kroky