Udostępnij za pośrednictwem


Spring Cloud Stream z usługą Azure Service Bus

W tym artykule pokazano, jak używać integratora strumienia Spring Cloud do wysyłania komunikatów do elementów queues i topics usługi Service Bus oraz odbierania komunikatów z usługi Service Bus.

Platforma Azure udostępnia rozwiązanie do asynchronicznej obsługi komunikatów o nazwie Azure Service Bus („Service Bus”) oparte na standardzie Advanced Message Queueing Protocol 1.0 („AMQP 1.0”). Usługi Service Bus można używać na różnych obsługiwanych platformach Azure.

Wymagania wstępne

  • Subskrypcja platformy Azure — utwórz bezpłatnie.

  • Zestaw Java Development Kit (JDK) w wersji 8 lub nowszej.

  • Apache Maven, wersja 3.2 lub nowsza.

  • cURL lub podobne narzędzie HTTP do testowania funkcjonalności.

  • Kolejka lub temat dla usługi Azure Service Bus. Jeśli go nie masz, utwórz kolejkę usługi Service Bus lub utwórz temat usługi Service Bus.

  • Aplikacja Spring Boot. Jeśli go nie masz, utwórz projekt Maven za pomocą narzędzia Spring Initializr. Pamiętaj, aby wybrać pozycję Projekt Maven i w obszarze Zależności dodaj zależności platformy Spring Web i Pomoc techniczna platformy Azure, a następnie wybierz pozycję Java w wersji 8 lub nowszej.

Uwaga

Aby udzielić kontu dostępu do zasobów usługi Azure Service Bus, przypisz Azure Service Bus Data Sender rolę i Azure Service Bus Data Receiver do konta Microsoft Entra, którego obecnie używasz. Aby uzyskać więcej informacji na temat udzielania ról dostępu, zobacz Przypisywanie ról platformy Azure przy użyciu witryny Azure Portal i Uwierzytelnianie i autoryzowanie aplikacji przy użyciu identyfikatora Entra firmy Microsoft w celu uzyskania dostępu do jednostek usługi Azure Service Bus.

Ważne

Do wykonania kroków opisanych w tym artykule jest wymagany program Spring Boot w wersji 2.5 lub nowszej.

Wysyłanie i odbieranie komunikatów z usługi Azure Service Bus

Za pomocą kolejki lub tematu dla usługi Azure Service Bus można wysyłać i odbierać komunikaty przy użyciu usługi Service Bus powiązania usługi Azure Stream Spring Cloud.

Aby zainstalować moduł Spring Cloud Azure Stream Binder Service Bus, dodaj następujące zależności do pliku pom.xml :

  • Projekt Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.19.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Uwaga

    Jeśli używasz środowiska Spring Boot 2.x, pamiętaj, aby ustawić spring-cloud-azure-dependencies wersję na 4.19.0. Ten rachunek materiału (BOM) należy skonfigurować w <dependencyManagement> sekcji pliku pom.xml . Gwarantuje to, że wszystkie zależności platformy Azure platformy Spring Cloud korzystają z tej samej wersji. Aby uzyskać więcej informacji na temat wersji używanej dla tego modelu BOM, zobacz Która wersja platformy Spring Cloud platformy Azure powinna być używana.

  • Artefakt usługi Service Bus binder usługi Azure Stream Spring Cloud:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Kodowanie aplikacji

Wykonaj poniższe kroki, aby skonfigurować aplikację do używania kolejki lub tematu usługi Service Bus do wysyłania i odbierania komunikatów.

  1. Skonfiguruj poświadczenia usługi Service Bus w pliku application.propertieskonfiguracji .

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    W poniższej tabeli opisano pola w konfiguracji:

    Pole opis
    spring.cloud.azure.servicebus.namespace Określ przestrzeń nazw uzyskaną w usłudze Service Bus w witrynie Azure Portal.
    spring.cloud.stream.bindings.consume-in-0.destination Podaj kolejkę lub temat usługi Service Bus, których użyto w tym samouczku.
    spring.cloud.stream.bindings.supply-out-0.destination Podaj tę samą wartość, której użyto dla miejsca docelowego elementu wejściowego.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Określ, czy komunikaty mają być automatycznie rozliczane. W przypadku ustawienia wartości false nagłówek komunikatu Checkpointer zostanie dodany, aby umożliwić deweloperom ręczne rozliczanie komunikatów.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Określ typ jednostki dla powiązania wyjściowego, może to być queue lub topic.
    spring.cloud.function.definition Określ, która fasola funkcjonalna ma być powiązana z zewnętrznymi miejscami docelowymi udostępnianymi przez powiązania.
    spring.cloud.stream.poller.fixed-delay Określ stałe opóźnienie domyślnegollera w milisekundach. Wartość domyślna to 1000 L. Zalecana wartość to 60000.
    spring.cloud.stream.poller.initial-delay Określ początkowe opóźnienie dla wyzwalaczy okresowych. Wartość domyślna to 0.
  2. Edytuj plik klasy uruchamiania, aby wyświetlić następującą zawartość.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Napiwek

    W tym samouczku nie ma żadnych operacji uwierzytelniania w konfiguracjach ani kodzie. Jednak nawiązywanie połączenia z usługami platformy Azure wymaga uwierzytelniania. Aby ukończyć uwierzytelnianie, musisz użyć usługi Azure Identity. Platforma Spring Cloud na platformie Azure używa DefaultAzureCredentialbiblioteki tożsamości platformy Azure, która ułatwia uzyskiwanie poświadczeń bez żadnych zmian w kodzie.

    DefaultAzureCredential obsługuje wiele metod uwierzytelniania i określa, która metoda ma być używana w czasie wykonywania. Takie podejście umożliwia aplikacji używanie różnych metod uwierzytelniania w różnych środowiskach (takich jak środowiska lokalne i produkcyjne) bez implementowania kodu specyficznego dla środowiska. Aby uzyskać więcej informacji, zobacz DefaultAzureCredential.

    Aby ukończyć uwierzytelnianie w lokalnych środowiskach deweloperskich, możesz użyć interfejsu wiersza polecenia platformy Azure, programu Visual Studio Code, programu PowerShell lub innych metod. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie platformy Azure w środowiskach deweloperskich Java. Aby ukończyć uwierzytelnianie w środowiskach hostingu platformy Azure, zalecamy użycie tożsamości zarządzanej przypisanej przez użytkownika. Aby uzyskać więcej informacji, zobacz Co to są tożsamości zarządzane dla zasobów platformy Azure?

  3. Uruchom aplikację. Komunikaty podobne do poniższego przykładu zostaną opublikowane w dzienniku aplikacji:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Następne kroki