次の方法で共有


Spring Cloud Stream の Spring Cloud Azure サポート

この記事の対象: ✔️ バージョン 4.14.0 ✔️ バージョン 5.8.0

Spring Cloud Stream は、共有メッセージング システムに接続された高度なスケーリングが可能なイベント ドリブン マイクロサービスを構築するためのフレームワークです。

このフレームワークは、既に確立されており、使い慣れた Spring のイディオムとベスト プラクティスに基づいて構築された柔軟なプログラミング モデルを提供します。 これらのベスト プラクティスには、永続的な pub/sub セマンティクス、コンシューマー グループ、ステートフル パーティションのサポートが含まれます。

現在のバインダーの実装は次のとおりです。

Azure Event Hubs 用 Spring Cloud Stream Binder

主要な概念

Azure Event Hubs 用 Spring Cloud Stream Binder は、Spring Cloud Stream フレームワークのバインド実装を提供します。 この実装では、Spring Integration Event Hubs チャネル アダプターを基盤として使用します。 デザインの観点から見ると、Event Hubs は Kafka と似ています。 また、Event Hubs には Kafka API を使用してアクセスできます。 プロジェクトが Kafka API に厳密に依存している場合は、Kafka API サンプルを使用した Events Hub を試すことができます

コンシューマー グループ

Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、ユーザーは手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Azure Storage 内にこのようなオフセットを格納する関数が、Event Hubs SDK で提供されています。

パーティション分割のサポート

Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka のコンシューマーとパーティション間の自動再調整とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのコンシューマーがどのパーティションを所有するかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、ワークロードバランスを実現するために、最も負荷の高いコンシューマーからいくつかのパーティションを盗もうとします。

負荷分散戦略を指定するために、spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* のプロパティが提供されます。 詳細については、「コンシューマーのプロパティ」セクションを参照してください。

Batch コンシューマーのサポート

Spring Cloud Azure Stream Event Hubs バインダーでは、Spring Cloud Stream Batch コンシューマー機能がサポートされています。

バッチ コンシューマー モードを使用するには、プロパティtruespring.cloud.stream.bindings.<binding-name>.consumer.batch-mode . 有効にすると、バッチ処理されたイベントの一覧のペイロードを含むメッセージが受信され、関数に Consumer 渡されます。 各メッセージ ヘッダーもリストに変換され、コンテンツは各イベントから解析された関連ヘッダー値です。 イベントのバッチ全体が同じ値を共有するため、パーティション ID、チェックpointer、および最後にエンキューされたプロパティの共同ヘッダーが 1 つの値として表示されます。 詳細については、Spring Integration の Spring Cloud Azure サポートの Event Hubs メッセージ ヘッダーセクションを参照してください。

Note

チェックpoint ヘッダーは、チェックpoint モードがMANUAL使用されている場合にのみ存在します。

バッチ コンシューマーのチェックポイント処理では、BATCHMANUAL の 2 つのモードがサポートされています。 BATCHmode は、バインダーがイベントを受け取った後にイベントのバッチ全体をチェックポイントする自動チェックポイント モードです。 MANUAL モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointerメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイントできます。

バッチ サイズを指定するには、プレフィックスspring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.が ⿇ のmax-sizeプロパティとmax-wait-timeを設定します。 プロパティは max-size 必須であり、 max-wait-time プロパティは省略可能です。 詳細については、「コンシューマーのプロパティ」セクションを参照してください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

または、Maven の次の例に示すように、Spring Cloud Azure Stream Event Hubs Starter を使用することもできます。

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

構成

バインダーは、構成オプションの次の 3 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれます。

Note

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。

spring-cloud-azure-stream-binder-eventhubs の構成可能な接続プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.eventhubs.enabled boolean Azure Event Hubs が有効になっているかどうか。
spring.cloud.azure.eventhubs.connection-string String Event Hubs 名前空間の接続文字列の値。
spring.cloud.azure.eventhubs.namespace String Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.eventhubs.domain-name String Azure Event Hubs 名前空間の値のドメイン名。
spring.cloud.azure.eventhubs.custom-endpoint-address String カスタム エンドポイント アドレス。

ヒント

一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Event Hubs バインダーにも構成できます。 サポートされている構成オプションは Spring Cloud Azure 構成導入され、統合プレフィックスspring.cloud.azure.またはspring.cloud.azure.eventhubs.プレフィックスを使用して構成できます。

