Отправка сообщений с помощью RabbitMQ
Просто написать код, который создает очереди, отправляет сообщения и получает сообщения из RabbitMQ. В решении .NET Aspire также есть помощь в создании контейнера RabbitMQ и подключений к нему из микрослужб.
Вы решили реализовать RabbitMQ в качестве централизованного брокера сообщений для веб-сайта каталога продуктов с поддержкой клиентов. Вы хотите использовать интеграцию .NET Aspire RabbitMQ для управления этим брокером и его очередями.
В этом уроке вы узнаете, как создать контейнер RabbitMQ и использовать его для отправки и получения сообщений.
Использование интеграции .NET Aspire RabbitMQ
При использовании RabbitMQ из .NET обычно необходимо создать ConnectionFactory
объект с строка подключения, а затем использовать его для подключения к службе. В проекте .NET Aspire проще управлять подключением RabbitMQ, так как:
- Вы регистрируете подключение и строка подключения в проекте AppHost.
- При передаче ссылки на службу на использование проектов они могут использовать внедрение зависимостей для подключения к RabbitMQ. Им не нужно создавать и настраивать собственные подключения.
Настройка RabbitMQ в узле приложения
В .NET Aspire необходимо установить интеграцию размещения Rabbit MQ в узле приложения:
dotnet add package Aspire.Hosting.RabbitMQ
Теперь вы можете зарегистрировать службу RabbitMQ и передать ее в проекты, которые используют его:
// Service registration
var rabbit = builder.AddRabbitMQ("messaging");
// Service consumption
builder.AddProject<Projects.CatalogAPI>()
.WithReference(rabbit);
Управление AppHost
подключением для всех проектов в решении.
Настройка Rab
Затем добавьте интеграцию .NET Aspire RabbitMQ в каждый проект, использующий его:
dotnet add package Aspire.RabbitMQ.Client
Чтобы получить ссылку на брокер сообщений RabbitMQ, вызовите AddRabbitMQClient()
метод:
builder.AddRabbitMQClient("messaging");
Теперь можно использовать внедрение зависимостей для получения подключения к RabbitMQ:
public class CatalogAPI(IConnection rabbitConnection)
{
// Send and receive messages here
}
При подключении следующим шагом является создание канала обмена сообщениями, как показано ниже.
var channel = connection.CreateModel();
Отправка сообщений
После создания канала обмена сообщениями его можно использовать для настройки очередей, обмена и других интеграции топологии обмена сообщениями. Например, чтобы создать очередь, используйте следующий код:
channel.QueueDeclare(queue: "catalogEvents",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Метод используется BasicPublish
для отправки сообщения в эту очередь, но сообщение ожидает, что текст будет массивом байтов:
var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");
channel.BasicPublish(exchange: string.Empty,
routingKey: "catalogEvents",
basicProperties: null,
body: body);
Получение сообщений
При получении интеграции вы создаете канал обмена сообщениями и очередь таким же образом, как и отправитель. Убедитесь, что имя очереди совпадает с именем очереди, созданной при отправке интеграции. В противном случае вы создадите две отдельные очереди и сообщения не будут поступать в правильное место назначения.
Необходимо создать новый EventingBasicConsumer()
метод и зарегистрировать метод для обработки Received
события:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;
Обработчик сообщений использует BasicDeliverEventArgs
объект для получения свойств сообщения, включая текст сообщения. Необходимо помнить, чтобы десериализировать текст сообщения:
private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
logger.LogInformation("The message is: {text}", messagetext);
}
Наконец, чтобы проверить очередь для новых сообщений, вызовите BasicConsume()
метод;
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);