Azure Service Bus トピックへのメッセージ送信とトピックのサブスクリプションからのメッセージ受信 (Java)
このクイックスタートでは、Azure Service Bus トピックにメッセージを送信してそのトピックのサブスクリプションからメッセージを受信する Java コードを、azure-messaging-servicebus パッケージを使用して作成します。
注意
このクイック スタートでは、メッセージのバッチを Service Bus トピックに送信し、それらのメッセージをトピックのサブスクリプションから受信するという単純なシナリオの手順を説明します。 Azure Service Bus の事前構築済みの Java サンプルが、GitHub の Azure SDK for Java リポジトリに用意されています。
ヒント
Spring アプリケーションで Azure Service Bus リソースを操作している場合は、Spring Cloud Azure を代替手段として検討することをお勧めします。 Spring Cloud Azure は、Spring と Azure サービスのシームレスな統合を実現するオープンソース プロジェクトです。 Spring Cloud Azure の詳細と Service Bus の使用例については、「Azure Service Bus を使用する Spring Cloud Stream with」を参照してください。
前提条件
- Azure サブスクリプション。 このチュートリアルを完了するには、Azure アカウントが必要です。 Visual Studio または MSDN のサブスクライバー特典を有効にするか、無料アカウントにサインアップしてください。
- Azure SDK for Java をインストールします。 Eclipse を使用している場合は、Azure SDK for Java が含まれている Azure Toolkit for Eclipse をインストールできます。 これで Microsoft Azure Libraries for Java をプロジェクトに追加できます。 IntelliJ を使用している場合は、Azure Toolkit for IntelliJ のインストールに関するページを参照してください。
Azure Portal での名前空間の作成
Azure の Service Bus メッセージング エンティティを使用するには、Azure 全体で一意となる名前を備えた名前空間を最初に作成しておく必要があります。 名前空間により、ご利用のアプリケーション内に Service Bus リソース (キュー、トピックなど) 用のスコープ コンテナーが提供されます。
名前空間を作成するには:
Azure portal にサインインします。
[すべてのサービス] ページに移動します。
左側のナビゲーション バーで、カテゴリの一覧から [統合] を選択し、[Service Bus] 上にマウス ポインターを置き、[Service Bus] タイルの [+] ボタンを選択します。
[名前空間の作成] ページの [基本] タブで、こちらの手順を実行します。
[サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。
[リソース グループ] では、既存のリソース グループを選択するか、新しいリソース グループを作成します。
名前空間の名前を入力します。 名前空間名は次の名前付け規則に従う必要があります。
- この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
- 名前の長さは 6 ~ 50 文字である。
- 名前には文字、数字、ハイフン
-
のみを含めることができます。 - 名前の先頭は文字、末尾は文字または数字にする必要があります。
- 名前の末尾を
-sb
または-mgmt
にすることはできません。
[場所] で、名前空間をホストするリージョンを選択します。
[価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。
Premium レベルを選択した場合は、名前空間に対して geo レプリケーションを有効にできるかどうかを選択します。 geo レプリケーション機能を使用すると、名前空間のメタデータとデータが、プライマリ Azure リージョンから 1 つ以上のセカンダリ Azure リージョンに、継続的にレプリケートされることが保証されます。
重要
トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 Basic 価格レベルでは、トピックとサブスクリプションはサポートされていません。
[Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソースのコンテナーをメッセージング ユニットと呼びます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、Service Bus の Premium メッセージングに関するページをご覧ください。
ページ下部にある [確認と作成] を選択します。
[確認および作成] ページで、設定を確認し、 [作成] を選択します。
リソースのデプロイが成功したら、デプロイ ページで [リソースに移動] を選択します。
Service Bus 名前空間のホーム ページが表示されます。
Azure Portal を使用したトピックの作成
[Service Bus 名前空間] ページで、左側のナビゲーション メニューの [エンティティ] を展開し、左側のメニューで [トピック] を選びます。
ツール バーの [+ トピック] を選択します。
トピックの名前を入力します。 他のオプションは既定値のままにしてください。
[作成] を選択します。
トピックに対するサブスクリプションの作成
前のセクションで作成したトピックを選択します。
[Service Bus トピック] ページで、ツール バーの [+ サブスクリプション] を選択します。
[サブスクリプションの作成] ページで、次の手順に従います。
サブスクリプションの名前として「S1」と入力します。
[最大配信数] に「3」と入力します。
次に、 [作成] を選択してサブスクリプションを作成します。
Azure に対してアプリを認証する
このクイック スタートでは、Azure Service Bus に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。
最初のオプションでは、Microsoft Entra ID とロールベースのアクセス制御 (RBAC) でセキュリティ プリンシパルを使用して Service Bus 名前空間に接続する方法を示します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。
2 番目のオプションでは、接続文字列を使用して Service Bus 名前空間に接続する方法を示します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。
Microsoft Entra ユーザーにロールを割り当てる
ローカルでの開発時には、Azure Service Bus に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Service Bus データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write
アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。
次の例では、ユーザー アカウントに、Azure Service Bus Data Owner
ロールを割り当てます。これにより、Azure Service Bus リソースへのフル アクセスが提供されます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。
Azure Service Bus 用の Azure 組み込みロール
Azure Service Bus の場合、名前空間およびそれに関連するすべてのリソースの Azure portal および Azure リソース管理 API による管理は、Azure RBAC モデルを使って既に保護されています。 Azure では、Service Bus 名前空間へのアクセスを承認するための次の Azure 組み込みロールが提供されています。
- Azure Service Bus データ所有者:Service Bus 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、およびフィルター) へのデータ アクセスが可能です。 このロールのメンバーは、キューまたはトピックやサブスクリプションとの間でメッセージを送受信できます。
- Azure Service Bus データ送信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの送信アクセスを許可します。
- Azure Service Bus データ受信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの受信アクセスを許可します。
カスタム ロールを作成する場合は、Service Bus 操作に必要な権限に関するページを参照してください。
Microsoft Entra ユーザーを Azure Service Bus 所有者ロールに追加する
Microsoft Entra ユーザー名を、Service Bus 名前空間レベルの Azure Service Bus データ所有者ロール に追加します。 これにより、ユーザー アカウントのコンテキストで実行されているアプリがキューまたはトピックにメッセージを送信し、キューまたはトピックのサブスクリプションからメッセージを受信できるようになります。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で Service Bus 名前空間ページが開いていない場合は、メイン検索バーまたは左側のナビゲーションを使用して Service Bus 名前空間を見つけます。
概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、
Azure Service Bus Data Owner
を検索して一致する結果を選択します。 [次へ] を選びます。[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
メッセージをトピックに送信する
このセクションでは、Java コンソール プロジェクトを作成し、既に作成してあるトピックにメッセージを送信するコードを追加します。
Java コンソール プロジェクトを作成する
Eclipse または任意のツールを使用して Java プロジェクトを作成します。
Service Bus を使用するようにアプリケーションを構成する
Azure Core ライブラリおよび Azure Service Bus ライブラリへの参照を追加します。
Eclipse を使用して Java コンソール アプリケーションを作成した場合は、Java プロジェクトを Maven に変換します。[パッケージ エクスプローラー] ウィンドウでプロジェクトを右クリックし、[構成]->[Maven プロジェクトへの変換] を選択します。 その後、これら 2 つのライブラリへの依存関係を追加します。その例を次に示します。
pom.xml
ファイルを更新して、Azure Service Bus と Azure ID パッケージに依存関係を追加します。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.3</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
トピックにメッセージを送信するためのコードを追加する
次の
import
ステートメントを Java ファイルの冒頭に追加します。import com.azure.messaging.servicebus.*; import com.azure.identity.*; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;
クラスで、接続文字列を保持する変数 (パスワードレス シナリオでは不要)、トピック名、サブスクリプション名を定義します。
static String topicName = "<TOPIC NAME>"; static String subName = "<SUBSCRIPTION NAME>";
重要
<TOPIC NAME>
をトピックの名前に置き換え、<SUBSCRIPTION NAME>
をトピックのサブスクリプションの名前に置き換えます。1 つのメッセージをトピックに送信するためのメソッドを、
sendMessage
という名前でこのクラスに追加します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。static void sendMessage() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .topicName(topicName) .buildClient(); // send one message to the topic senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the topic: " + topicName); }
一連のメッセージを作成するためのメソッドを、
createMessages
という名前でこのクラスに追加します。 通常、これらのメッセージはアプリケーションのさまざまな部分から届きます。 ここでは、次のように一連のサンプル メッセージを作成します。static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
作成したトピックにメッセージを送信するためのメソッドを、
sendMessageBatch
という名前で追加します。 このメソッドは、トピックのServiceBusSenderClient
を作成し、createMessages
メソッドを呼び出して一連のメッセージを取得した後、1 つまたは複数のバッチを用意して、そのバッチをトピックに送信します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。static void sendMessageBatch() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .topicName(topicName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); } //close the client senderClient.close(); }
サブスクリプションからメッセージを受信する
このセクションでは、トピックのサブスクリプションからメッセージを取得するコードを追加します。
サブスクリプションからメッセージを受信するメソッドを、
receiveMessages
という名前で追加します。 このメソッドは、メッセージを処理するためのハンドラーとエラーを処理するためのハンドラーを指定してサブスクリプションのServiceBusProcessorClient
を作成します。 次に、プロセッサを起動して数秒間待機し、受信したメッセージを出力した後、プロセッサを停止して終了します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。- コードに出現する
ServiceBusTopicTest::processMessage
のServiceBusTopicTest
の部分は、実際のクラスの名前に置き換えてください。
// handles received messages static void receiveMessages() throws InterruptedException { DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .processor() .topicName(topicName) .subscriptionName(subName) .processMessage(context -> processMessage(context)) .processError(context -> processError(context)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
Service Bus サブスクリプションから受信したメッセージを処理するための
processMessage
メソッドを追加します。private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
エラー メッセージを処理するための
processError
メソッドを追加します。private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
sendMessage
、sendMessageBatch
、receiveMessages
の各メソッドを呼び出し、InterruptedException
をスローするようにmain
メソッドを更新します。public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
アプリを実行する
プログラムを実行すると、次の出力に似た出力が表示されます。
Eclipse を使用している場合は、プロジェクトを右クリックし、[エクスポート] を選択し、[Java] を展開して、[実行可能 JAR ファイル] を選択して、実行可能 JAR ファイルを作成する手順に従います。
Azure Service Bus データ所有者ロールに追加されたユーザー アカウントとは異なるユーザー アカウントを使ってマシンにサインインしている場合は、以下の手順に従います。 それ以外の場合は、この手順をスキップし、次の手順で Jar ファイルを実行します。
お使いのマシンに Azure CLI をインストールします。
次の CLI コマンドを使用して、Azure にサインインします。 Azure Service Bus データ所有者ロールに追加したのと同じユーザー アカウントを使用します。
az login
次のコマンドを使用して Jar ファイルを実行します。
java -jar <JAR FILE NAME>
コンソール ウィンドウに次の出力が表示されます。
Sent a single message to the topic: mytopic Sent a batch of messages to the topic: mytopic Starting the processor Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World! Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
Azure portal の Service Bus 名前空間の [概要] ページで、受信メッセージ数と送信メッセージ数を確認できます。 1 分ほど待ってからページを更新すると、最新の値が表示されます。
下部中央のペインの [トピック] タブに切り替えてトピックを選択すると、そのトピックの [Service Bus トピック] ページが表示されます。 このページの [メッセージ] グラフに 4 つの受信メッセージと 4 つの送信メッセージが確認できると思います。
main
メソッドの receiveMessages
呼び出しをコメント アウトして再度アプリを実行した場合、 [Service Bus トピック] ページには、8 つの受信メッセージ (うち 4 つは新規) が表示されますが、送信メッセージは 4 つと表示されます。
このページでいずれかのサブスクリプションを選択すると、その [Service Bus Subscription](Service Bus サブスクリプション) ページが表示されます。 このページで、アクティブなメッセージ数や配信不能メッセージ数を確認できます。 この例では、受信者がまだ受け取っていないアクティブなメッセージが 4 つ存在します。
次の手順
次のドキュメントおよびサンプルを参照してください。