RabbitMQ を使用したメッセージの送信

完了

キューを作成し、メッセージを送信し、RabbitMQ からメッセージを受信するコードを簡単に記述できます。 .NET Aspire ソリューションでは、RabbitMQ コンテナーを作成し、マイクロサービスからそのコンテナーに接続することもできます。

屋外機器の販売店で、顧客向けの製品カタログ Web サイトの一元化されたメッセージ ブローカーとして RabbitMQ を実装することにしました。 .NET Aspire RabbitMQ コンポーネントを使用して、このブローカーとそのキューを管理することにしました。

このユニットでは、RabbitMQ コンテナーを作成し、それを使用してメッセージを送受信する方法について説明します。

.NET Aspire RabbitMQ コンポーネントの使用

.NET から RabbitMQ を使用する場合は、通常、接続文字列を使用して ConnectionFactory オブジェクトを作成し、それを使用してサービスへの接続を行う必要があります。 .NET Aspire プロジェクトでは、次の理由から、RabbitMQ 接続を管理する方が簡単です。

  • 接続と接続文字列は、AppHost プロジェクトに登録します。
  • サービスへの参照を使用するプロジェクトに渡すと、依存関係の挿入を使用して RabbitMQ への接続を取得できます。 独自の接続を作成して構成する必要はありません。

アプリ ホスティング プロセスでの RabbitMQ の構成

.NET Aspire では、Rabbit MQ ホスティング コンポーネントをアプリ ホストにインストールする必要があります。

dotnet add package Aspire.Hosting.RabbitMQ

これで、RabbitMQ サービスを登録し、それを使用するプロジェクトに渡すことができます。

// Service registration
var rabbit = builder.AddRabbitMQ("messaging");

// Service consumption
builder.AddProject<Projects.CatalogAPI>()
       .WithReference(rabbit);

AppHost は、ソリューション内のすべてのプロジェクトの接続を管理します。

Rab の構成

次に、.NET Aspire RabbitMQ コンポーネントを、それを使用する各プロジェクトに追加します。

dotnet add package Aspire.RabbitMQ.Client

RabbitMQ メッセージ ブローカーへの参照を取得するには、AddRabbitMQClient() メソッドを呼び出します。

builder.AddRabbitMQClient("messaging");

これで、依存関係の挿入を使用して RabbitMQ への接続を取得できます。

public class CatalogAPI(IConnection rabbitConnection)
{
    // Send and receive messages here
}

この接続を使用した次の手順は、次のようにメッセージング チャネルを作成することです。

var channel = connection.CreateModel();

メッセージの送信

メッセージング チャネルの準備ができたら、それを使用して、メッセージング トポロジのキュー、交換、その他のコンポーネントを設定できます。 たとえば、キューを作成するには、次のコードを使用します。

channel.QueueDeclare(queue: "catalogEvents",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);

BasicPublish メソッドを使用してこのキューにメッセージを送信しますが、メッセージは本文がバイト配列であると想定しています。

var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");

channel.BasicPublish(exchange: string.Empty,
    routingKey: "catalogEvents",
    basicProperties: null,
    body: body);

メッセージの受信

受信側コンポーネントでは、送信者の場合と同じ方法でメッセージング チャネルとキューを作成します。 キュー名が、送信側コンポーネントで作成したものと一致していることを確認します。 そうしないと、2 つの個別のキューが作成され、メッセージが正しい宛先に到着しません。

新しい EventingBasicConsumer() メソッドを作成し、Received イベントを処理するメソッドを登録する必要があります。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;

メッセージ ハンドラーは、BasicDeliverEventArgs オブジェクトを使用して、メッセージ本文を含むメッセージのプロパティを取得します。 メッセージ本文は必ず逆シリアル化する必要があります。

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
    string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
    logger.LogInformation("The message is: {text}", messagetext);
}

最後に、キューに新しいメッセージがあるかどうかを確認するには、BasicConsume() メソッドを呼び出します。

channel.BasicConsume(queue:  queueName,
                    autoAck: true, 
                    consumer: consumer);

詳細情報