Azure Service Bus キューとの間でメッセージを送受信する (Java)
このクイックスタートでは、Azure Service Bus キューとの間でメッセージを送受信する Java アプリを作成します。
Note
このクイック スタートでは、メッセージを Service Bus キューに送信し、それらを受信するという簡単なシナリオでのステップ バイ ステップの手順を説明します。 Azure Service Bus の事前構築済みの Java サンプルが、GitHub の Azure SDK for Java リポジトリに用意されています。
ヒント
Spring アプリケーションで Azure Service Bus リソースを操作している場合は、Spring Cloud Azure を代替手段として検討することをお勧めします。 Spring Cloud Azure は、Spring と Azure サービスのシームレスな統合を実現するオープンソース プロジェクトです。 Spring Cloud Azure の詳細と Service Bus の使用例については、「Azure Service Bus を使用する Spring Cloud Stream with」を参照してください。
前提条件
- Azure サブスクリプション。 このチュートリアルを完了するには、Azure アカウントが必要です。 MSDN のサブスクライバー特典を有効にするか、無料アカウントにサインアップしてください。
- Azure SDK for Java をインストールします。 Eclipse を使用している場合は、Azure SDK for Java が含まれている Azure Toolkit for Eclipse をインストールできます。 これで Microsoft Azure Libraries for Java をプロジェクトに追加できます。 IntelliJ を使用している場合は、Azure Toolkit for IntelliJ のインストールに関するページを参照してください。
Azure Portal での名前空間の作成
Azure の Service Bus メッセージング エンティティを使用するには、Azure 全体で一意となる名前を備えた名前空間を最初に作成しておく必要があります。 名前空間により、ご利用のアプリケーション内に Service Bus リソース (キュー、トピックなど) 用のスコープ コンテナーが提供されます。
名前空間を作成するには:
Azure portal にサインインします。
[すべてのサービス] ページに移動します。
左側のナビゲーション バーで、カテゴリの一覧から [統合] を選択し、[Service Bus] 上にマウス ポインターを置き、[Service Bus] タイルの [+] ボタンを選択します。
[名前空間の作成] ページの [基本] タブで、こちらの手順を実行します。
[サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。
[リソース グループ] では、既存のリソース グループを選択するか、新しいリソース グループを作成します。
名前空間の名前を入力します。 名前空間名は次の名前付け規則に従う必要があります。
- この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
- 名前の長さは 6 ~ 50 文字である。
- 名前には文字、数字、ハイフン
-
のみを含めることができます。 - 名前の先頭は文字、末尾は文字または数字にする必要があります。
- 名前の末尾を
-sb
または-mgmt
にすることはできません。
[場所] で、名前空間をホストするリージョンを選択します。
[価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。
Premium レベルを選択した場合は、名前空間に対して geo レプリケーションを有効にできるかどうかを選択します。 geo レプリケーション機能を使用すると、名前空間のメタデータとデータが、プライマリ Azure リージョンから 1 つ以上のセカンダリ Azure リージョンに、継続的にレプリケートされることが保証されます。
重要
トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 Basic 価格レベルでは、トピックとサブスクリプションはサポートされていません。
[Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソースのコンテナーをメッセージング ユニットと呼びます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、Service Bus の Premium メッセージングに関するページをご覧ください。
ページ下部にある [確認と作成] を選択します。
[確認および作成] ページで、設定を確認し、 [作成] を選択します。
リソースのデプロイが成功したら、デプロイ ページで [リソースに移動] を選択します。
Service Bus 名前空間のホーム ページが表示されます。
Azure portal でキューを作成する
[Service Bus 名前空間] ページで、左側のナビゲーション メニューの [エンティティ] を展開し、[キュー] を選択します。
[キュー] ページで、ツール バーの [+ キュー] を選択します。
キューの名前を入力し、他の値は既定値のままにします。
ここで、 [作成] を選択します。
Azure に対してアプリを認証する
このクイック スタートでは、Azure Service Bus に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。
最初のオプションでは、Microsoft Entra ID とロールベースのアクセス制御 (RBAC) でセキュリティ プリンシパルを使用して Service Bus 名前空間に接続する方法を示します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。
2 番目のオプションでは、接続文字列を使用して Service Bus 名前空間に接続する方法を示します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。
Microsoft Entra ユーザーにロールを割り当てる
ローカルでの開発時には、Azure Service Bus に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Service Bus データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write
アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。
次の例では、ユーザー アカウントに、Azure Service Bus Data Owner
ロールを割り当てます。これにより、Azure Service Bus リソースへのフル アクセスが提供されます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。
Azure Service Bus 用の Azure 組み込みロール
Azure Service Bus の場合、名前空間およびそれに関連するすべてのリソースの Azure portal および Azure リソース管理 API による管理は、Azure RBAC モデルを使って既に保護されています。 Azure では、Service Bus 名前空間へのアクセスを承認するための次の Azure 組み込みロールが提供されています。
- Azure Service Bus データ所有者:Service Bus 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、およびフィルター) へのデータ アクセスが可能です。 このロールのメンバーは、キューまたはトピックやサブスクリプションとの間でメッセージを送受信できます。
- Azure Service Bus データ送信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの送信アクセスを許可します。
- Azure Service Bus データ受信者: このロールを使用して、Service Bus 名前空間とそのエンティティへの受信アクセスを許可します。
カスタム ロールを作成する場合は、Service Bus 操作に必要な権限に関するページを参照してください。
Microsoft Entra ユーザーを Azure Service Bus 所有者ロールに追加する
Microsoft Entra ユーザー名を、Service Bus 名前空間レベルの Azure Service Bus データ所有者ロール に追加します。 これにより、ユーザー アカウントのコンテキストで実行されているアプリがキューまたはトピックにメッセージを送信し、キューまたはトピックのサブスクリプションからメッセージを受信できるようになります。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で Service Bus 名前空間ページが開いていない場合は、メイン検索バーまたは左側のナビゲーションを使用して Service Bus 名前空間を見つけます。
概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、
Azure Service Bus Data Owner
を検索して一致する結果を選択します。 [次へ] を選びます。[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
メッセージをキューに送信する
このセクションでは、Java コンソール プロジェクトを作成し、既に作成してあるキューにメッセージを送信するコードを追加します。
Java コンソール プロジェクトを作成する
Eclipse または任意のツールを使用して Java プロジェクトを作成します。
Service Bus を使用するようにアプリケーションを構成する
Azure Core ライブラリおよび Azure Service Bus ライブラリへの参照を追加します。
Eclipse を使用して Java コンソール アプリケーションを作成した場合は、Java プロジェクトを Maven に変換します。[パッケージ エクスプローラー] ウィンドウでプロジェクトを右クリックし、[構成]->[Maven プロジェクトへの変換] を選択します。 その後、これら 2 つのライブラリへの依存関係を追加します。その例を次に示します。
pom.xml
ファイルを更新して、Azure Service Bus と Azure ID パッケージに依存関係を追加します。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.3</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
キューにメッセージを送信するコードを追加する
次の
import
ステートメントを Java ファイルの冒頭に追加します。import com.azure.messaging.servicebus.*; import com.azure.identity.*; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;
このクラスに、接続文字列とキュー名を保持する変数を定義します。
static String queueName = "<QUEUE NAME>";
重要
<QUEUE NAME>
をキューの名前に置き換えます。1 つのメッセージをキューに送信するためのメソッドを、
sendMessage
という名前でこのクラスに追加します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。static void sendMessage() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }
一連のメッセージを作成するためのメソッドを、
createMessages
という名前でこのクラスに追加します。 通常、これらのメッセージはアプリケーションのさまざまな部分から届きます。 ここでは、次のように一連のサンプル メッセージを作成します。static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
作成したキューにメッセージを送信するためのメソッドを、
sendMessageBatch
という名前で追加します。 このメソッドは、キューのServiceBusSenderClient
を作成し、createMessages
メソッドを呼び出して一連のメッセージを取得した後、1 つまたは複数のバッチを用意して、そのバッチをキューに送信します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。static void sendMessageBatch() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .queueName(queueName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); } //close the client senderClient.close(); }
キューからメッセージを受信する
このセクションでは、キューからメッセージを取得するコードを追加します。
キューからメッセージを受信するメソッドを、
receiveMessages
という名前で追加します。 このメソッドは、メッセージを処理するためのハンドラーとエラーを処理するためのハンドラーを指定してキューのServiceBusProcessorClient
を作成します。 次に、プロセッサを起動して数秒間待機し、受信したメッセージを出力した後、プロセッサを停止して終了します。重要
NAMESPACENAME
を、実際の Service Bus 名前空間の名前に置き換えます。- コードに出現する
QueueTest::processMessage
のQueueTest
の部分は、実際のクラスの名前に置き換えてください。
// handles received messages static void receiveMessages() throws InterruptedException { DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .processor() .queueName(queueName) .processMessage(context -> processMessage(context)) .processError(context -> processError(context)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
Service Bus サブスクリプションから受信したメッセージを処理するための
processMessage
メソッドを追加します。private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
エラー メッセージを処理するための
processError
メソッドを追加します。private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
sendMessage
、sendMessageBatch
、receiveMessages
の各メソッドを呼び出し、InterruptedException
をスローするようにmain
メソッドを更新します。public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
アプリを実行する
Eclipse を使用している場合は、プロジェクトを右クリックし、[エクスポート] を選択し、[Java] を展開して、[実行可能 JAR ファイル] を選択して、実行可能 JAR ファイルを作成する手順に従います。
Azure Service Bus データ所有者ロールに追加されたユーザー アカウントとは異なるユーザー アカウントを使ってマシンにサインインしている場合は、以下の手順に従います。 それ以外の場合は、この手順をスキップし、次の手順で Jar ファイルを実行します。
お使いのマシンに Azure CLI をインストールします。
次の CLI コマンドを使用して、Azure にサインインします。 Azure Service Bus データ所有者ロールに追加したのと同じユーザー アカウントを使用します。
az login
次のコマンドを使用して Jar ファイルを実行します。
java -jar <JAR FILE NAME>
コンソール ウィンドウに次の出力が表示されます。
Sent a single message to the queue: myqueue Sent a batch of messages to the queue: myqueue Starting the processor Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World! Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message Stopping and closing the processor
Azure portal の Service Bus 名前空間の [概要] ページで、受信メッセージ数と送信メッセージ数を確認できます。 1 分ほど待ってからページを更新すると、最新の値が表示されます。
この [概要] ページでキューを選択して、 [Service Bus キュー] ページに移動します。 このページにも、受信メッセージ数と送信メッセージ数が表示されます。 加えて、キューの現在のサイズ、最大サイズ、アクティブなメッセージ数など、他の情報も表示されます。
次の手順
次のドキュメントおよびサンプルを参照してください。