Senden von Nachrichten mit RabbitMQ
Es ist einfach, Code zu schreiben, der Warteschlangen erstellt, Nachrichten sendet und Nachrichten von RabbitMQ empfängt. In einer .NET Aspire-Lösung erhalten Sie außerdem Hilfe beim Erstellen des RabbitMQ-Containers und beim Herstellen von Verbindungen mit diesem Container über Microservices.
Bei Ihrem Einzelhändler für Outdoorausrüstung haben Sie sich entschieden, RabbitMQ als zentralen Nachrichtenbroker für Ihre für Kunden zugängliche Produktkatalogwebsite zu implementieren. Sie möchten die .NET Aspire RabbitMQ-Integration verwenden, um diesen Broker und seine Warteschlangen zu verwalten.
In dieser Lerneinheit erfahren Sie, wie Sie einen RabbitMQ-Container erstellen und verwenden, um Nachrichten zu senden und zu empfangen.
Verwenden der .NET Aspire RabbitMQ-Integration
Wenn Sie RabbitMQ aus .NET verwenden, müssen Sie in der Regel ein ConnectionFactory
-Objekt mit einer Verbindungszeichenfolge erstellen und dann verwenden, um Verbindungen mit dem Dienst herzustellen. In einem .NET Aspire-Projekt ist es aus folgenden Gründen einfacher, die RabbitMQ-Verbindung zu verwalten:
- Sie registrieren eine Verbindung und eine Verbindungszeichenfolge im AppHost-Projekt.
- Wenn Sie einen Verweis auf den Dienst an zugreifende Projekte übergeben, können sie die Abhängigkeitsinjektion verwenden, um eine Verbindung mit RabbitMQ herzustellen. Sie müssen keine eigenen Verbindungen erstellen und konfigurieren.
Konfigurieren von RabbitMQ im App-Host
In .NET Aspire müssen Sie die Rabbit MQ-Hostingintegration im App-Host installieren:
dotnet add package Aspire.Hosting.RabbitMQ
Jetzt können Sie den RabbitMQ-Dienst registrieren und an Projekte übergeben, die ihn verwenden:
// Service registration
var rabbit = builder.AddRabbitMQ("messaging");
// Service consumption
builder.AddProject<Projects.CatalogAPI>()
.WithReference(rabbit);
AppHost
verwaltet die Verbindung für alle Projekte in der Lösung.
Konfigurieren von Rab
Fügen Sie als Nächstes die .NET Aspire RabbitMQ-Integration zu jedem Projekt hinzu, das sie verwendet:
dotnet add package Aspire.RabbitMQ.Client
Rufen Sie die AddRabbitMQClient()
-Methode auf, um einen Verweis auf den RabbitMQ-Nachrichtenbroker zu erhalten:
builder.AddRabbitMQClient("messaging");
Jetzt können Sie die Abhängigkeitsinjektion verwenden, um die Verbindung mit RabbitMQ herzustellen:
public class CatalogAPI(IConnection rabbitConnection)
{
// Send and receive messages here
}
Bei hergestellter Verbindung besteht der nächste Schritt darin, wie folgt einen Messagingkanal zu erstellen:
var channel = connection.CreateModel();
Senden von Meldungen
Sobald Sie den Messagingkanal erstellt haben, können Sie ihn zum Einrichten von Warteschlangen, des Austausches und von anderen Integrationen Ihrer Messagingtopologie verwenden. Verwenden Sie zum Erstellen einer Warteschlange beispielsweise den folgenden Code:
channel.QueueDeclare(queue: "catalogEvents",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Sie verwenden die BasicPublish
-Methode, um eine Nachricht an diese Warteschlange zu senden, aber die Nachricht erwartet, dass der Textkörper ein Bytearray ist:
var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");
channel.BasicPublish(exchange: string.Empty,
routingKey: "catalogEvents",
basicProperties: null,
body: body);
Empfangen von Nachrichten
In der empfangenden Integration erstellen Sie den Messagingkanal und die Warteschlange auf die gleiche Weise wie für den Absender. Stellen Sie sicher, dass der Warteschlangenname dem in der sendenden Integration erstellten Namen entspricht. Andernfalls erstellen Sie zwei separate Warteschlangen, und Nachrichten gelangen nicht zum richtigen Ziel.
Sie müssen eine neue EventingBasicConsumer()
-Methode erstellen und eine Methode registrieren, um das Received
-Ereignis zu behandeln:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;
Der Nachrichtenhandler verwendet ein BasicDeliverEventArgs
-Objekt, um die Eigenschaften der Nachricht abzurufen, einschließlich des Nachrichtentexts. Vergessen Sie nicht, den Nachrichtentext zu deserialisieren:
private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
logger.LogInformation("The message is: {text}", messagetext);
}
Rufen Sie schließlich die BasicConsume()
-Methode auf, um die Warteschlange auf neue Nachrichten zu überprüfen:
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);