演習 - トピックを使用してメッセージを送受信する
あなたは、Azure Service Bus トピックを使用して、セールスフォース アプリケーションで販売実績メッセージを配布することにしました。 営業担当者は、このアプリを自分のモバイル デバイスで使用して、各エリアと期間ごとの売上高をまとめたメッセージを送信します。 これらのメッセージは、南北アメリカやヨーロッパなど会社の営業地域内にある Web サービスに配信されます。
トピックに必要なインフラストラクチャは、お使いの Azure サブスクリプションに既に実装されています。 ここでは、トピックにメッセージを送信するコードと、サブスクリプションからメッセージを取得するコードを記述します。 次に、メッセージをトピックに送信し、特定のサブスクリプションのメッセージを取得します。
Azure Cloud Shell で次のコマンドを実行して、正しいディレクトリで作業していることを確認します。
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start
code .
トピックにメッセージを送信するコードを記述する
販売業績に関するメッセージを送信するコンポーネントを完成させるには、次の手順のようにします。
Azure Cloud Shell エディターで、performancemessagesender/Program.cs を開き、次のコード行を見つけます。
const string ServiceBusConnectionString = "";
引用符の間に、前の演習で保存した接続文字列を貼り付けます。
キュー名に salesperformancemessages とは異なる名前を使用した場合は、コード内の
TopicName
プロパティの値を更新します。const string TopicName = "salesperformancemessages";
SendPerformanceMessageAsync()
メソッドを見つけます。 (ヒント: 26 行目またはその付近にあります)。そのメソッド内で、次のコード行を見つけます。// Create a Service Bus client here
そのコード行を次のコードに置き換えます。
// By leveraging "await using", the DisposeAsync method will be called automatically when the client variable goes out of scope. // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString);
SendPerformanceMessageAsync()
メソッド内で、次のコード行を見つけます。// Create a sender here
そのコード行を次のコードに置き換えます。
await using ServiceBusSender sender = client.CreateSender(TopicName);
try...catch
ブロック内で、次のコード行を見つけます。// Create and send a message here
そのコード行を次のコードに置き換えます。
string messageBody = "Total sales for Brazil in August: $13m."; var message = new ServiceBusMessage(messageBody);
コンソールでメッセージを表示するには、次の行に以下のコードを挿入します。
Console.WriteLine($"Sending message: {messageBody}");
トピックにメッセージを送信するには、次の行に以下のコードを挿入します。
await sender.SendMessageAsync(message);
最後のコードが次の例のようになっていることを確認します。
using System; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace performancemessagesender { class Program { const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string TopicName = "salesperformancemessages"; static void Main(string[] args) { Console.WriteLine("Sending a message to the Sales Performance topic..."); SendPerformanceMessageAsync().GetAwaiter().GetResult(); Console.WriteLine("Message was sent successfully."); } static async Task SendPerformanceMessageAsync() { // By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope. // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString); await using ServiceBusSender sender = client.CreateSender(TopicName); try { string messageBody = "Total sales for Brazil in August: $13m."; var message = new ServiceBusMessage(messageBody); Console.WriteLine($"Sending message: {messageBody}"); await sender.SendMessageAsync(message); } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } } } }
変更を保存するには、Ctrl + S キーを押します。また、エディターを閉じるには、Ctrl + Q キーを押します。
メッセージをトピックに送信する
販売に関するメッセージを送信するコンポーネントを実行するには、Cloud Shell で次のコマンドを実行します。
dotnet run --project performancemessagesender
プログラムを実行したら、メッセージが送信されていることを示す通知を Cloud Shell で監視します。 アプリを実行するたびに、別のメッセージがトピックに追加され、各サブスクリプションでコピーが使用できるようになります。
Sending a message to the Sales Performance topic... Sending message: Total sales for Brazil in August: $13m. Message was sent successfully.
サブスクリプションのメッセージを取得する前に、メッセージ数を調べる
Message was sent successfully
と表示されたら、次のコマンドを実行して、Americas
サブスクリプション内のメッセージの数を確認します。 <namespace-name> を Service Bus 名前空間に置き換えることを忘れないでください。
az servicebus topic subscription show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--topic-name salesperformancemessages \
--name Americas \
--query messageCount \
--namespace-name <namespace-name>
Americas
を EuropeAndAsia
に置き換え、コマンドをもう一度実行すると、両方のサブスクリプションに同じ数のメッセージがあることがわかります。
サブスクリプションのトピック メッセージを取得するコードを記述する
販売業績に関するメッセージを取得するコンポーネントを作成するには、次の手順のようにします。
code .
を実行してエディターを起動します。エディターで、performancemessagereceiver/Program.cs を開き、次のコード行を見つけます。
const string ServiceBusConnectionString = "";
引用符の間に、前の演習で保存した接続文字列を貼り付けます。
Service Bus クライアントを作成するには、
MainAsync()
メソッドを見つけます。 そのメソッド内で、次のコード行を見つけます。// Create a Service Bus client that will authenticate using a connection string
その行を次のコードに置き換えます。
var client = new ServiceBusClient(ServiceBusConnectionString);
メッセージ処理オプションを構成するには、次のコード行を見つけます。
// Create the options to use for configuring the processor
その行を次のコードに置き換えます。
var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false };
プロセッサを作成するには、次のコード行を見つけます。
// Create a processor that we can use to process the messages
その行を次のコードに置き換えます。
ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions);
ハンドラーを構成するには、次のコード行を見つけます。
// Configure the message and error handler to use
その行を次のコードに置き換えます。
processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler;
処理を開始するには、次のコード行を見つけます。
// Start processing
その行を次のコードに置き換えます。
await processor.StartProcessingAsync();
次のコードを見つけます。
// Since we didn't use the "await using" syntax here, we need to explicitly dispose the processor and client
その行を次のコードに置き換えます。
await processor.DisposeAsync(); await client.DisposeAsync();
受信メッセージをコンソールに表示するには、
MessageHandler()
メソッドを見つけます。 受信メッセージを処理するためにこのメソッドを登録しました。そのメソッド内のすべてのコードを、次のコードに置き換えます。
Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}");
サブスクリプションから受信したメッセージを削除するには、その次の行に、以下のコードを追加します。
await args.CompleteMessageAsync(args.Message);
最後のコードが次の例のようになっていることを確認します。
using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace performancemessagereceiver { class Program { const string ServiceBusConnectionString = "Endpoint=sb://alexgeddyneil.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string TopicName = "salesperformancemessages"; const string SubscriptionName = "Americas"; static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { var client = new ServiceBusClient(ServiceBusConnectionString); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false }; ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions); processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler; await processor.StartProcessingAsync(); Console.Read(); await processor.DisposeAsync(); await client.DisposeAsync(); } static async Task MessageHandler(ProcessMessageEventArgs args) { Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}"); await args.CompleteMessageAsync(args.Message); } static Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine($"Message handler encountered an exception {args.Exception}."); Console.WriteLine("Exception context for troubleshooting:"); Console.WriteLine($"- Endpoint: {args.FullyQualifiedNamespace}"); Console.WriteLine($"- Entity Path: {args.EntityPath}"); Console.WriteLine($"- Executing Action: {args.ErrorSource}"); return Task.CompletedTask; } } }
変更を保存するには、Ctrl + S キーを押します。また、エディターを閉じるには、Ctrl + Q キーを押します。
サブスクリプションのトピック メッセージを取得する
サブスクリプションの販売業績に関するメッセージを取得するコンポーネントを実行するには、次のコマンドを実行します。
dotnet run --project performancemessagereceiver
次の例のような出力が表示されます。
Received message: SequenceNumber:1 Body:Total sales for Brazil in August: $13m.
メッセージを受信しているという通知がプログラムによって返されたら、Enter キーを押してアプリを停止します。
サブスクリプションのメッセージを取得した後でメッセージ数を調べる
次のコマンドを実行して、Americas
サブスクリプションにメッセージが残っていないことを確認します。 <namespace-name> を Service Bus 名前空間に必ず置き換えてください。
az servicebus topic subscription show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--topic-name salesperformancemessages \
--name Americas \
--query messageCount \
--namespace-name <namespace-name>
このコードの Americas
を EuropeAndAsia
に置き換えて、EuropeAndAsia
サブスクリプションの現在のメッセージ数を確認すると、メッセージの数が 1
であることがわかります。 前のコードでは、Americas
のみのトピック メッセージを取得するように設定したので、EuropeAndAsia
に対するそのメッセージはまだ取得されるのを待機しています。