チュートリアル: Azure Portal とトピック/サブスクリプションを使用して在庫を更新する
Azure Service Bus は、アプリケーションとサービスの間で情報を送信するマルチテナント クラウド メッセージング サービスです。 非同期操作により、柔軟なブローカー メッセージング、構造化された先入れ先出し型 (FIFO) のメッセージング、および発行/購読機能が可能になります。 Azure Service Bus の詳細な概要については、Azure Service Bus とはを参照してください。
このチュートリアルでは、公開/サブスクライブ チャネルで Azure Portal と .NET を使用して、小売在庫シナリオで Service Bus トピックとサブスクリプションを使用する方法について説明します。 このシナリオの例は、複数の小売店の在庫品の更新です。 このシナリオでは、各店舗または一連の店舗が、在庫品を更新するためのメッセージを受け取ります。 このチュートリアルでは、サブスクリプションとフィルターを使用してこのシナリオを実装する方法を示します。 まず、3 つのサブスクリプションを持つトピックを作成し、いくつかのルールとフィルターを追加してから、トピックとサブスクリプションにメッセージを送受信します。
このチュートリアルでは、以下の内容を学習します。
- Azure Portal を使用して、Service Bus のトピックとそのトピックへの 3 つのサブスクリプションを作成する
- .NET コードを使用して、サブスクリプションにフィルターを追加する
- 異なる内容の複数のメッセージを作成する
- メッセージを送信し、想定されるサブスクリプションに到着したことを確認する
- サブスクリプションからメッセージを受信する
前提条件
このチュートリアルを完了するには、以下のものが必要です。
- Azure サブスクリプション。 Azure Service Bus など、Azure の各種サービスを使用するには、サブスクリプションが必要です。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成できます。
- Visual Studio 2019 以降。
Service Bus トピックとサブスクリプション
トピックへの各サブスクリプションは、各メッセージのコピーを受信できます。 トピックは完全にプロトコルであり、意味的には Service Bus キューと互換性があります。 Service Bus のトピックは、フィルターの条件を持つさまざまな選択ルールをサポートしています。メッセージのプロパティを設定または変更するオプションのアクションもあります。 ルールが一致するたびに、メッセージが生成されます。 ルール、フィルター、およびアクションの詳細については、こちらのリンクを参照してください。
Azure Portal での名前空間の作成
Azure の Service Bus メッセージング エンティティを使用するには、Azure 全体で一意となる名前を備えた名前空間を最初に作成しておく必要があります。 名前空間により、ご利用のアプリケーション内に Service Bus リソース (キュー、トピックなど) 用のスコープ コンテナーが提供されます。
名前空間を作成するには:
Azure portal にサインインします。
[すべてのサービス] ページに移動します。
左側のナビゲーション バーで、カテゴリの一覧から [統合] を選択し、[Service Bus] 上にマウス ポインターを置き、[Service Bus] タイルの [+] ボタンを選択します。
[名前空間の作成] ページの [基本] タブで、こちらの手順を実行します。
[サブスクリプション] で、名前空間を作成する Azure サブスクリプションを選択します。
[リソース グループ] では、既存のリソース グループを選ぶか、新しく作成します。
名前空間の名前を入力します。 名前空間名は次の名前付け規則に従う必要があります。
- この名前は Azure 全体で一意である必要があります。 その名前が使用できるかどうかがすぐに自動で確認されます。
- 名前の長さは 6 ~ 50 文字である。
- この名前には、文字、数字、ハイフン
-
のみを含めることができます。 - 名前の先頭は文字、末尾は文字または数字にする必要があります。
- 名前の末尾を
-sb
または-mgmt
にすることはできません。
[場所] で、名前空間をホストするリージョンを選択します。
[価格レベル] で、名前空間の価格レベル (Basic、Standard、Premium) を選択します。 このクイック スタートでは、 [Standard] を選択します。
Premium レベルを選んだ場合は、名前空間の geo レプリケーションを有効にできるかどうかを選びます。 geo レプリケーション機能を使用すると、名前空間のメタデータとデータが、プライマリ Azure リージョンから 1 つ以上のセカンダリ Azure リージョンに、継続的にレプリケートされることが保証されます。
重要
トピックとサブスクリプションを使用する場合は、Standard または Premium を選択してください。 Basic 価格レベルでは、トピックとサブスクリプションはサポートされていません。
[Premium] 価格レベルを選択した場合は、メッセージング ユニットの数を指定します。 Premium レベルでは、各ワークロードが分離した状態で実行されるように、CPU とメモリのレベルでリソースが分離されます。 このリソースのコンテナーをメッセージング ユニットと呼びます。 Premium 名前空間には、少なくとも 1 つのメッセージング ユニットがあります。 Service Bus の Premium 名前空間ごとに、1 個、2 個、4 個、8 個、または 16 個のメッセージング ユニットを選択できます。 詳細については、Service Bus の Premium メッセージングに関するページをご覧ください。
ページ下部にある [確認と作成] を選択します。
[確認および作成] ページで、設定を確認し、 [作成] を選択します。
リソースのデプロイが成功したら、デプロイ ページで [リソースに移動] を選択します。
Service Bus 名前空間のホーム ページが表示されます。
名前空間への接続文字列を取得する (Azure portal)
新しい名前空間を作成すると、主キーとセカンダリ キー、およびそれぞれが名前空間のすべての側面を完全に制御できるプライマリとセカンダリの接続文字列を使用して、Shared Access Signature (SAS) の初期ポリシーが自動的に生成されます。 通常の送信者と受信者を対象に、より権限を制限した規則を作成する方法については、「Service Bus の認証と承認」をご覧ください。
クライアントは、接続文字列を使用して Service Bus の名前空間に接続できます。 名前空間のプライマリ接続文字列をコピーするには、次の手順に従います。
[Service Bus 名前空間] ページで、左側のメニューの [共有アクセス ポリシー] を選択します。
[共有アクセス ポリシー] ページで、 [RootManageSharedAccessKey] を選択します。
[ポリシー: RootManageSharedAccessKey] ウィンドウで、[プライマリ接続文字列] の横にある [コピー] ボタンを選択し、後で使用するために接続文字列をクリップボードにコピーします。 この値をメモ帳などに一時的に貼り付けます。
このページを使用して、主キー、2 次キー、プライマリ接続文字列、セカンダリ接続文字列をコピーできます。
Azure Portal を使用したトピックの作成
[Service Bus 名前空間] ページで、左側のナビゲーション メニューの [エンティティ] を展開し、左側のメニューで [トピック] を選びます。
ツール バーの [+ トピック] を選択します。
トピックの名前を入力します。 他のオプションは既定値のままにしてください。
[作成] を選択します。
トピックに対するサブスクリプションを作成する
前のセクションで作成したトピックを選択します。
[Service Bus トピック] ページで、ツール バーの [+ サブスクリプション] を選択します。
[サブスクリプションの作成] ページで、次の手順に従います。
サブスクリプションの名前として「S1」と入力します。
次に、 [作成] を選択してサブスクリプションを作成します。
前の手順を 2 回繰り返して、S2 と S3 というサブスクリプションを作成します。
サブスクリプションに対してフィルター ルールを作成する
名前空間とトピック/サブスクリプションがプロビジョニングされ、名前空間への接続文字列を取得したら、サブスクリプションに対してフィルター ルールを作成しメッセージを送受信する準備はできています。 こちらの GitHub サンプル フォルダーでコードを調べることができます。
メッセージを送受信する
コードを実行するには、次の手順に従います。
コマンド プロンプトまたは PowerShell プロンプトで、次のコマンドを発行して Service Bus GitHub リポジトリを複製します。
git clone https://github.com/Azure/azure-service-bus.git
サンプル フォルダー
azure-service-bus\samples\DotNet\Azure.Messaging.ServiceBus\BasicSendReceiveTutorialWithFilters
に移動します。このチュートリアルの前半で、メモ帳にコピーした接続文字列を取得します。 前のセクションで作成したトピックの名前も必要です。
コマンド プロンプトで、次のコマンドを入力します。
dotnet build
BasicSendReceiveTutorialWithFilters\bin\Debug\netcoreapp3.1
フォルダーに移動します。次のコマンドを入力してプログラムを実行します。
myConnectionString
を以前に取得した値に、myTopicName
を作成したトピックの名前に置き換えます。dotnet --roll-forward Major BasicSendReceiveTutorialWithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"
コンソールの指示に従って、まずフィルター作成を選択します。 フィルターの作成処理には、既定のフィルターの削除が含まれています。 PowerShell または CLI を使用する場合は、既定のフィルターを削除する必要はありませんが、コードで実行する場合は、既定のフィルターを削除する必要があります。 コンソール コマンド 1 と 3 は、以前に作成したサブスクリプションに対するフィルターを管理するために役立ちます。
1 の実行: 既定のフィルターを削除します。
2 の実行: 独自のフィルターを追加します。
3 の実行: チュートリアルではこの手順はスキップします。 このオプションは、必要に応じて、設定した独自のフィルターを削除します。 この操作では、既定のフィルターは再作成されません。
フィルターの作成後は、メッセージを送信できます。 4 キーを押して、トピックに送信されている 10 個のメッセージを観察します。
5 キーを押して、受信中のメッセージを観察します。 10 個のメッセージが表示されない場合は、m キーを押してメニューを表示し、もう一度 5 キーを押します。
リソースをクリーンアップする
不要になったリソースをクリーンアップするには、次の手順に従います。
- Azure portal で名前空間に移動します。
- [Service Bus 名前空間] ページで、コマンド バーから [削除] を選択して、名前空間とそこにあるリソース (キュー、トピック、サブスクリプション) を削除します。
サンプル コードを理解する
このセクションでは、サンプル コードの処理内容の詳細について説明します。
接続文字列とトピックを取得する
コードはまず一連の変数を宣言し、プログラムの残りの処理を実行します。
string ServiceBusConnectionString;
string TopicName;
static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
{ "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
{ "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
{ "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule.
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
{ "S1", "" },
{ "S2", "" },
{ "S3", "SET sys.Label = 'SalesEvent'" }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.
接続文字列とトピック名は、次のようにコマンド ライン パラメーターを介して渡され、次に Main()
メソッドで読み取られます。
static void Main(string[] args)
{
string ServiceBusConnectionString = "";
string TopicName = "";
for (int i = 0; i < args.Length; i++)
{
if (args[i] == "-ConnectionString")
{
Console.WriteLine($"ConnectionString: {args[i + 1]}");
ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
}
else if (args[i] == "-TopicName")
{
Console.WriteLine($"TopicName: {args[i + 1]}");
TopicName = args[i + 1]; // Alternatively enter your queue name here.
}
}
if (ServiceBusConnectionString != "" && TopicName != "")
{
Program P = StartProgram(ServiceBusConnectionString, TopicName);
P.PresentMenu().GetAwaiter().GetResult();
}
else
{
Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
Console.ReadKey();
}
}
既定のフィルターを削除する
サブスクリプションを作成すると、Service Bus でサブスクリプションごとに既定のフィルターが作成されます。 このフィルターを使用すると、トピックに送信されたすべてのメッセージを受信できます。 カスタム フィルターを使用する場合は、次のコードのように既定のフィルターを削除できます。
private async Task RemoveDefaultFilters()
{
Console.WriteLine($"Starting to remove default filters.");
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
foreach (var subscription in Subscriptions)
{
await client.DeleteRuleAsync(TopicName, subscription, CreateRuleOptions.DefaultRuleName);
Console.WriteLine($"Default filter for {subscription} has been removed.");
}
Console.WriteLine("All default Rules have been removed.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
フィルターの作成
次のコードは、このチュートリアルで定義したカスタム フィルターを追加します。
private async Task CreateCustomFilters()
{
try
{
for (int i = 0; i < Subscriptions.Length; i++)
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
string[] filters = SubscriptionFilters[Subscriptions[i]];
if (filters[0] != "")
{
int count = 0;
foreach (var myFilter in filters)
{
count++;
string action = SubscriptionAction[Subscriptions[i]];
if (action != "")
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Action = new SqlRuleAction(action),
Name = $"MyRule{count}"
});
}
else
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Name = $"MyRule{count}"
});
}
}
}
Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
}
Console.WriteLine("All filters and actions have been created.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
作成したカスタム フィルターを削除する
サブスクリプションに対するすべてのフィルターを削除するには、次のコードでその方法を確認してください。
private async Task CleanUpCustomFilters()
{
foreach (var subscription in Subscriptions)
{
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
IAsyncEnumerator<RuleProperties> rules = client.GetRulesAsync(TopicName, subscription).GetAsyncEnumerator();
while (await rules.MoveNextAsync())
{
await client.DeleteRuleAsync(TopicName, subscription, rules.Current.Name);
Console.WriteLine($"Rule {rules.Current.Name} has been removed.");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
Console.WriteLine("All default filters have been removed.\n");
await PresentMenu();
}
メッセージを送信する
トピックにメッセージを送信する処理は、メッセージをキューに送信する処理に似ています。 この例は、タスク一覧と非同期処理を使用してメッセージを送信する方法を示しています。
public async Task SendMessages()
{
try
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var taskList = new List<Task>();
for (int i = 0; i < Store.Length; i++)
{
taskList.Add(SendItems(client, Store[i]));
}
await Task.WhenAll(taskList);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
Console.WriteLine("\nAll messages sent.\n");
}
private async Task SendItems(ServiceBusClient client, string store)
{
// create the sender
ServiceBusSender tc = client.CreateSender(TopicName);
for (int i = 0; i < NrOfMessagesPerStore; i++)
{
Random r = new Random();
Item item = new Item(r.Next(5), r.Next(5), r.Next(5));
// Note the extension class which is serializing an deserializing messages
ServiceBusMessage message = item.AsMessage();
message.To = store;
message.ApplicationProperties.Add("StoreId", store);
message.ApplicationProperties.Add("Price", item.GetPrice().ToString());
message.ApplicationProperties.Add("Color", item.GetColor());
message.ApplicationProperties.Add("Category", item.GetItemCategory());
await tc.SendMessageAsync(message);
Console.WriteLine($"Sent item to Store {store}. Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}"); ;
}
}
メッセージを受信する
メッセージはタスク一覧を介して再び受信され、コードはバッチ処理を使用します。 バッチ処理を使用して送受信することはできますが、この例はバッチ受信方法のみを示しています。 実際には、ループから抜けることはありませんが、ループを維持するよう、1 分などの長めの期間を設定します。 この期間はブローカーへの受信呼び出しが開かれたままになり、メッセージが到着すると、それはただちに返され、新しい受信呼び出しが発行されます。 この概念は、長いポーリングと呼ばれます。 より一般的なオプションは、このクイック スタートやリポジトリの他のいくつかのサンプルでも見られる受信ポンプです。
public async Task Receive()
{
var taskList = new List<Task>();
for (var i = 0; i < Subscriptions.Length; i++)
{
taskList.Add(this.ReceiveMessages(Subscriptions[i]));
}
await Task.WhenAll(taskList);
}
private async Task ReceiveMessages(string subscription)
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
ServiceBusReceiver receiver = client.CreateReceiver(TopicName, subscription);
// In reality you would not break out of the loop like in this example but would keep looping. The receiver keeps the connection open
// to the broker for the specified amount of seconds and the broker returns messages as soon as they arrive. The client then initiates
// a new connection. So in reality you would not want to break out of the loop.
// Also note that the code shows how to batch receive, which you would do for performance reasons. For convenience you can also always
// use the regular receive pump which we show in our Quick Start and in other GitHub samples.
while (true)
{
try
{
//IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
// Note the extension class which is serializing an deserializing messages and testing messages is null or 0.
// If you think you did not receive all messages, just press M and receive again via the menu.
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);
if (messages.Any())
{
foreach (ServiceBusReceivedMessage message in messages)
{
lock (Console.Out)
{
Item item = message.As<Item>();
IReadOnlyDictionary<string, object> myApplicationProperties = message.ApplicationProperties;
Console.WriteLine($"StoreId={myApplicationProperties["StoreId"]}");
if (message.Subject != null)
{
Console.WriteLine($"Subject={message.Subject}");
}
Console.WriteLine(
$"Item data: Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}");
}
await receiver.CompleteMessageAsync(message);
}
}
else
{
break;
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
Note
Service Bus リソースは、Service Bus Explorer で管理できます。 Service Bus Explorer を使用すると、ユーザーは Service Bus 名前空間に接続し、簡単な方法でメッセージング エンティティを管理できます。 このツールには、インポート/エクスポート機能や、トピック、キュー、サブスクリプション、リレー サービス、通知ハブ、イベント ハブをテストする機能などの高度な機能が用意されています。
次のステップ
このチュートリアルでは、Azure Portal を使用してリソースをプロビジョニングした後、Service Bus のトピックとそのサブスクリプションからメッセージを送受信しました。 以下の方法を学習しました。
- Azure Portal を使用して、Service Bus トピックとそのトピックへの 1 つ以上のサブスクリプションを作成する
- .NET コードを使用してトピック フィルターを追加する
- 異なる内容の 2 つのメッセージを作成する
- メッセージを送信し、所定のサブスクリプションに到着したことを確認する
- サブスクリプションからメッセージを受信する
メッセージの送受信の他の例については、GitHub の Service Bus サンプルから始めてください。
Service Bus の公開/サブスクライブ機能の使用方法の詳細については、次のチュートリアルに進んでください。