Spring Cloud Stream se službou Azure Service Bus
V tomto článku se dozvíte, jak pomocí Spring Cloud Stream Binderu odesílat zprávy do front (queues
) a témat (topics
) služby Service Bus a přijímat je z ní.
Azure poskytuje platformu pro asynchronní zasílání zpráv s názvem Azure Service Bus („služba Service Bus“), která je založena na standardu Advanced Message Queueing Protocol 1.0 („protokol AMQP 1.0“). Službu Service Bus je možné používat na mnoha podporovaných platformách Azure.
Požadavky
Předplatné Azure – vytvořte si ho zdarma.
Java Development Kit (JDK) verze 8 nebo vyšší.
Apache Maven verze 3.2 nebo vyšší
cURL nebo podobný nástroj HTTP pro testování funkčnosti.
Fronta nebo téma služby Azure Service Bus Pokud ho nemáte, vytvořte frontu služby Service Bus nebo vytvořte téma služby Service Bus.
Aplikace Spring Boot. Pokud ho nemáte, vytvořte projekt Maven pomocí aplikace Spring Initializr. Nezapomeňte vybrat projekt Maven a v části Závislosti přidejte závislosti Spring Web a Podpory Azure a pak vyberte Javu verze 8 nebo vyšší.
Poznámka:
Pokud chcete účtu udělit přístup k prostředkům služby Azure Service Bus, přiřaďte Azure Service Bus Data Sender
Azure Service Bus Data Receiver
tuto roli k účtu Microsoft Entra, který aktuálně používáte. Další informace o udělení přístupových rolí najdete v tématu Přiřazení rolí Azure pomocí webu Azure Portal a ověřování a autorizace aplikace s ID Microsoft Entra pro přístup k entitě Azure Service Bus.
Důležité
K dokončení kroků v tomto článku se vyžaduje Spring Boot verze 2.5 nebo vyšší.
Odesílání a příjem zpráv ze služby Azure Service Bus
Pomocí fronty nebo tématu služby Azure Service Bus můžete odesílat a přijímat zprávy pomocí služby Spring Cloud Azure Stream Binder Service Bus.
Pokud chcete nainstalovat modul Spring Cloud Azure Stream Binder Service Bus, přidejte do souboru pom.xml následující závislosti:
Kusovník materiálů (BOM) Spring Cloud v Azure:
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Poznámka:
Pokud používáte Spring Boot 2.x, nezapomeňte nastavit
spring-cloud-azure-dependencies
verzi na4.19.0
. Tato faktura materiálu (BOM) by měla být nakonfigurována v<dependencyManagement>
části vašeho pom.xml souboru. Tím se zajistí, že všechny závislosti Azure Spring Cloudu budou používat stejnou verzi. Další informace o verzi použité pro tuto kusovníku najdete v tématu Jakou verzi Spring Cloud Azure mám použít.Artefakt Spring Cloud Azure Stream Binder Service Bus:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Vytvoření kódu aplikace
Pomocí následujících kroků nakonfigurujte aplikaci tak, aby k odesílání a příjmu zpráv používala frontu nebo téma služby Service Bus.
Nakonfigurujte přihlašovací údaje služby Service Bus v konfiguračním souboru
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
Následující tabulka popisuje pole v konfiguraci:
Pole Popis spring.cloud.azure.servicebus.namespace
Zadejte obor názvů, který jste získali ve službě Service Bus z webu Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Zadejte frontu nebo téma služby Service Bus, které jste použili v tomto kurzu. spring.cloud.stream.bindings.supply-out-0.destination
Zadejte stejnou hodnotu jako u cíle vstupu. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Určete, zda se mají zprávy automaticky urovnat. Pokud je nastavená hodnota false, přidá se hlavička Checkpointer
zprávy, která vývojářům umožní ručně urovnat zprávy.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Zadejte typ entity pro výstupní vazbu, může být queue
nebotopic
.spring.cloud.function.definition
Určete, která funkční bean se má svázat s externími cíli vystavenými vazbami. spring.cloud.stream.poller.fixed-delay
Zadejte pevné zpoždění pro výchozí poller v milisekundách. Výchozí hodnota je 1000 L. Doporučená hodnota je 60000. spring.cloud.stream.poller.initial-delay
Zadejte počáteční zpoždění pro pravidelné aktivační události. Výchozí hodnota je 0. Upravte soubor spouštěcí třídy, aby se zobrazil následující obsah.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tip
V tomto kurzu nejsou v konfiguracích ani kódu žádné ověřovací operace. Připojení ke službám Azure ale vyžaduje ověření. K dokončení ověřování je potřeba použít identitu Azure. Spring Cloud Azure používá
DefaultAzureCredential
, kterou poskytuje knihovna identit Azure, která vám pomůže získat přihlašovací údaje bez jakýchkoli změn kódu.DefaultAzureCredential
podporuje více metod ověřování a určuje, kterou metodu použít za běhu. Tento přístup umožňuje vaší aplikaci používat různé metody ověřování v různých prostředích (například v místních a produkčních prostředích) bez implementace kódu specifického pro prostředí. Další informace naleznete v tématu DefaultAzureCredential.K dokončení ověřování v místních vývojových prostředích můžete použít Azure CLI, Visual Studio Code, PowerShell nebo jiné metody. Další informace najdete v tématu Ověřování Azure ve vývojových prostředích Java. K dokončení ověřování v hostitelských prostředích Azure doporučujeme použít spravovanou identitu přiřazenou uživatelem. Další informace najdete v tématu Co jsou spravované identity pro prostředky Azure?
Spusťte aplikaci. Zprávy jako v následujícím příkladu se publikuje v protokolu vaší aplikace:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed