Spring Cloud Azure サポート for Spring Integration
この記事の対象: ✔️ バージョン 4.14.0 ✔️ バージョン 5.8.0
Spring Integration Extension for Azure には、Azure SDK for Java によって提供されるさまざまなサービス用の Spring Integration アダプターが用意されています。 Event Hubs、Service Bus、Storage キューの各 Azure サービスに対する Spring Integration のサポートが提供されています。 サポート対象のアダプターの一覧は次のとおりです。
spring-cloud-azure-starter-integration-eventhubs
- 詳細については、「Spring と Azure Event Hubs の統合」を参照してください。spring-cloud-azure-starter-integration-servicebus
- 詳細については、「Spring と Azure Service Bus の統合」を参照してください。spring-cloud-azure-starter-integration-storage-queue
- 詳細については、「Spring と Azure Storage Queue の統合」を参照してください。
Spring と Azure Event Hubs の統合
主要な概念
Azure Event Hubs は、ビッグ データのストリーミング プラットフォームとなるイベント インジェスト サービスです。 1 秒間に何百万ものイベントを受信して処理することができます。 イベント ハブに送信されたデータは、任意のリアルタイム分析プロバイダーやバッチ処理/ストレージ アダプターを使用して、変換および保存できます。
Spring 統合を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。 それらのアダプターは、リモート処理、メッセージング、スケジュール設定に対して Spring のサポートより高いレベルの抽象化を提供します。 Spring Integration for Event Hubs 拡張機能プロジェクトは、Azure Event Hubs の送受信チャネル アダプターとゲートウェイを提供します。
Note
RxJava サポート API はバージョン 4.0.0 から削除されます。 詳しくは Javadoc をご覧ください。
コンシューマー グループ
Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、ユーザーは手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Azure Storage 内にこのようなオフセットを格納する関数が、Event Hubs SDK で提供されています。
パーティション分割のサポート
Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka によるコンシューマーとパーティションの間の自動再調整とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのパーティションがどのコンシューマーによって所有されているかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、負荷の高いコンシューマーからいくつかのパーティションを盗んでワークロードの分散を実現しようとします。
負荷分散戦略を指定するため、開発者は構成に EventHubsContainerProperties
を使用できます。 EventHubsContainerProperties
を構成する方法の例については、後のセクションを参照してください。
Batch コンシューマーのサポート
EventHubsInboundChannelAdapter
はバッチ消費モードをサポートしています。 ユーザーがこれを有効にするには、EventHubsInboundChannelAdapter
インスタンスを構築するときに、リスナー モードを ListenerMode.BATCH
と指定できます。
有効にすると、バッチ処理されたイベントのリストがペイロードである Message が受信されて、ダウンストリーム チャネルに渡されます。 各メッセージ ヘッダーもリストとして変換されます。その内容は各イベントから解析された関連ヘッダー値です。 パーティション ID、チェックポイント、および最後にエンキューされたプロパティの共有ヘッダーの場合、イベントのバッチ全体で同じ値が共有されます。 詳細については、「Event Hubs メッセージ ヘッダー」セクションを参照してください。
Note
チェックポイント ヘッダーは、MANUAL チェックポイント モードが使用されている場合にのみ存在します。
バッチ コンシューマーのチェックポイント処理では、BATCH
と MANUAL
の 2 つのモードがサポートされています。 BATCH
モードは、受信されたイベントのバッチ全体を一緒にチェックポイント処理する自動チェックポイント モードです。 MANUAL
モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointer がメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイント処理を行うことができます。
バッチ消費ポリシーは、プロパティ max-size
と max-wait-time
で指定できます。max-size
は必須プロパティですが、max-wait-time
は省略可能です。
バッチ消費戦略を指定するには、開発者は構成に EventHubsContainerProperties
を使用できます。 EventHubsContainerProperties
を構成する方法の例については、後のセクションを参照してください。
依存関係のセットアップ
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
構成
このスターターでは、構成オプションの次の 3 つの部分を提供します。
接続構成プロパティ
このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれます。
Note
セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。
spring-cloud-azure-starter-integration-eventhubs の構成可能な接続プロパティ:
プロパティ | タイプ | 説明 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Azure Event Hubs が有効になっているかどうか。 |
spring.cloud.azure.eventhubs.connection-string | String | Event Hubs 名前空間の接続文字列の値。 |
spring.cloud.azure.eventhubs.namespace | String | Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります |
spring.cloud.azure.eventhubs.domain-name | String | Azure Event Hubs 名前空間の値のドメイン名。 |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | カスタム エンドポイント アドレス。 |
spring.cloud.azure.eventhubs.shared-connection | Boolean | 基になる EventProcessorClient と EventHubProducerAsyncClient で同じ接続を使用するかどうか。 既定では、作成される Event Hub クライアントごとに、新しい接続が構築されて使用されます。 |
チェックポイント構成プロパティ
このセクションには、パーティションの所有権とチェックポイントの情報を保持するために使用される、Storage Blob サービスの構成オプションが含まれます。
Note
バージョン 4.0.0 以降、spring.cloud.azure.eventhubs.processor.チェック のプロパティpoint-store.create-container-if-not-exists は手動で有効になっていません。ストレージ コンテナーは自動的に作成されません。
spring-cloud-azure-starter-integration-eventhubs の構成可能なチェックポイント プロパティ:
プロパティ | タイプ | 説明 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | 存在しない場合にコンテナーの作成を許可するかどうか。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | ストレージ アカウントの名前。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | ストレージ アカウント アクセス キー。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | ストレージ コンテナー名。 |
一般的な Azure Service SDK 構成オプションは、Storage Blob チェックポイント ストアに対しても構成できます。 サポートされている構成オプションは Spring Cloud Azure 構成で導入され、統合プレフィックスspring.cloud.azure.
またはspring.cloud.azure.eventhubs.processor.checkpoint-store
プレフィックスを使用して構成できます。
イベント ハブ プロセッサの構成プロパティ
EventHubsInboundChannelAdapter
は EventProcessorClient
を使用してイベント ハブからのメッセージを使用し、EventProcessorClient
の全体的なプロパティを構成します。開発者は、EventHubsContainerProperties
を構成に使用できます。 EventHubsInboundChannelAdapter
の使用方法については、後のセクションを参照してください。
基本的な使用方法
Azure Event Hubs にメッセージを送信する
資格情報の構成オプションを入力します。
接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Note
使用できる tenant-id
値は、次のとおりです。 common
、 organizations
、 consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください。
Bean を使用して
EventHubsTemplate
作成DefaultMessageHandler
し、Event Hubs にメッセージを送信します。class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
ゲートウェイを使用してメッセージを送信します。
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Event Hubs からのメッセージを受信する
資格情報の構成オプションを入力します。
入力チャネルとしてメッセージ チャネルの Bean を作成します。
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Bean を使用して
EventHubsMessageListenerContainer
作成EventHubsInboundChannelAdapter
し、Event Hubs からメッセージを受信します。@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
前に作成したメッセージ チャネルを使用して、EventHubsInboundChannelAdapter でメッセージ レシーバー バインドを作成します。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
EventHubsMessageConverter を構成して objectMapper をカスタマイズする
ユーザーが ObjectMapper をカスタマイズできるように、EventHubsMessageConverter
は構成可能な Bean として作成されます。
Batch コンシューマーのサポート
Event Hubs からのメッセージのバッチでの使用は、上記のサンプルと似ていますが、ユーザーは EventHubsInboundChannelAdapter
のバッチ消費関連の構成オプションを設定する必要があります。
EventHubsInboundChannelAdapter
を作成するときは、リスナー モードを BATCH
に設定する必要があります。 EventHubsMessageListenerContainer
の Bean を作成するときは、チェックポイント モードを MANUAL
または BATCH
として設定し、バッチ オプションを必要に応じて構成できます。
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs メッセージ ヘッダー
次の表では、Event Hubs のメッセージ プロパティを Spring メッセージ ヘッダーにマップする方法を示します。 Azure Event Hubs では、メッセージは event
として呼び出されます。
レコード リスナー モードでの、Event Hubs のメッセージおよびイベントのプロパティと、Spring のメッセージ ヘッダーのマッピング:
Event Hubs イベントのプロパティ | Spring Message ヘッダー定数 | 型 | 説明 |
---|---|---|---|
エンキューされた時刻 | EventHubsHeaders#ENQUEUED_TIME | 即時 | イベントがイベント ハブ パーティションにエンキューされた日時 (UTC)。 |
オフセット | EventHubsHeaders#OFFSET | Long | 関連付けられたイベント ハブ パーティションからイベントが受信されたときのオフセット。 |
パーティション キー | AzureHeaders#PARTITION_KEY | String | 最初にイベントを発行するときに設定された場合は、パーティション ハッシュ キー。 |
パーティション ID | AzureHeaders#RAW_PARTITION_ID | String | イベント ハブのパーティション ID。 |
Sequence number | EventHubsHeaders#SEQUENCE_NUMBER | Long | 関連付けられたイベント ハブ パーティションにエンキューされたときにイベントに割り当てられたシーケンス番号。 |
最後にエンキューされたイベント プロパティ | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | このパーティション内の最後にエンキューされたイベントのプロパティ。 |
NA | AzureHeaders#CHECKPOINTER | Checkpointer | 特定のメッセージをチェックポイント処理するためのヘッダー。 |
ユーザーは、各イベントの関連情報のメッセージ ヘッダーを解析できます。 イベントのメッセージ ヘッダーを設定するために、カスタマイズされたすべてのヘッダーがイベントのアプリケーション プロパティとして配置されます。ここで、ヘッダーはプロパティ キーとして設定されます。 Event Hubs からイベントを受信すると、すべてのアプリケーション プロパティがメッセージ ヘッダーに変換されます。
Note
パーティション キー、エンキューされた時刻、オフセット、シーケンス番号のメッセージ ヘッダーは、手動で設定することはできません。
バッチ コンシューマー モードを有効にすると、バッチ処理されたメッセージの特定のヘッダーが次のように表示されます。これには、各 Event Hubs イベントの値の一覧が含まれます。
バッチ リスナー モードでの、Event Hubs のメッセージおよびイベントのプロパティと、Spring のメッセージ ヘッダーのマッピング:
Event Hubs イベントのプロパティ | Spring Batch メッセージ ヘッダー定数 | 型 | 説明 |
---|---|---|---|
エンキューされた時刻 | EventHubsHeaders#ENQUEUED_TIME | インスタントの一覧 | 各イベントがイベント ハブ パーティションにエンキューされたときの日時 (UTC) の一覧。 |
オフセット | EventHubsHeaders#OFFSET | 長の一覧 | 関連付けられたイベント ハブ パーティションから受信された各イベントのオフセットの一覧。 |
パーティション キー | AzureHeaders#PARTITION_KEY | 文字列の一覧 | 最初に各イベントを発行するときに設定された場合は、パーティション ハッシュ キーの一覧。 |
Sequence number | EventHubsHeaders#SEQUENCE_NUMBER | 長の一覧 | 関連付けられたイベント ハブ パーティションにエンキューされたときに各イベントに割り当てられたシーケンス番号の一覧。 |
System properties (システムのプロパティ) | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | マップの一覧 | 各イベントのシステム プロパティの一覧。 |
Application properties | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | マップの一覧 | カスタマイズされたすべてのメッセージ ヘッダーまたはイベント プロパティが配置される各イベントのアプリケーション プロパティの一覧。 |
Note
メッセージを発行するとき、上のすべてのバッチ ヘッダーが、メッセージから削除されます (存在する場合)。
サンプル
詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。
Spring と Azure Service Bus の統合
主要な概念
Spring 統合を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。
Spring Integration for Azure Service Bus 拡張プロジェクトでは、Azure Service Bus の送受信チャネル アダプターが提供されます。
Note
CompletableFuture サポート API はバージョン 2.10.0 から非推奨になり、バージョン 4.0.0 から Reactor Core に置き換えられています。 詳しくは Javadoc をご覧ください。
依存関係のセットアップ
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
構成
このスターターでは、構成オプションの次の 2 つの部分を提供します。
接続構成プロパティ
このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれます。
Note
セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。
spring-cloud-azure-starter-integration-servicebus の構成可能な接続プロパティ:
プロパティ | タイプ | 説明 |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Azure Service Bus が有効になっているかどうか。 |
spring.cloud.azure.servicebus.connection-string | String | Service Bus 名前空間の接続文字列の値。 |
spring.cloud.azure.servicebus.namespace | String | Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります |
spring.cloud.azure.servicebus.domain-name | String | Azure Service Bus 名前空間の値のドメイン名。 |
Service Bus プロセッサの構成プロパティ
ServiceBusInboundChannelAdapter
は ServiceBusProcessorClient
を使用してメッセージを消費し、ServiceBusProcessorClient
の全体的なプロパティを構成します。開発者は、ServiceBusContainerProperties
を構成に使用できます。 ServiceBusInboundChannelAdapter
の使用方法については、後のセクションを参照してください。
基本的な使用方法
Azure Service Bus にメッセージを送信する
資格情報の構成オプションを入力します。
接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
使用できる tenant-id
値は、次のとおりです。 common
、 organizations
、 consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください。
サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
使用できる tenant-id
値は、次のとおりです。 common
、 organizations
、 consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください。
メッセージを
ServiceBusTemplate
Service Bus に送信する Bean を使用して作成DefaultMessageHandler
し、ServiceBusTemplate のエンティティ型を設定します。 このサンプルでは、例として Service Bus キューを使用します。class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
ゲートウェイを使用してメッセージを送信します。
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Service Bus からメッセージを受信する
資格情報の構成オプションを入力します。
入力チャネルとしてメッセージ チャネルの Bean を作成します。
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Service Bus にメッセージを
ServiceBusMessageListenerContainer
受信する Bean を使用して作成ServiceBusInboundChannelAdapter
します。 このサンプルでは、例として Service Bus キューを使用します。@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
前に作成したメッセージ チャネルを
ServiceBusInboundChannelAdapter
使用して、メッセージ レシーバー バインドを作成します。class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
objectMapper をカスタマイズするように ServiceBusMessageConverter を構成する
ServiceBusMessageConverter
は、ユーザーがカスタマイズ ObjectMapper
できるように構成可能な Bean として作成されます。
Service Bus のメッセージ ヘッダー
複数の Spring ヘッダー定数にマップできる一部の Service Bus ヘッダーについては、異なる Spring ヘッダーの優先順位が一覧表示されます。
Service Bus のヘッダーと Spring のヘッダーの間のマッピング:
Service Bus メッセージのヘッダーとプロパティ | Spring メッセージ ヘッダー定数 | Type | コンフィギュレーション可能 | 説明 |
---|---|---|---|---|
コンテンツ タイプ | MessageHeaders#CONTENT_TYPE |
String | はい | メッセージの RFC2045 Content-Type 記述子。 |
関連付け ID | ServiceBusMessageHeaders#CORRELATION_ID |
String | はい | メッセージの関連付け ID。 |
メッセージ ID | ServiceBusMessageHeaders#MESSAGE_ID |
String | はい | メッセージのメッセージ ID。このヘッダーは MessageHeaders#ID より高い優先度です。 |
メッセージ ID | MessageHeaders#ID |
UUID | はい | メッセージのメッセージ ID。このヘッダーは ServiceBusMessageHeaders#MESSAGE_ID より低い優先度です。 |
パーティション キー | ServiceBusMessageHeaders#PARTITION_KEY |
String | はい | パーティション分割されたエンティティにメッセージを送信するためのパーティション キー。 |
返信先 | MessageHeaders#REPLY_CHANNEL |
String | はい | 返信の送信先のエンティティのアドレス。 |
返信先セッション ID | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | はい | メッセージの ReplyToGroupId プロパティの値。 |
スケジュールされたエンキュー日時 (UTC) | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | はい | メッセージを Service Bus にエンキューする必要がある日時。このヘッダーは AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE より高い優先度です。 |
スケジュールされたエンキュー日時 (UTC) | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
整数型 | はい | メッセージを Service Bus にエンキューする必要がある日時。このヘッダーは ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME より低い優先度です。 |
セッション ID | ServiceBusMessageHeaders#SESSION_ID |
String | はい | セッション対応エンティティのセッション IDentifier。 |
Time to Live | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duration | はい | このメッセージの有効期限が切れるまでの時間。 |
受信先 | ServiceBusMessageHeaders#TO |
String | はい | メッセージの "送信先" アドレス。ルーティング シナリオでの将来の使用のために予約されており、現在はブローカー自体で無視されます。 |
情報カテゴリ | ServiceBusMessageHeaders#SUBJECT |
String | はい | メッセージの件名。 |
配信不能エラーの説明 | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | いいえ | 配信不能にされたメッセージの説明。 |
配信不能の理由 | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | いいえ | メッセージが配信不能になった理由。 |
配信不能メッセージのソース | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | いいえ | メッセージが配信不能になったエンティティ。 |
配信回数 | ServiceBusMessageHeaders#DELIVERY_COUNT |
long | いいえ | このメッセージがクライアントに配信された回数。 |
エンキューされたシーケンス番号 | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
long | いいえ | Service Bus によってメッセージに割り当てられるエンキューされたシーケンス番号。 |
エンキューされた時刻 | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | いいえ | このメッセージが Service Bus にエンキューされた日時。 |
有効期限 | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | いいえ | このメッセージの有効期限が切れる日時。 |
ロック トークン | ServiceBusMessageHeaders#LOCK_TOKEN |
String | いいえ | 現在のメッセージのロック トークン。 |
ロック期限 | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | いいえ | このメッセージのロックの有効期限が切れる日時。 |
Sequence number | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
long | いいえ | Service Bus によってメッセージに割り当てられる一意の番号。 |
状態 | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | いいえ | メッセージの状態。Active、Deferred、または Scheduled です。 |
パーティション キーのサポート
このスターターは、メッセージ ヘッダーでパーティション キーとセッション ID を設定できるようにすることで、Service Bus のパーティション分割をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。
推奨: 。ヘッダーのキーとして ServiceBusMessageHeaders.PARTITION_KEY
を使用します。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
AzureHeaders.PARTITION_KEY
はヘッダーのキーとして推奨されませんが、現在サポートされています。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Note
ServiceBusMessageHeaders.PARTITION_KEY
と AzureHeaders.PARTITION_KEY
の両方がメッセージ ヘッダーに設定されている場合は、 ServiceBusMessageHeaders.PARTITION_KEY
が優先されます。
セッションのサポート
この例では、アプリケーションでメッセージのセッション ID を手動で設定する方法を示します。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Note
ServiceBusMessageHeaders.SESSION_ID
メッセージ ヘッダーに設定され、別ServiceBusMessageHeaders.PARTITION_KEY
のヘッダーも設定されている場合は、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。
サンプル
詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。
Spring と Azure Storage Queue の統合
主要な概念
Azure Queue storage は、多数のメッセージを格納するためのサービスです。 メッセージには、HTTP または HTTPS を使用して、認証された呼び出しを介して世界中のどこからでもアクセスできます。 キュー メッセージの許容される最大サイズは 64 KB です。 キューには、ストレージ アカウントの総容量の上限を超えない限り、数百万のメッセージを含めることができます。 キューは通常、非同期的な処理用に作業のバックログを作成するために使用されます。
依存関係のセットアップ
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
構成
このスターターでは、次の構成オプションを提供します。
接続構成プロパティ
このセクションには、Azure Storage キューへの接続に使用される構成オプションが含まれます。
Note
セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。
spring-cloud-azure-starter-integration-storage-queue の構成可能な接続プロパティ:
プロパティ | タイプ | 説明 |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolean | Azure Storage キューが有効かどうか。 |
spring.cloud.azure.storage.queue.connection-string | String | Storage キュー名前空間の接続文字列の値。 |
spring.cloud.azure.storage.queue.accountName | String | Storage キューのアカウント名。 |
spring.cloud.azure.storage.queue.accountKey | String | Storage キューのアカウント キー。 |
spring.cloud.azure.storage.queue.endpoint | String | Storage キュー サービスのエンドポイント。 |
spring.cloud.azure.storage.queue.sasToken | String | SAS トークンの資格情報 |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion は API 要求を行うときに使用されます。 |
spring.cloud.azure.storage.queue.messageEncoding | String | キュー メッセージのエンコード。 |
基本的な使用方法
Azure Storage キューにメッセージを送信する
資格情報の構成オプションを入力します。
接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
使用できる tenant-id
値は、次のとおりです。 common
、 organizations
、 consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください。
サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Note
使用できる tenant-id
値は、次のとおりです。 common
、 organizations
、 consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください。
Bean を使用して
StorageQueueTemplate
作成DefaultMessageHandler
し、ストレージ キューにメッセージを送信します。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
ゲートウェイを使用してメッセージを送信します。
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Azure Storage キューからメッセージを受信する
資格情報の構成オプションを入力します。
入力チャネルとしてメッセージ チャネルの Bean を作成します。
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Bean を使用して
StorageQueueTemplate
作成StorageQueueMessageSource
し、ストレージ キューにメッセージを受信します。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
前に作成したメッセージ チャネルを使用し、最後のステップで作成した StorageQueueMessageSource でメッセージ レシーバー バインドを作成します。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
サンプル
詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。