Spring Cloud Stream med Azure Service Bus
Den här artikeln visar hur du använder Spring Cloud Stream Binder för att skicka meddelanden till och ta emot meddelanden från Service Bus queues
och topics
.
Azure tillhandahåller en asynkron meddelandeplattform som kallas Azure Service Bus ("Service Bus") som baseras på standarden Advanced Message Queueing Protocol 1-0 ("AMQP 1.0"). Service Bus kan användas i flera Azure-plattformar som stöds.
Förutsättningar
En Azure-prenumeration – skapa en kostnadsfritt.
Java Development Kit (JDK) version 8 eller senare.
Apache Maven, version 3.2 eller senare.
cURL eller ett liknande HTTP-verktyg för att testa funktioner.
En kö eller ett ämne för Azure Service Bus. Om du inte har någon skapar du en Service Bus-kö eller skapar ett Service Bus-ämne.
Ett Spring Boot-program. Om du inte har ett skapar du ett Maven-projekt med Spring Initializr. Se till att välja Maven Project och under Beroenden lägger du till Beroenden för Spring Web och Azure Support och väljer sedan Java version 8 eller senare.
Kommentar
Tilldela rollerna Azure Service Bus Data Sender
och Azure Service Bus Data Receiver
till det Microsoft Entra-konto du använder för närvarande för att ge ditt konto åtkomst till dina Azure Service Bus-resurser. Mer information om hur du beviljar åtkomstroller finns i Tilldela Azure-roller med hjälp av Azure Portal och Autentisera och auktorisera ett program med Microsoft Entra-ID för åtkomst till Azure Service Bus-entiteter.
Viktigt!
Spring Boot version 2.5 eller senare krävs för att slutföra stegen i den här artikeln.
Skicka och ta emot meddelanden från Azure Service Bus
Med en kö eller ett ämne för Azure Service Bus kan du skicka och ta emot meddelanden med Spring Cloud Azure Stream Binder Service Bus.
Om du vill installera Spring Cloud Azure Stream Binder Service Bus-modulen lägger du till följande beroenden i din pom.xml-fil :
Spring Cloud Azure Materialförteckning (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.21.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Kommentar
Om du använder Spring Boot 2.x måste du ange
spring-cloud-azure-dependencies
versionen till4.19.0
. Den här strukturlistan (BOM) bör konfigureras i<dependencyManagement>
-avsnittet i din pom.xml-fil. Detta säkerställer att alla Spring Cloud Azure-beroenden använder samma version. Mer information om den version som används av denna BOM finns i Vilken version av Spring Cloud Azure ska jag använda.Spring Cloud Azure Stream Binder Service Bus artefakt:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Koda appen
Använd följande steg för att konfigurera ditt program att använda en Service Bus-kö eller ett ämne för att skicka och ta emot meddelanden.
Konfigurera Service Bus-autentiseringsuppgifterna i konfigurationsfilen
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
I följande tabell beskrivs fälten i konfigurationen:
Fält beskrivning spring.cloud.azure.servicebus.namespace
Ange det namnområde som du fick i Service Bus från Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Ange Service Bus-kö eller ämnet för Service Bus som du använde i den här handledningen. spring.cloud.stream.bindings.supply-out-0.destination
Ange samma värde som används för inmatningens mål. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Specificera om meddelanden ska hanteras automatiskt. Om det anges som false
läggs ett meddelandehuvud förCheckpointer
till för att göra det möjligt för utvecklare att reglera meddelanden manuellt.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Ange entitetstypen för utdatabindningen, kan vara queue
ellertopic
.spring.cloud.function.definition
Ange vilken funktionell komponent som ska bindas till de externa mål som exponeras av bindningarna. spring.cloud.stream.poller.fixed-delay
Ange en fast fördröjning för standardpollern i millisekunder. Standardvärdet är 1000 L
. Det rekommenderade värdet är60000
.spring.cloud.stream.poller.initial-delay
Ange inledande fördröjning för periodiska utlösare. Standardvärdet är 0
.Redigera startklassfilen för att visa följande innehåll.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tips
I den här handledningen finns det ingen autentisering i konfigurationerna eller i koden. Att ansluta till Azure-tjänster kräver dock autentisering. För att slutföra autentiseringen måste du använda Azure Identity. Spring Cloud Azure använder
DefaultAzureCredential
, som Azure Identity-biblioteket tillhandahåller för att hjälpa dig att få autentiseringsuppgifter utan några kodändringar.DefaultAzureCredential
stöder flera autentiseringsmetoder och avgör vilken metod som ska användas vid körning. Med den här metoden kan din app använda olika autentiseringsmetoder i olika miljöer (till exempel lokala miljöer och produktionsmiljöer) utan att implementera miljöspecifik kod. Mer information finns i DefaultAzureCredential.För att slutföra autentiseringen i lokala utvecklingsmiljöer kan du använda Azure CLI, Visual Studio Code, PowerShell eller andra metoder. Mer information finns i Azure-autentisering i Java-utvecklingsmiljöer. För att slutföra autentiseringen i Azure-värdmiljöer rekommenderar vi att du använder användartilldelad hanterad identitet. Mer information finns i Vad är hanterade identiteter för Azure-resurser?
Starta programmet. Meddelanden som i följande exempel publiceras i programloggen:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed