Delen via


Spring Cloud Stream met Azure Service Bus

In dit artikel wordt beschreven hoe u Spring Cloud Stream Binder gebruikt om berichten te verzenden naar en te ontvangen van Service Bus-queues en -topics.

Azure biedt een platform voor asynchrone berichtenuitwisseling met de naam Azure Service Bus ('Service Bus') dat is gebaseerd op de standaard Advanced Message Queueing Protocol 1.0 ('AMQP 1.0'). Service Bus kan worden gebruikt voor alle ondersteunde Azure-platforms.

Vereisten

Notitie

Als u uw account toegang wilt verlenen tot uw Azure Service Bus-resources, wijst u de Azure Service Bus Data Sender en Azure Service Bus Data Receiver rol toe aan het Microsoft Entra-account dat u momenteel gebruikt. Zie Azure-rollen toewijzen met behulp van Azure Portal en een toepassing verifiëren en autoriseren met Microsoft Entra-id voor toegang tot Azure Service Bus-entiteiten voor meer informatie over het verlenen van toegangsrollen.

Belangrijk

Spring Boot versie 2.5 of hoger is vereist om de stappen in dit artikel uit te voeren.

Berichten verzenden en ontvangen van Azure Service Bus

Met een wachtrij of onderwerp voor Azure Service Bus kunt u berichten verzenden en ontvangen met behulp van Spring Cloud Azure Stream Binder Service Bus.

Als u de Spring Cloud Azure Stream Binder Service Bus-module wilt installeren, voegt u de volgende afhankelijkheden toe aan uw pom.xml-bestand :

  • De Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.19.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Notitie

    Als u Spring Boot 2.x gebruikt, moet u de spring-cloud-azure-dependencies versie instellen op 4.19.0. Deze stuklijst (Bill of Material) moet worden geconfigureerd in de <dependencyManagement> sectie van uw pom.xml-bestand . Dit zorgt ervoor dat alle Spring Cloud Azure-afhankelijkheden dezelfde versie gebruiken. Zie welke versie van Spring Cloud Azure moet ik gebruiken voor meer informatie over de versie die voor deze BOM wordt gebruikt.

  • Het Spring Cloud Azure Stream Binder Service Bus-artefact:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

De toepassing coderen

Gebruik de volgende stappen om uw toepassing te configureren voor het gebruik van een Service Bus-wachtrij of -onderwerp voor het verzenden en ontvangen van berichten.

  1. Configureer de Service Bus-referenties in het configuratiebestand application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    In de volgende tabel worden de velden in de configuratie beschreven:

    Veld Beschrijving
    spring.cloud.azure.servicebus.namespace Geef de naamruimte op die u hebt verkregen in uw Service Bus vanuit Azure Portal.
    spring.cloud.stream.bindings.consume-in-0.destination Geef de Service Bus-wachtrij of het Service Bus-onderwerp op dat u in deze zelfstudie hebt gebruikt.
    spring.cloud.stream.bindings.supply-out-0.destination Geef dezelfde waarde op als voor invoerbestemming.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Geef op of berichten automatisch moeten worden vereffend. Als deze optie is ingesteld als false, wordt er een berichtkop van Checkpointer toegevoegd om ontwikkelaars in staat te stellen berichten handmatig te vereffenen.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Geef het entiteitstype voor de uitvoerbinding op, kan of queuetopic.
    spring.cloud.function.definition Geef op welke functionele bean moet worden verbonden met de externe bestemmingen die door de bindingen worden weergegeven.
    spring.cloud.stream.poller.fixed-delay Geef een vaste vertraging op voor de standaard poller in milliseconden. De standaardwaarde is 1000 L. De aanbevolen waarde is 60000.
    spring.cloud.stream.poller.initial-delay Geef de initiële vertraging op voor periodieke triggers. De standaardwaarde is 0.
  2. Bewerk het opstartklassebestand om de volgende inhoud weer te geven.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Tip

    In deze zelfstudie zijn er geen verificatiebewerkingen in de configuraties of de code. Voor het maken van verbinding met Azure-services is echter verificatie vereist. Als u de verificatie wilt voltooien, moet u Azure Identity gebruiken. Spring Cloud Azure maakt gebruik DefaultAzureCredentialvan, die de Azure Identity-bibliotheek biedt om u te helpen referenties op te halen zonder dat er codewijzigingen zijn aangebracht.

    DefaultAzureCredential ondersteunt meerdere verificatiemethoden en bepaalt welke methode tijdens runtime moet worden gebruikt. Met deze aanpak kan uw app verschillende verificatiemethoden gebruiken in verschillende omgevingen (zoals lokale en productieomgevingen) zonder omgevingsspecifieke code te implementeren. Zie DefaultAzureCredential voor meer informatie.

    Als u de verificatie in lokale ontwikkelomgevingen wilt voltooien, kunt u Azure CLI, Visual Studio Code, PowerShell of andere methoden gebruiken. Zie Azure-verificatie in Java-ontwikkelomgevingen voor meer informatie. Als u de verificatie in Azure-hostingomgevingen wilt voltooien, raden we u aan om een door de gebruiker toegewezen beheerde identiteit te gebruiken. Zie Wat zijn beheerde identiteiten voor Azure-resources? voor meer informatie.

  3. Start de toepassing. Berichten zoals het volgende voorbeeld worden in uw toepassingslogboek geplaatst:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Volgende stappen