Spring Cloud Stream with Azure Service Bus
この記事では、Spring Cloud Stream Binder を使用して Service Bus queues
および topics
との間でメッセージを送受信する方法について説明します。
Azure には、Azure Service Bus ("Service Bus") という、Advanced Message Queueing Protocol 1.0 ("AMQP 1.0") 標準に基づいた非同期のメッセージング プラットフォームが用意されています。 Service Bus は、サポートされている Azure プラットフォームの範囲全体で使用することができます。
前提条件
Azure サブスクリプション - 無料アカウントを作成します。
Java Development Kit (JDK) バージョン 8 以降。
Apache Maven、バージョン 3.2 以降。
cURL または機能をテストするための類似の HTTP ユーティリティ。
Azure Service Bus のキューまたはトピック。 お持ちでない場合は、Service Bus キューを作成するか、Service Bus トピックを作成します。
Spring Boot アプリケーション。 ない場合は、Spring Initializr で Maven プロジェクトを作成します。 必ず、[Maven プロジェクト] を選択し、[依存関係] で [Spring Web] と [Azure サポート] 依存関係を追加したら、バージョン 8 以降の Java を選択します。
Note
アカウントに Azure Service Bus リソースへのアクセス権を付与するには、現在使用している Microsoft Entra アカウントに Azure Service Bus Data Sender
と Azure Service Bus Data Receiver
のロールを割り当てます。 アクセス ロールの付与の詳細については、「Azure portal を使用して Azure ロールを割り当てる」および「Microsoft Entra ID を使用してアプリケーションを認証および承認して Azure Service Bus エンティティにアクセスする」を参照してください。
重要
この記事の手順を完了するには、Spring Boot 2.5 以降のバージョンが必要です。
Azure Service Bus からメッセージを送受信する
Azure Service Bus のキューまたはトピックを使用すると、Spring Cloud Azure Stream Binder Service Bus を使用してメッセージを送受信できます。
Spring Cloud Azure Stream Binder Service Bus モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。
Spring Cloud Azure 部品表 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.18.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Note
Spring Boot 2.xを使用している場合は、
spring-cloud-azure-dependencies
バージョンを4.19.0
に設定してください。 この部品表(BOM)は、 pom.xml ファイルの<dependencyManagement>
セクションで設定する必要があります。 これにより、すべてのSpring Cloud Azure依存関係が同じバージョンを使用していることが保証されます。 このBOMに使用されるバージョンの詳細については、「Spring Cloud Azureのどのバージョンを使うべきか」を参照してください。Spring Cloud Azure Stream Binder Service Bus アーティファクト:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
アプリケーションをコーディングする
次の手順を実行して、Service Bus キューまたはトピックを使用してメッセージを送受信するようにアプリケーションを構成します。
構成ファイル
application.properties
で Service Bus の資格情報を構成します。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
次の表では、構成のフィールドについて説明します。
フィールド 説明 spring.cloud.azure.servicebus.namespace
Azure portal の自分の Service Bus で取得した名前空間を指定します。 spring.cloud.stream.bindings.consume-in-0.destination
このチュートリアルで自分が使用した Service Bus キューまたは Service Bus トピックを指定します。 spring.cloud.stream.bindings.supply-out-0.destination
入力先に使用したものと同じ値を指定します。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
メッセージを自動的に取得するかどうかを指定します。 false に設定すると、開発者がメッセージを手動で決済できるように、メッセージ ヘッダー Checkpointer
が追加されます。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
出力バインドのエンティティの種類を指定します。 queue
またはtopic
に指定できます。spring.cloud.function.definition
バインドによって公開されている外部送信先にバインドする、機能 Bean を指定します。 spring.cloud.stream.poller.fixed-delay
デフォルトのポーラーの固定遅延をミリ秒単位で指定します。 既定値は 1000 Lで、推奨値は 60000 です。 spring.cloud.stream.poller.initial-delay
定期的なトリガーの初期遅延を指定します。 既定値は0です。 スタートアップ クラス ファイルを編集して、次の内容を表示します。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
ヒント
このチュートリアルでは、構成またはコードに認証操作はありません。 ただし、Azure サービスに接続するには認証が必要です。 認証を完了するには、Azure ID を使用する必要があります。 Spring Cloud Azure では、
DefaultAzureCredential
を使用します。これは、コードを変更せずに資格情報を取得できるようにするために、Azure ID ライブラリで提供されます。DefaultAzureCredential
は複数の認証方法をサポートしており、実行時に使用する方法が決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカルと運用環境など) で異なる認証方法をアプリに使用できます。 詳細については、DefaultAzureCredential を参照してください。ローカル開発環境で認証を完了するには、Azure CLI、Visual Studio Code、PowerShell、またはその他の方法を使用できます。 詳細については、「Java 開発環境での Azure 認証」を参照してください。 Azure ホスティング環境で認証を完了するには、ユーザー割り当てマネージド ID を使用することをお勧めします。 詳細については、「Azure リソースのマネージド ID とは」を参照してください。
アプリケーションを起動します。 次の例のようなメッセージは、アプリケーション ログに投稿されます。
New message received: 'Hello World' Message 'Hello World' successfully checkpointed