バインダーでは、Spring Could Azure Resource Manager も既定でサポートされています。 関連ロールで付与Dataされていないセキュリティ プリンシパルを使用して接続文字列を取得する方法については、Spring Could Azure Resource Manager「基本的な使用」セクションを参照してください

チェックポイント構成プロパティ

このセクションには、パーティションの所有権とチェックポイントの情報を保持するために使用される、Storage Blob サービスの構成オプションが含まれます。

Note

バージョン 4.0.0 以降では、spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists のプロパティが手動で有効にされていない場合、spring.cloud.stream.bindings.binding-name.destination の名前を持つ Storage コンテナーは自動的に作成されません。

spring-cloud-azure-stream-binder-eventhubs の構成可能なチェックポイント プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean 存在しない場合にコンテナーの作成を許可するかどうか。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String ストレージ アカウントの名前。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String ストレージ アカウント アクセス キー。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String ストレージ コンテナー名。

ヒント

一般的な Azure Service SDK 構成オプションは、Storage Blob チェックポイント ストアに対しても構成できます。 サポートされている構成オプションは Spring Cloud Azure 構成導入され、統合プレフィックスspring.cloud.azure.またはspring.cloud.azure.eventhubs.processor.checkpoint-storeプレフィックスを使用して構成できます。

Azure Event Hubs のバインド構成プロパティ

以下のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサー プロパティ、高度なプロデューサー構成の 4 つのセクションに分かれています。

コンシューマーのプロパティ

これらのプロパティは、EventHubsConsumerProperties を介して公開されます。

spring-cloud-azure-stream-binder-eventhubs の構成可能なコンシューマー プロパティ:

プロパティ タイプ 説明
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode コンシューマーがメッセージのチェックポイントを設定する方法を決定するときに使用されるチェックポイント モード
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count 整数型 1 つのチェックポイントに対するパーティションごとのメッセージの量を決定します。 PARTITION_COUNT チェックポイント モードが使用されている場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duration 1 つのチェックポイントに対する時間間隔を決定します。 TIME チェックポイント モードが使用されている場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size 整数型 バッチ内の最大イベント数。 バッチ コンシューマー モードの場合は必須です。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duration バッチ処理の最大時間。 バッチ コンシューマー モードが有効で省略可能な場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duration 更新の時間間隔。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy 負荷分散戦略。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duration パーティションの所有権が期限切れになるまでの期間。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean イベント プロセッサが、関連付けられているパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうか。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count 整数型 イベント ハブ コンシューマーがアクティブに受信し、ローカルでキューに登録するイベントの数を制御するためにコンシューマーによって使用される数。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position キーがパーティション ID で値が StartPositionProperties のマップ パーティションのチェックポイントがチェックポイント ストアに存在しない場合に、各パーティションに使用するイベント位置を含むマップ。 このマップは、パーティション ID からキーが設定されます。

Note

initial-partition-event-position の構成は、map を受け取って、各イベント ハブの初期位置を指定します。 したがって、そのキーはパーティション ID であり、値は StartPositionProperties (オフセット、シーケンス番号、エンキューされた日時、および包含のプロパティを含む) です。 たとえば、次のように設定できます

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
高度なコンシューマー構成

上記の接続チェックポイント、および一般的な Azure SDK クライアント構成では、各バインダー コンシューマーのカスタマイズがサポートされています。これはプレフィックスspring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.を使用して構成できます。

プロデューサーのプロパティ

これらのプロパティは、EventHubsProducerProperties を介して公開されます。

spring-cloud-azure-stream-binder-eventhubs の構成可能なプロデューサー プロパティ:

プロパティ タイプ 説明
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean プロデューサーの同期のためのスイッチ フラグ。 true の場合、プロデューサーは送信操作の後で応答を待機します。
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long 送信操作の後に応答を待機する時間。 同期プロデューサーが有効になっている場合にのみ有効になります。
高度なプロデューサー構成

上記 の接続一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされており、プレフィックス spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.を使用して構成できます。

基本的な使用方法

Event Hubs との間のメッセージの送受信

  1. 構成オプションに資格情報を入力します。

    • 接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. サプライヤーとコンシューマーを定義します。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

パーティション分割のサポート

PartitionSupplier送信するメッセージに関するパーティション情報を構成するために、ユーザーが指定したパーティション情報が作成されます。 次のフローチャートは、パーティション ID とキーのさまざまな優先順位を取得するプロセスを示しています。

Diagram showing a flowchart of the partitioning support process.

