Отправка сообщений с помощью RabbitMQ

Завершено

Просто написать код, который создает очереди, отправляет сообщения и получает сообщения из RabbitMQ. В решении .NET Aspire также есть помощь в создании контейнера RabbitMQ и подключений к нему из микрослужб.

Вы решили реализовать RabbitMQ в качестве централизованного брокера сообщений для веб-сайта каталога продуктов с поддержкой клиентов. Вы хотите использовать интеграцию .NET Aspire RabbitMQ для управления этим брокером и его очередями.

В этом уроке вы узнаете, как создать контейнер RabbitMQ и использовать его для отправки и получения сообщений.

Использование интеграции .NET Aspire RabbitMQ

При использовании RabbitMQ из .NET обычно необходимо создать ConnectionFactory объект с строка подключения, а затем использовать его для подключения к службе. В проекте .NET Aspire проще управлять подключением RabbitMQ, так как:

  • Вы регистрируете подключение и строка подключения в проекте AppHost.
  • При передаче ссылки на службу на использование проектов они могут использовать внедрение зависимостей для подключения к RabbitMQ. Им не нужно создавать и настраивать собственные подключения.

Настройка RabbitMQ в узле приложения

В .NET Aspire необходимо установить интеграцию размещения Rabbit MQ в узле приложения:

dotnet add package Aspire.Hosting.RabbitMQ

Теперь вы можете зарегистрировать службу RabbitMQ и передать ее в проекты, которые используют его:

// Service registration
var rabbit = builder.AddRabbitMQ("messaging");

// Service consumption
builder.AddProject<Projects.CatalogAPI>()
       .WithReference(rabbit);

Управление AppHost подключением для всех проектов в решении.

Настройка Rab

Затем добавьте интеграцию .NET Aspire RabbitMQ в каждый проект, использующий его:

dotnet add package Aspire.RabbitMQ.Client

Чтобы получить ссылку на брокер сообщений RabbitMQ, вызовите AddRabbitMQClient() метод:

builder.AddRabbitMQClient("messaging");

Теперь можно использовать внедрение зависимостей для получения подключения к RabbitMQ:

public class CatalogAPI(IConnection rabbitConnection)
{
    // Send and receive messages here
}

При подключении следующим шагом является создание канала обмена сообщениями, как показано ниже.

var channel = connection.CreateModel();

Отправка сообщений

После создания канала обмена сообщениями его можно использовать для настройки очередей, обмена и других интеграции топологии обмена сообщениями. Например, чтобы создать очередь, используйте следующий код:

channel.QueueDeclare(queue: "catalogEvents",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);

Метод используется BasicPublish для отправки сообщения в эту очередь, но сообщение ожидает, что текст будет массивом байтов:

var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");

channel.BasicPublish(exchange: string.Empty,
    routingKey: "catalogEvents",
    basicProperties: null,
    body: body);

Получение сообщений

При получении интеграции вы создаете канал обмена сообщениями и очередь таким же образом, как и отправитель. Убедитесь, что имя очереди совпадает с именем очереди, созданной при отправке интеграции. В противном случае вы создадите две отдельные очереди и сообщения не будут поступать в правильное место назначения.

Необходимо создать новый EventingBasicConsumer() метод и зарегистрировать метод для обработки Received события:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;

Обработчик сообщений использует BasicDeliverEventArgs объект для получения свойств сообщения, включая текст сообщения. Необходимо помнить, чтобы десериализировать текст сообщения:

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
    string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
    logger.LogInformation("The message is: {text}", messagetext);
}

Наконец, чтобы проверить очередь для новых сообщений, вызовите BasicConsume() метод;

channel.BasicConsume(queue:  queueName,
                    autoAck: true, 
                    consumer: consumer);

Подробнее