次の方法で共有


Spring Cloud Azure での Spring Integration のサポート

この記事は、✅ バージョン 4.19.0 ✅ バージョン 5.19.0 に適用されます。

Azure の Spring Integration Extension には、Azure SDK for Javaによって提供されるさまざまなサービス用の Spring Integration アダプターが用意されています。 Spring Integration では、Event Hubs、Service Bus、Storage Queue などの Azure サービスをサポートしています。 サポートされているアダプターの一覧を次に示します。

  • spring-cloud-azure-starter-integration-eventhubs - 詳細については、「Spring と Azure Event Hubs の統合」を参照してください。
  • spring-cloud-azure-starter-integration-servicebus - 詳細については、「Spring と Azure Service Bus の統合」を参照してください。
  • spring-cloud-azure-starter-integration-storage-queue - 詳細については、「Spring と Azure Storage Queue の統合」を参照してください。

Azure Event Hubs との Spring 統合

主な概念

Azure Event Hubs は、ビッグ データ ストリーミング プラットフォームおよびイベント インジェスト サービスです。 1 秒あたり何百万ものイベントを受信して処理できます。 イベント ハブに送信されるデータは、リアルタイム分析プロバイダーまたはバッチ処理/ストレージ アダプターを使用して変換および格納できます。

Spring Integration を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。 これらのアダプターは、リモート処理、メッセージング、およびスケジューリングに対する Spring のサポートに対して、より高度な抽象化を提供します。 Event Hubs 拡張機能プロジェクトの Spring Integration は、Azure Event Hubs の受信チャネル アダプターと送信チャネル アダプターとゲートウェイを提供します。

手記

RxJava サポート API はバージョン 4.0.0 から削除されます。 詳細については、Javadoc を参照してください。

コンシューマー グループ

Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Event Hubs SDK は、このようなオフセットを Azure Storage 内に格納する関数を提供します。

パーティション分割のサポート

Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka のコンシューマーとパーティション間の自動再分散とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのパーティションがどのコンシューマーによって所有されているかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、ワークロードの分散を実現するために、ほとんどの負荷の高いコンシューマーからいくつかのパーティションを盗もうとします。

負荷分散戦略を指定するために、開発者は構成に EventHubsContainerProperties を使用できます。 を構成する方法の例については、次のセクション 参照してください。

Batch コンシューマーのサポート

EventHubsInboundChannelAdapter では、バッチ使用モードがサポートされています。 これを有効にするには、ユーザーは、ListenerMode.BATCH インスタンスを構築するときに、リスナー モードを EventHubsInboundChannelAdapter として指定できます。 有効にすると、ペイロードがバッチ 処理されたイベントの一覧である メッセージ が受信され、ダウンストリーム チャネルに渡されます。 各メッセージ ヘッダーもリストとして変換され、コンテンツは各イベントから解析された関連ヘッダー値です。 パーティション ID、チェックポイント、および最後にエンキューされたプロパティの共同ヘッダーの場合、イベントのバッチ全体で同じ値が共有される単一の値として表示されます。 詳細については、「Event Hubs メッセージ ヘッダー 」セクションを参照してください。

手記

チェックポイント ヘッダーは、manual チェックポイント モード 使用されている場合にのみ存在します。

バッチ コンシューマーのチェックポイント処理では、BATCHMANUALの 2 つのモードがサポートされます。 BATCH モードは、受信したイベントのバッチ全体を一緒にチェックポイント処理する自動チェックポイント モードです。 MANUAL モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointer がメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイント処理を実行できます。

バッチ使用ポリシーは、max-sizemax-wait-timeのプロパティで指定できます。ここで、max-size は必要なプロパティですが、max-wait-time は省略可能です。 バッチ使用戦略を指定するために、開発者は構成に EventHubsContainerProperties を使用できます。 を構成する方法の例については、次のセクション 参照してください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

構成

このスターターには、次の 3 つの構成オプションが用意されています。

接続構成のプロパティ

このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれています。

手記

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。

spring-cloud-azure-starter-integration-eventhubs の接続構成可能なプロパティ:

財産 種類 形容
spring.cloud.azure.eventhubs.enabled ブーリアン Azure Event Hubs が有効になっているかどうか。
spring.cloud.azure.eventhubs.connection-string Event Hubs 名前空間の接続文字列の値。
spring.cloud.azure.eventhubs.namespace を する Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.eventhubs .domain-name Azure Event Hubs 名前空間の値のドメイン名。
spring.cloud.azure.eventhubs.custom-endpoint-address を する カスタム エンドポイント アドレス。
spring.cloud.azure.eventhubs.shared-connection を する ブーリアン 基になる EventProcessorClient と EventHubProducerAsyncClient が同じ接続を使用するかどうか。 既定では、新しい接続が構築され、作成されたイベント ハブ クライアントごとに使用されます。

チェックポイント構成プロパティ

このセクションには、パーティションの所有権とチェックポイント情報を保持するために使用されるストレージ BLOB サービスの構成オプションが含まれています。

手記

バージョン 4.0.0 以降、spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists のプロパティが手動で有効になっていない場合、ストレージ コンテナーは自動的に作成されません。

spring-cloud-azure-starter-integration-eventhubs の構成可能なプロパティのチェックポイント処理:

財産 種類 形容
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists ブーリアン コンテナーが存在しない場合に作成を許可するかどうか。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name を する ストレージ アカウントの名前。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key ストレージ アカウントのアクセス キー。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name ストレージ コンテナー名。

Azure Service SDK の一般的な構成オプションは、ストレージ BLOB チェックポイント ストアでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure. または spring.cloud.azure.eventhubs.processor.checkpoint-storeのプレフィックスで構成できます。

Event Hub プロセッサの構成プロパティ

EventHubsInboundChannelAdapter では、EventProcessorClient を使用してイベント ハブからのメッセージを使用し、EventProcessorClientの全体的なプロパティを構成します。開発者は構成に EventHubsContainerProperties を使用できます。 の使用方法については、次のセクション 参照してください。

基本的な使用方法

Azure Event Hubs にメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  1. Event Hubs にメッセージを送信する DefaultMessageHandler Bean で EventHubsTemplate を作成します。

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. メッセージ チャネルを介して、上記のメッセージ ハンドラーを使用してメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Event Hubs からメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ・チャネルの Bean を作成します。

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Event Hubs からメッセージを受信する EventHubsInboundChannelAdapter Bean で EventHubsMessageListenerContainer を作成します。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 前に作成したメッセージ チャネルを使用して、EventHubsInboundChannelAdapter でメッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

objectMapper をカスタマイズするように EventHubsMessageConverter を構成する

EventHubsMessageConverter は、ユーザーが ObjectMapper をカスタマイズできるように構成可能な Bean として作成されます。

Batch コンシューマーのサポート

Event Hubs からのメッセージをバッチで使用するには、上記のサンプルと似ていますが、ユーザーは、EventHubsInboundChannelAdapterのバッチ消費関連の構成オプションを設定する必要があります。

EventHubsInboundChannelAdapterを作成するときは、リスナー モードを BATCHとして設定する必要があります。 EventHubsMessageListenerContainerの Bean を作成する場合は、チェックポイント モードを MANUAL または BATCHとして設定し、必要に応じてバッチ オプションを構成できます。

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs メッセージ ヘッダー

次の表は、Event Hubs のメッセージ プロパティを Spring メッセージ ヘッダーにマップする方法を示しています。 Azure Event Hubs の場合、メッセージは eventとして呼び出されます。

レコード リスナー モードでの Event Hubs メッセージ/イベント プロパティと Spring メッセージ ヘッダーの間のマッピング:

Event Hubs イベントのプロパティ Spring Message ヘッダー定数 種類 形容
エンキューされた時刻 EventHubsHeaders#ENQUEUED_TIME インスタント イベントがイベント ハブ パーティションにエンキューされた時刻 (UTC)。
相殺 EventHubsHeaders#OFFSET 長い 関連付けられた Event Hub パーティションから受信されたイベントのオフセット。
パーティション キー AzureHeaders#PARTITION_KEY 最初にイベントを発行するときに設定された場合のパーティション ハッシュ キー。
パーティション ID AzureHeaders#RAW_PARTITION_ID イベント ハブのパーティション ID。
シーケンス番号 EventHubsHeaders#SEQUENCE_NUMBER 長い 関連付けられた Event Hub パーティションにエンキューされたときにイベントに割り当てられたシーケンス番号。
エンキューされた最後のイベント プロパティ EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties このパーティション内の最後にエンキューされたイベントのプロパティ。
NA AzureHeaders#CHECKPOINTER Checkpointer 特定のメッセージをチェックポイント処理するためのヘッダー。

ユーザーは、各イベントの関連情報のメッセージ ヘッダーを解析できます。 イベントのメッセージ ヘッダーを設定するには、カスタマイズされたすべてのヘッダーがイベントのアプリケーション プロパティとして配置されます。この場合、ヘッダーはプロパティ キーとして設定されます。 Event Hubs からイベントを受信すると、すべてのアプリケーション プロパティがメッセージ ヘッダーに変換されます。

手記

パーティション キー、エンキューされた時刻、オフセット、シーケンス番号のメッセージ ヘッダーは、手動で設定することはできません。

バッチ コンシューマー モードを有効にすると、バッチ処理されたメッセージの特定のヘッダーが次のように表示されます。このヘッダーには、各 Event Hubs イベントの値の一覧が含まれます。

バッチ リスナー モードでの Event Hubs メッセージ/イベント プロパティと Spring メッセージ ヘッダーの間のマッピング:

Event Hubs イベントのプロパティ Spring Batch Message ヘッダー定数 種類 形容
エンキューされた時刻 EventHubsHeaders#ENQUEUED_TIME インスタントの一覧 各イベントが Event Hub パーティションにエンキューされたときのインスタント (UTC) の一覧。
相殺 EventHubsHeaders#OFFSET 長の一覧 関連付けられた Event Hub パーティションから受信された各イベントのオフセットの一覧。
パーティション キー AzureHeaders#PARTITION_KEY 文字列の一覧 最初に各イベントを発行するときに設定された場合のパーティション ハッシュ キーの一覧。
シーケンス番号 EventHubsHeaders#SEQUENCE_NUMBER 長の一覧 関連付けられた Event Hub パーティションにエンキューされたときに各イベントに割り当てられたシーケンス番号の一覧。
システム プロパティ EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES マップの一覧 各イベントのシステム プロパティの一覧。
アプリケーションのプロパティ EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES マップの一覧 カスタマイズされたすべてのメッセージ ヘッダーまたはイベント プロパティが配置される、各イベントのアプリケーション プロパティの一覧。

手記

メッセージを発行すると、上記のすべてのバッチ ヘッダーがメッセージから削除されます (存在する場合)。

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリ を参照してください。

Azure Service Bus との Spring 統合

主な概念

Spring Integration を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。

Azure Service Bus 拡張機能プロジェクトの Spring Integration は、Azure Service Bus の受信チャネル アダプターと送信チャネル アダプターを提供します。

手記

CompletableFuture サポート API はバージョン 2.10.0 から非推奨となり、バージョン 4.0.0 の Reactor Core に置き換えられました。 詳細については、Javadoc を参照してください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

構成

このスターターでは、構成オプションの次の 2 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれています。

手記

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。

spring-cloud-azure-starter-integration-servicebus の接続構成可能なプロパティ:

財産 種類 形容
spring.cloud.azure.servicebus.enabled ブーリアン Azure Service Bus が有効になっているかどうか。
spring.cloud.azure.servicebus.connection-string を する Service Bus 名前空間の接続文字列の値。
spring.cloud.azure.servicebus.custom-endpoint-address を する Service Bus に接続するときに使用するカスタム エンドポイント アドレス。
spring.cloud.azure.servicebus.namespace を する Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.servicebus.domain-name を する Azure Service Bus 名前空間の値のドメイン名。

Service Bus プロセッサ構成プロパティ

ServiceBusInboundChannelAdapter では、ServiceBusProcessorClient を使用してメッセージを使用し、ServiceBusProcessorClientの全体的なプロパティを構成します。開発者は構成に ServiceBusContainerProperties を使用できます。 の使用方法については、次のセクション 参照してください。

基本的な使用方法

Azure Service Bus にメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  1. Service Bus にメッセージを送信する DefaultMessageHandler Bean で ServiceBusTemplate を作成し、ServiceBusTemplate のエンティティ型を設定します。 このサンプルでは、Service Bus キューを例として受け取ります。

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. メッセージ チャネルを介して、上記のメッセージ ハンドラーを使用してメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Service Bus からメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ・チャネルの Bean を作成します。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Service Bus にメッセージを受信する ServiceBusInboundChannelAdapter Bean で ServiceBusMessageListenerContainer を作成します。 このサンプルでは、Service Bus キューを例として受け取ります。

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 前に作成したメッセージ チャネルを使用して、ServiceBusInboundChannelAdapter を使用してメッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

objectMapper をカスタマイズするように ServiceBusMessageConverter を構成する

ServiceBusMessageConverter は、ユーザーが ObjectMapperをカスタマイズできるように構成可能な Bean として作成されます。

Service Bus メッセージ ヘッダー

複数の Spring ヘッダー定数にマップできる一部の Service Bus ヘッダーの場合、異なる Spring ヘッダーの優先順位が一覧表示されます。

Service Bus ヘッダーと Spring ヘッダーの間のマッピング:

Service Bus メッセージのヘッダーとプロパティ Spring メッセージ ヘッダー定数 種類 設定 形容
コンテンツ タイプ MessageHeaders#CONTENT_TYPE はい メッセージのRFC2045コンテンツ タイプ記述子。
関連付け ID ServiceBusMessageHeaders#CORRELATION_ID はい メッセージの関連付け ID
メッセージ ID ServiceBusMessageHeaders#MESSAGE_ID はい メッセージのメッセージ ID。このヘッダーは、MessageHeaders#IDよりも高い優先順位を持ちます。
メッセージ ID MessageHeaders#ID UUID はい メッセージのメッセージ ID。このヘッダーの優先度は ServiceBusMessageHeaders#MESSAGE_IDよりも低くなります。
パーティション キー ServiceBusMessageHeaders#PARTITION_KEY はい パーティション分割されたエンティティにメッセージを送信するためのパーティション キー。
返信先 MessageHeaders#REPLY_CHANNEL はい 返信を送信するエンティティのアドレス。
セッション ID に返信する ServiceBusMessageHeaders#REPLY_TO_SESSION_ID はい メッセージの ReplyToGroupId プロパティ値。
スケジュールされたエンキュー時間 (utc) ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime はい Service Bus でメッセージをエンキューする日時。このヘッダーの優先度は AzureHeaders#SCHEDULED_ENQUEUE_MESSAGEよりも高くなります。
スケジュールされたエンキュー時間 (utc) AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 整数 はい Service Bus でメッセージをエンキューする日時。このヘッダーの優先度は ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIMEよりも低くなります。
セッション ID ServiceBusMessageHeaders#SESSION_ID はい セッション対応エンティティのセッション IDentifier。
Time to Live ServiceBusMessageHeaders#TIME_TO_LIVE 期間 はい このメッセージの有効期限が切れるまでの時間。
宛先 ServiceBusMessageHeaders#TO はい メッセージの "宛先" アドレス。ルーティング シナリオで将来使用するために予約され、現在はブローカー自体によって無視されます。
件名 ServiceBusMessageHeaders#SUBJECT はい メッセージの件名。
配信不能エラーの説明 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION いいえ 配信不能メッセージの説明。
配信不能の理由 ServiceBusMessageHeaders#DEAD_LETTER_REASON いいえ メッセージが配信不能になった理由。
配信不能ソース ServiceBusMessageHeaders#DEAD_LETTER_SOURCE いいえ メッセージが配信不能になったエンティティ。
配信数 ServiceBusMessageHeaders#DELIVERY_COUNT 長い いいえ このメッセージがクライアントに配信された回数。
エンキューされたシーケンス番号 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER 長い いいえ Service Bus によってメッセージに割り当てられたエンキューされたシーケンス番号。
エンキューされた時刻 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime いいえ Service Bus でこのメッセージがエンキューされた日時。
有効期限: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime いいえ このメッセージの有効期限が切れる日時。
ロック トークン ServiceBusMessageHeaders#LOCK_TOKEN いいえ 現在のメッセージのロック トークン。
ロック時間 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime いいえ このメッセージのロックの有効期限が切れる日時。
シーケンス番号 ServiceBusMessageHeaders#SEQUENCE_NUMBER 長い いいえ Service Bus によってメッセージに割り当てられた一意の番号。
状態 ServiceBusMessageHeaders#STATE ServiceBusMessageState いいえ メッセージの状態。アクティブ、遅延、またはスケジュールできます。

パーティション キーのサポート

このスターターでは、メッセージ ヘッダー パーティション キーとセッション ID を設定できるようにすることで、Service Bus のパーティション分割 をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。

推奨: ヘッダーのキーとして ServiceBusMessageHeaders.PARTITION_KEY を使用します。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

推奨されていませんが、現在サポートされています:ヘッダーのキーとして AzureHeaders.PARTITION_KEY

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

手記

メッセージ ヘッダーに ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY の両方を設定する場合は、ServiceBusMessageHeaders.PARTITION_KEY をお勧めします。

セッションのサポート

この例では、アプリケーションでメッセージのセッション ID を手動で設定する方法を示します。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

手記

メッセージ ヘッダーに ServiceBusMessageHeaders.SESSION_ID を設定し、別の ServiceBusMessageHeaders.PARTITION_KEY ヘッダーも設定すると、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。

Service Bus クライアントのプロパティをカスタマイズする

開発者は、AzureServiceClientBuilderCustomizer を使用して Service Bus クライアントのプロパティをカスタマイズできます。 次の例では、sessionIdleTimeoutServiceBusClientBuilder プロパティをカスタマイズします。

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリ を参照してください。

Spring と Azure Storage Queue の統合

主な概念

Azure Queue Storage は、大量のメッセージを格納するためのサービスです。 HTTP または HTTPS を使用して認証された呼び出しを使用して、世界中のどこからでもメッセージにアクセスします。 キュー メッセージのサイズは最大 64 KB です。 キューには、ストレージ アカウントの合計容量制限まで、数百万のメッセージが含まれる場合があります。 キューは、非同期的に処理する作業のバックログを作成するために一般的に使用されます。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

構成

このスターターには、次の構成オプションがあります。

接続構成プロパティ

このセクションでは、Azure Storage キューへの接続に使用される構成オプションについて説明します。

手記

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。

spring-cloud-azure-starter-integration-storage-queue の接続構成可能なプロパティ:

財産 種類 形容
spring.cloud.azure.storage.queue.enabled ブーリアン Azure Storage キューが有効になっているかどうか。
spring.cloud.azure.storage.queue.connection-string を する Storage Queue 名前空間の接続文字列の値。
spring.cloud.azure.storage.queue.accountName ストレージ キュー アカウント名。
spring.cloud.azure.storage.queue.accountKey を する ストレージ キュー アカウント キー。
spring.cloud.azure.storage.queue.endpoint を する Storage Queue サービス エンドポイント。
spring.cloud.azure.storage.queue.sasToken を する Sas トークン資格情報
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion API 要求を行うときに使用される QueueServiceVersion。
spring.cloud.azure.storage.queue.messageEncoding を する キュー メッセージのエンコード。

基本的な使用方法

Azure Storage キューにメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  1. DefaultMessageHandler Bean を使用して StorageQueueTemplate を作成し、ストレージ キューにメッセージを送信します。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. メッセージ チャネルを介して、上記のメッセージ ハンドラーを使用してメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Azure Storage キューからメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ・チャネルの Bean を作成します。

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. ストレージ キューにメッセージを受信する StorageQueueMessageSource Bean を含む StorageQueueTemplate を作成します。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 前に作成したメッセージ チャネルを使用して、最後の手順で作成した StorageQueueMessageSource を使用してメッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリ を参照してください。