Batch コンシューマーのサポート

  1. 次の例に示すように、バッチ構成オプションを指定します。

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. サプライヤーとコンシューマーを定義します。

    BATCH チェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用できます。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    MANUAL チェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用およびチェックポイントを実行できます。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Note

バッチ消費モードでは、Spring Cloud Stream バインダーの既定のコンテンツ タイプが application/json であるため、メッセージ ペイロードがコンテンツ タイプと一致していることを確認してください。 たとえば、既定のコンテンツ タイプをapplication/json使用してペイロードを含むStringメッセージを受信する場合、ペイロードは元Stringのテキストの二重引用符で囲む必要がありますJSON Stringtext/plain のコンテンツ タイプの場合は、直接 String オブジェクトにすることができます。 詳細については、「Spring Cloud Stream コンテンツ タイプ ネゴシエーション」を参照してください

エラー メッセージを処理する

  • 送信バインドのエラー メッセージを処理する

    既定では、Spring Integration によって 、"> という errorChannelグローバル エラー チャネルが作成されます。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 受信バインディング のエラー メッセージを処理する

    Spring Cloud Stream Event Hubs Binder では、受信メッセージ バインドのエラーを処理するための 2 つのソリューション (カスタム エラー チャネルとハンドラー) がサポートされています。

    エラー チャネル:

    Spring Cloud Stream は、受信バインディングごとにエラー チャネルを提供します。 エラー ErrorMessage チャネルに送信されます。 詳細については、Spring Cloud Stream ドキュメントのエラーの処理を参照してください。

    • 既定のエラー チャネル

      名前付きの errorChannel グローバル エラー チャネルを使用して、すべての受信バインディング エラー メッセージを使用できます。 これらのメッセージを処理するには、次のメッセージ エンドポイントを構成します。

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • バインド固有のエラー チャネル

      特定のエラー チャネルを使用して、既定のエラー チャネルよりも優先順位の高い特定の受信バインディング エラー メッセージを使用できます。 これらのメッセージを処理するには、次のメッセージ エンドポイントを構成します。

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Note

      バインド固有のエラー チャネルは、他の指定されたエラー ハンドラーおよびチャネルと相互に排他的です。

    エラー ハンドラー:

    Spring Cloud Stream では、インスタンスを受け入れるErrorMessageエラー ハンドラーを追加してカスタム エラー ハンドラーをConsumer提供するためのメカニズムが公開されています。 詳細については、Spring Cloud Stream ドキュメントのエラー処理を参照してください。

    Note

    バインド エラー ハンドラーが構成されている場合は、既定のエラー チャネルを使用できます。

    • バインドの既定のエラー ハンドラー

      すべてのインバウンド・バインディング・エラー・メッセージを使用するように単一 Consumer Bean を構成します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、プロパティを関数名に設定 spring.cloud.stream.default.error-handler-definition する必要もあります。

    • バインド固有のエラー ハンドラー

      特定の Consumer インバウンド・バインディング・エラー・メッセージを使用するように Bean を構成します。 次の関数は、特定の受信バインディング エラー チャネルをサブスクライブし、バインディングの既定のエラー ハンドラーよりも高い優先順位を持ちます。

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、プロパティを関数名に設定 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition する必要もあります。

Event Hubs メッセージ ヘッダー

サポートされている基本的なメッセージ ヘッダーについては、Spring Cloud Azure サポート for Spring IntegrationEvent Hubs メッセージ ヘッダーに関するセクションを参照してください

複数のバインダーのサポート

複数のバインダーを使用して、複数の Event Hubs 名前空間への接続もサポートされます。 このサンプルでは、例として接続文字列を取ります。 サービス プリンシパルとマネージド ID の資格情報もサポートされています。 各バインダーの環境設定で、関連するプロパティを設定できます。

  1. Event Hubs で複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Note

    前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、次のような spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000構成を使用できます。

  2. 2 つのサプライヤーと 2 つのコンシューマーを定義する必要があります。

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

リソースのプロビジョニング

Event Hubs バインダーでは、イベント ハブとコンシューマー グループのプロビジョニングがサポートされており、ユーザーは次のプロパティを使用してプロビジョニングを有効にできます。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。

Azure Service Bus 用 Spring Cloud Stream Binder

主要な概念

Azure Service Bus 用 Spring Cloud Stream Binder は、Spring Cloud Stream Framework のバインド実装を提供します。 この実装では、Spring Integration Service Bus チャネル アダプターを基盤として使用します。

スケジュールされたメッセージ

このバインダーは、遅延処理のためにトピックへのメッセージの送信をサポートします。 ユーザーは、メッセージの遅延時間をミリ秒単位で表すヘッダー x-delay を使用して、スケジュールされたメッセージを送信できます。 メッセージは、x-delay ミリ秒後にそれぞれのトピックに配信されます。

コンシューマー グループ

Service Bus トピックでは、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 このバインダーは、コンシューマー グループとして機能するトピックの Subscription に依存します。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

または、Maven の次の例で示すように、Spring Cloud Azure Stream Service Bus Starter を使用することもできます。

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

構成

バインダーは、構成オプションの次の 2 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれます。

Note

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。

spring-cloud-azure-stream-binder-servicebus の構成可能な接続プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.servicebus.enabled boolean Azure Service Bus が有効になっているかどうか。
spring.cloud.azure.servicebus.connection-string String Service Bus 名前空間の接続文字列の値。
spring.cloud.azure.servicebus.namespace String Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.servicebus.domain-name String Azure Service Bus 名前空間の値のドメイン名。

Note

一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Service Bus バインダーにも構成できます。 サポートされている構成オプションは Spring Cloud Azure 構成導入され、統合プレフィックスspring.cloud.azure.またはspring.cloud.azure.servicebus.プレフィックスを使用して構成できます。

バインダーでは、Spring Could Azure Resource Manager も既定でサポートされています。 関連ロールで付与Dataされていないセキュリティ プリンシパルを使用して接続文字列を取得する方法については、Spring Could Azure Resource Manager「基本的な使用」セクションを参照してください

Azure Service Bus のバインド構成プロパティ

以下のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサー プロパティ、高度なプロデューサー構成の 4 つのセクションに分かれています。

コンシューマーのプロパティ

これらのプロパティは、ServiceBusConsumerProperties を介して公開されます。

spring-cloud-azure-stream-binder-servicebus の構成可能なコンシューマー プロパティ:

プロパティ Type Default 説明
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean false 失敗したメッセージが DLQ にルーティングされるかどうか。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls 整数型 1 Service Bus プロセッサ クライアントが処理する必要がある最大同時実行メッセージ数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions 整数型 null 任意の時点で処理する同時セッションの最大数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean null セッションが有効かどうか。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count 整数型 0 Service Bus プロセッサ クライアントのプリフェッチ数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue なし 接続するサブ キューの種類。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duration 5 分 ロックの自動更新を続行する時間。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Service Bus プロセッサ クライアントの受信モード。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolean true メッセージを自動的に決済するかどうか。 false に設定すると、開発者がメッセージを手動で決済できるように、メッセージ ヘッダー Checkpointer が追加されます。
高度なコンシューマー構成

上記 の接続一般的な Azure SDK クライアント 構成では、プレフィックスを使用して構成できる各バインダー コンシューマーのカスタマイズがサポートされています spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.

プロデューサーのプロパティ

これらのプロパティは、ServiceBusProducerProperties を介して公開されます。

spring-cloud-azure-stream-binder-servicebus の構成可能なプロデューサー プロパティ:

プロパティ Type Default 説明
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean false プロデューサーの同期用にフラグを切り替えます。
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 プロデューサーの送信のタイムアウト値。
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null プロデューサーの Service Bus エンティティ型。バインディング プロデューサーに必須です。

重要

バインディング プロデューサーを使用する場合は、spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type のプロパティを構成する必要があります。

高度なプロデューサー構成

上記 の接続一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされており、プレフィックス spring.cloud.stream.servicebus.bindings.<binding-name>.producer.を使用して構成できます。

基本的な使用方法

Service Bus との間のメッセージの送受信

  1. 構成オプションに資格情報を入力します。

    • 接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. サプライヤーとコンシューマーを定義します。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

パーティション キーのサポート

バインダーは、メッセージ ヘッダーでパーティション キーとセッション ID を設定できるようにすることで、Service Bus パーティション分割をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。

Spring Cloud Stream には、パーティション キー SpEL 式プロパティ spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression が用意されています。 たとえば、このプロパティを "'partitionKey-' + headers[<message-header-key>]" に設定し、message-header-key というヘッダーを追加します。 Spring Cloud Stream は、パーティション キーを割り当てる式を評価するときに、このヘッダーの値を使用します。 次のコードは、プロデューサーの例を示しています。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

セッションのサポート

バインダーは、Service Bus のメッセージ セッションをサポートします。 メッセージのセッション ID は、メッセージ ヘッダーを使用して設定できます。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Note

Service Bus のパーティション分割に従うと、セッション ID の優先順位はパーティション キーより高くなります。 そのため、両方のServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEYヘッダーが設定されると、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。

エラー メッセージを処理する

  • 送信バインドのエラー メッセージを処理する

    既定では、Spring Integration によって 、"> という errorChannelグローバル エラー チャネルが作成されます。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 受信バインディング のエラー メッセージを処理する

    Spring Cloud Stream Service Bus Binder では、受信メッセージ バインドのエラーを処理するための 3 つのソリューション (バインダー エラー ハンドラー、カスタム エラー チャネル、ハンドラー) がサポートされています。

    バインダー エラー ハンドラー:

    既定のバインダー エラー ハンドラーは、受信バインディングを処理します。 このハンドラーを使用して、有効な場合に失敗したメッセージを配信不能キューに spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected 送信します。 それ以外の場合、失敗したメッセージは破棄されます。 バインド固有のエラー チャネルを構成する場合を除き、バインダー エラー ハンドラーは、他のカスタム エラー ハンドラーまたはチャネルがあるかどうかに関係なく、常に有効になります。

    エラー チャネル:

    Spring Cloud Stream は、受信バインディングごとにエラー チャネルを提供します。 エラー ErrorMessage チャネルに送信されます。 詳細については、Spring Cloud Stream ドキュメントのエラーの処理を参照してください。

    • 既定のエラー チャネル

      名前付きの errorChannel グローバル エラー チャネルを使用して、すべての受信バインディング エラー メッセージを使用できます。 これらのメッセージを処理するには、次のメッセージ エンドポイントを構成します。

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • バインド固有のエラー チャネル

      特定のエラー チャネルを使用して、既定のエラー チャネルよりも優先順位の高い特定の受信バインディング エラー メッセージを使用できます。 これらのメッセージを処理するには、次のメッセージ エンドポイントを構成します。

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Note

      バインド固有のエラー チャネルは、他の指定されたエラー ハンドラーおよびチャネルと相互に排他的です。

    エラー ハンドラー:

    Spring Cloud Stream では、インスタンスを受け入れるErrorMessageエラー ハンドラーを追加してカスタム エラー ハンドラーをConsumer提供するためのメカニズムが公開されています。 詳細については、Spring Cloud Stream ドキュメントのエラー処理を参照してください。

    Note

    バインド エラー ハンドラーが構成されている場合は、既定のエラー チャネルとバインダー エラー ハンドラーを使用できます。

    • バインドの既定のエラー ハンドラー

      すべてのインバウンド・バインディング・エラー・メッセージを使用するように単一 Consumer Bean を構成します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、プロパティを関数名に設定 spring.cloud.stream.default.error-handler-definition する必要もあります。

    • バインド固有のエラー ハンドラー

      特定の Consumer インバウンド・バインディング・エラー・メッセージを使用するように Bean を構成します。 次の関数は、バインディングの既定のエラー ハンドラーよりも優先順位の高い特定の受信バインド エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、プロパティを関数名に設定 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition する必要もあります。

Service Bus のメッセージ ヘッダー

サポートされる基本的なメッセージ ヘッダーについては、Spring Cloud Azure サポート for Spring IntegrationService Bus メッセージ ヘッダーセクションを参照してください

Note

パーティション キーを設定する場合、メッセージ ヘッダーの優先度は Spring Cloud Stream プロパティよりも高くなります。 そのため、spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionいずれのヘッダーもServiceBusMessageHeaders#PARTITION_KEY構成されていないServiceBusMessageHeaders#SESSION_ID場合にのみ有効になります。

複数のバインダーのサポート

複数の Service Bus 名前空間への接続も、複数のバインダーを使用することによってサポートされます。 このサンプルでは、例として接続文字列を使用します。 サービス プリンシパルとマネージド ID の資格情報もサポートされています。ユーザーは、各バインダーの環境設定で関連プロパティを設定できます。

  1. ServiceBus の複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Note

    前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、次のような spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000構成を使用できます。

  2. 2 つのサプライヤーと 2 つのコンシューマーを定義する必要がある

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

リソースのプロビジョニング

Service Bus バインダーでは、キュー、トピック、サブスクリプションのプロビジョニングがサポートされています。ユーザーは、次のプロパティを使用してプロビジョニングを有効にできます。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。