Spring Cloud Stream met Azure Service Bus
In dit artikel wordt beschreven hoe u Spring Cloud Stream Binder gebruikt om berichten te verzenden naar en te ontvangen van Service Bus-queues
en -topics
.
Azure biedt een platform voor asynchrone berichtenuitwisseling met de naam Azure Service Bus ('Service Bus') dat is gebaseerd op de standaard Advanced Message Queueing Protocol 1.0 ('AMQP 1.0'). Service Bus kan worden gebruikt voor alle ondersteunde Azure-platforms.
Vereisten
Een Azure-abonnement (u kunt een gratis abonnement maken).
Java Development Kit (JDK) versie 8 of hoger.
Apache Maven, versie 3.2 of hoger.
cURL of een vergelijkbaar HTTP-hulpprogramma om de functionaliteit te testen.
Een wachtrij of onderwerp voor Azure Service Bus. Als u er nog geen hebt, maakt u een Service Bus-wachtrij of maakt u een Service Bus-onderwerp.
Een Spring Boot-toepassing. Als u er nog geen hebt, maakt u een Maven-project met de Spring Initializr. Zorg ervoor dat u Maven-project selecteert en voeg onder Afhankelijkheden de afhankelijkheden van Spring Web en Azure Support toe en selecteer vervolgens Java-versie 8 of hoger.
Notitie
Als u uw account toegang wilt verlenen tot uw Azure Service Bus-resources, wijst u de Azure Service Bus Data Sender
en Azure Service Bus Data Receiver
rol toe aan het Microsoft Entra-account dat u momenteel gebruikt. Zie Azure-rollen toewijzen met behulp van Azure Portal en een toepassing verifiëren en autoriseren met Microsoft Entra-id voor toegang tot Azure Service Bus-entiteiten voor meer informatie over het verlenen van toegangsrollen.
Belangrijk
Spring Boot versie 2.5 of hoger is vereist om de stappen in dit artikel uit te voeren.
Berichten verzenden en ontvangen van Azure Service Bus
Met een wachtrij of onderwerp voor Azure Service Bus kunt u berichten verzenden en ontvangen met behulp van Spring Cloud Azure Stream Binder Service Bus.
Als u de Spring Cloud Azure Stream Binder Service Bus-module wilt installeren, voegt u de volgende afhankelijkheden toe aan uw pom.xml-bestand :
De Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Notitie
Als u Spring Boot 2.x gebruikt, moet u de
spring-cloud-azure-dependencies
versie instellen op4.19.0
. Deze stuklijst (Bill of Material) moet worden geconfigureerd in de<dependencyManagement>
sectie van uw pom.xml-bestand . Dit zorgt ervoor dat alle Spring Cloud Azure-afhankelijkheden dezelfde versie gebruiken. Zie welke versie van Spring Cloud Azure moet ik gebruiken voor meer informatie over de versie die voor deze BOM wordt gebruikt.Het Spring Cloud Azure Stream Binder Service Bus-artefact:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
De toepassing coderen
Gebruik de volgende stappen om uw toepassing te configureren voor het gebruik van een Service Bus-wachtrij of -onderwerp voor het verzenden en ontvangen van berichten.
Configureer de Service Bus-referenties in het configuratiebestand
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
In de volgende tabel worden de velden in de configuratie beschreven:
Veld Beschrijving spring.cloud.azure.servicebus.namespace
Geef de naamruimte op die u hebt verkregen in uw Service Bus vanuit Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Geef de Service Bus-wachtrij of het Service Bus-onderwerp op dat u in deze zelfstudie hebt gebruikt. spring.cloud.stream.bindings.supply-out-0.destination
Geef dezelfde waarde op als voor invoerbestemming. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Geef op of berichten automatisch moeten worden vereffend. Als deze optie is ingesteld als false
, wordt er een berichtkop vanCheckpointer
toegevoegd om ontwikkelaars in staat te stellen berichten handmatig te vereffenen.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Geef het entiteitstype voor de uitvoerbinding op, kan of queue
topic
.spring.cloud.function.definition
Geef op welke functionele bean moet worden verbonden met de externe bestemmingen die door de bindingen worden weergegeven. spring.cloud.stream.poller.fixed-delay
Geef een vaste vertraging op voor de standaard poller in milliseconden. De standaardwaarde is 1000 L
. De aanbevolen waarde is60000
.spring.cloud.stream.poller.initial-delay
Geef de initiële vertraging op voor periodieke triggers. De standaardwaarde is 0
.Bewerk het opstartklassebestand om de volgende inhoud weer te geven.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tip
In deze zelfstudie zijn er geen verificatiebewerkingen in de configuraties of de code. Voor het maken van verbinding met Azure-services is echter verificatie vereist. Als u de verificatie wilt voltooien, moet u Azure Identity gebruiken. Spring Cloud Azure maakt gebruik
DefaultAzureCredential
van, die de Azure Identity-bibliotheek biedt om u te helpen referenties op te halen zonder dat er codewijzigingen zijn aangebracht.DefaultAzureCredential
ondersteunt meerdere verificatiemethoden en bepaalt welke methode tijdens runtime moet worden gebruikt. Met deze aanpak kan uw app verschillende verificatiemethoden gebruiken in verschillende omgevingen (zoals lokale en productieomgevingen) zonder omgevingsspecifieke code te implementeren. Zie DefaultAzureCredential voor meer informatie.Als u de verificatie in lokale ontwikkelomgevingen wilt voltooien, kunt u Azure CLI, Visual Studio Code, PowerShell of andere methoden gebruiken. Zie Azure-verificatie in Java-ontwikkelomgevingen voor meer informatie. Als u de verificatie in Azure-hostingomgevingen wilt voltooien, raden we u aan om een door de gebruiker toegewezen beheerde identiteit te gebruiken. Zie Wat zijn beheerde identiteiten voor Azure-resources? voor meer informatie.
Start de toepassing. Berichten zoals het volgende voorbeeld worden in uw toepassingslogboek geplaatst:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed