トピックを使用してメッセージを送受信するコードを記述する
分散アプリケーションでは、一部のメッセージは 1 つの受信側コンポーネントに対して使用される必要があります。 他のメッセージは、複数の宛先に到達する必要があります。
ユーザーが自転車の注文をキャンセルするとどうなるか考えてみます。 注文のキャンセルは、最初の注文とは少し異なります。 注文が行われるときのワークフローでは、注文の支払い処理が済むまで待った後、注文を現地の店舗に送信しました。 キャンセル操作の場合は、店舗と支払いプロセッサの "両方" に同時に通知します。 このようにすることで、配送ドライバーの時間が無駄になる可能性を最小限に抑えます。
複数のコンポーネントが同じメッセージを受信できるように、Azure Service Bus の "トピック" を使用します。 次に、コードを記述するプロセスと考慮事項について見ていきます。
トピックを含むコードと、キューを含むコードの違い
送信されたすべてのメッセージを、サブスクライブしているすべてのコンポーネントに配信する場合は、トピックを使用します。 トピックを使用するコードの記述は、キューを置き換える方法の 1 つです。 同じ Azure.Messaging.ServiceBus NuGet パッケージを使用し、接続文字列を構成して、非同期プログラミング パターンを使用します。
また、メッセージを送信するには同じ ServiceBusClient
クラスと ServiceBusSender
クラス、メッセージを受信するには ServiceBusProcessor
クラスを使用します。
サブスクリプションでフィルターを設定する
トピックに送信された特定のメッセージが特定のサブスクリプションに配信されるようにしたい場合は、トピックのサブスクリプションに 1 つ以上のフィルターを設定できます。 たとえば、自転車アプリケーションでは、店舗でユニバーサル Windows プラットフォーム (UWP) アプリケーションを実行しています。 各店舗で OrderCancellation
トピックにサブスクライブし、独自の StoreId
でフィルター処理できます。 複数の場所にある店に不要なメッセージが送信されないので、インターネットの帯域幅が節約されます。 一方、支払い処理コンポーネントは、すべての OrderCancellation
メッセージをサブスクライブします。
次の 3 種類のフィルターのいずれかを使用できます。
- ブール フィルター:
TrueFilter
は、トピックに送信されたすべてのメッセージが現在のサブスクリプションに確実に配信されるようにします。FalseFilter
は、現在のサブスクリプションにメッセージがまったく配信されないようにします (これは、サブスクリプションを効果的にブロックまたはオフにします)。 - SQL フィルター:SQL フィルターは、SQL クエリの
WHERE
句と同じ構文を使用して条件を指定します。 このフィルターに対して評価されたときにTrue
を返すメッセージのみが、サブスクライバーに配信されます。 - 相関関係フィルター:相関関係フィルターは、各メッセージのプロパティと照合される条件のセットを保持します。 フィルターのプロパティとメッセージのプロパティが同じ値である場合、一致するものと見なされます。
StoreId
フィルターの場合は、SQL フィルターを使用することも "できます"。 SQL フィルターは最も柔軟性がありますが、計算の負荷は最も大きく、フィルターによって Service Bus のスループットが低下する可能性があります。 この場合は、相関関係フィルターを選択します。
トピックにメッセージを送信するには
メッセージをトピックに送信するには、次の手順のようにします。
すべての送信または受信コンポーネントで、Service Bus トピックを呼び出すコード ファイルに次の using
ステートメントを追加します。
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
メッセージを送信するには、最初に新しい ServiceBusClient
オブジェクトを作成し、それに接続文字列とトピックの名前を渡します。
await using var client = new ServiceBusClient(connectionString);
次に、ServiceBusClient
オブジェクトで CreateSender
メソッドを呼び出して、トピック名を指定することで、ServiceBusSender
オブジェクトを作成します。
ServiceBusSender sender = client.CreateSender(topicName);
ServiceBusSender.SendMessageAsync()
メソッドを呼び出して ServiceBusMessage
を渡すことにより、トピックにメッセージを送信できます。 キューと同じように、メッセージは UTF-8 でエンコードされた文字列の形式にする必要があります。
string message = "Cancel! I have changed my mind!";
var message = new ServiceBusMessage(message);
// Send the message to the topic.
await sender.SendMessageAsync(message);
サブスクリプションからメッセージを受信するには
サブスクリプションからメッセージを受信するには、ServiceBusProcessor
オブジェクトを作成し、トピック名とサブスクリプション名を渡す必要があります。
processor = client.CreateProcessor(topicName, subscriptionName, options);
次に、メッセージ ハンドラーとエラー ハンドラーを登録します。
// Specify the handler method for messages.
processor.ProcessMessageAsync += MessageHandler;
// Specify the handler method for errors.
processor.ProcessErrorAsync += ErrorHandler;
メッセージ ハンドラー内で処理を行ってから、ProcessMessageEventArgs.CompleteMessageAsync()
メソッドを呼び出してサブスクリプションからメッセージを削除します。
// Complete the message. The message is deleted from the subscription.
await args.CompleteMessageAsync(args.Message);