Wysyłanie komunikatów za pomocą programu RabbitMQ

Ukończone

Proste jest pisanie kodu, który tworzy kolejki, wysyła komunikaty i odbiera komunikaty z rabbitMQ. W rozwiązaniu .NET Aspire masz również pomoc w tworzeniu kontenera RabbitMQ i nawiązywaniu z nim połączeń z mikrousługami.

W sklepie detalicznym sprzętu zewnętrznego zdecydowano się wdrożyć RabbitMQ jako scentralizowany broker komunikatów dla witryny internetowej katalogu produktów przeznaczonych dla klientów. Chcesz użyć składnika .NET Aspire RabbitMQ do zarządzania tym brokerem i jego kolejkami.

W tej lekcji dowiesz się, jak utworzyć kontener RabbitMQ i użyć go do wysyłania i odbierania komunikatów.

Korzystanie ze składnika .NET Aspire RabbitMQ

W przypadku korzystania z programu RabbitMQ z platformy .NET zwykle trzeba utworzyć ConnectionFactory obiekt z parametry połączenia, a następnie użyć go do nawiązywania połączeń z usługą. W projekcie .NET Aspire łatwiej jest zarządzać połączeniem RabbitMQ, ponieważ:

  • Należy zarejestrować połączenie i parametry połączenia w projekcie AppHost.
  • Po przekazaniu odwołania do usługi do korzystania z projektów mogą używać iniekcji zależności, aby uzyskać połączenie z RabbitMQ. Nie muszą tworzyć i konfigurować własnych połączeń.

Konfigurowanie programu RabbitMQ na hoście aplikacji

Na platformie .NET Aspire należy zainstalować składnik hostingu Rabbit MQ na hoście aplikacji:

dotnet add package Aspire.Hosting.RabbitMQ

Teraz możesz zarejestrować usługę RabbitMQ i przekazać ją do projektów, które go używają:

// Service registration
var rabbit = builder.AddRabbitMQ("messaging");

// Service consumption
builder.AddProject<Projects.CatalogAPI>()
       .WithReference(rabbit);

Narzędzie AppHost zarządza połączeniem dla wszystkich projektów w rozwiązaniu.

Konfigurowanie funkcji Rab

Następnie dodaj składnik .NET Aspire RabbitMQ do każdego projektu, który go używa:

dotnet add package Aspire.RabbitMQ.Client

Aby uzyskać odwołanie do brokera komunikatów RabbitMQ, wywołaj metodę AddRabbitMQClient() :

builder.AddRabbitMQClient("messaging");

Teraz możesz użyć iniekcji zależności, aby uzyskać połączenie z RabbitMQ:

public class CatalogAPI(IConnection rabbitConnection)
{
    // Send and receive messages here
}

Następnym krokiem połączenia jest utworzenie kanału obsługi komunikatów w następujący sposób:

var channel = connection.CreateModel();

Wysyłanie komunikatów

Po utworzeniu kanału obsługi komunikatów można go użyć do konfigurowania kolejek, wymiany i innych składników topologii obsługi komunikatów. Aby na przykład utworzyć kolejkę, użyj następującego kodu:

channel.QueueDeclare(queue: "catalogEvents",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);

Metoda służy BasicPublish do wysyłania komunikatu do tej kolejki, ale komunikat oczekuje, że treść będzie tablicą bajtów:

var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");

channel.BasicPublish(exchange: string.Empty,
    routingKey: "catalogEvents",
    basicProperties: null,
    body: body);

Odbieranie komunikatów

W składniku odbierającym tworzysz kanał obsługi komunikatów i kolejkę w taki sam sposób jak dla nadawcy. Upewnij się, że nazwa kolejki jest zgodna z nazwą utworzoną w składniku wysyłającym. W przeciwnym razie utworzysz dwie oddzielne kolejki i komunikaty nie będą docierać do poprawnego miejsca docelowego.

Musisz utworzyć nową EventingBasicConsumer() metodę i zarejestrować metodę do obsługi Received zdarzenia:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;

Procedura obsługi komunikatów używa BasicDeliverEventArgs obiektu do uzyskania właściwości komunikatu, w tym treści komunikatu. Pamiętaj, aby deserializować treść wiadomości:

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
    string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
    logger.LogInformation("The message is: {text}", messagetext);
}

Na koniec, aby sprawdzić kolejkę nowych komunikatów, wywołaj metodę BasicConsume() ;

channel.BasicConsume(queue:  queueName,
                    autoAck: true, 
                    consumer: consumer);

Dowiedz się więcej