針對開發或測試環境使用 RabbitMQ 實作事件匯流排
提示
本內容節錄自《容器化 .NET 應用程式的 .NET 微服務架構》(.NET Microservices Architecture for Containerized .NET Applications) 電子書,可以在 .NET Docs 上取得,或免費下載可供離線閱讀的 PDF。
首先,若您依據容器中執行的 RabbitMQ 來建立自訂事件匯流排 (如同 eShopOnContainers 應用程式的做法),則僅應將該事件匯流排用於您的開發和測試環境。 請勿將其用於實際執行環境,除非您將其建置為已準備好投入生產環境的服務匯流排,如下方的「其他資源」一節所述。 簡單的自訂事件匯流排可能會遺失許多商業服務匯流排所具備並可供生產環境使用的重要功能。
eShopOnContainers 的其中一個事件匯流排自訂實作基本上是使用 RabbitMQ API 的程式庫。 (還有另一個實作是採用 Azure 服務匯流排。)
使用 RabbitMQ 實作事件匯流排可讓微服務訂閱事件、發行事件和接收事件,如圖 6-21 所示。
圖 6-12。 事件匯流排的 RabbitMQ 實作
RabbitMQ 的功能是訊息發行者與訂閱者之間的媒介,用來處理散發。 在程式碼中,EventBusRabbitMQ 類別會實作泛型 IEventBus 介面。 此實作是以相依性插入為基礎,因此您可以從此開發/測試版本切換至正式版本。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Implementation using RabbitMQ API
//...
}
範例開發/測試事件匯流排的 RabbitMQ 實作是未定案程式碼。 它必須處理 RabbitMQ 伺服器的連接,並提供程式碼將訊息事件發行到佇列。 它也必須實作每個事件類型之整合事件處理常式集合的字典,這些事件類型對每個接收者微服務的具現化和訂閱可能都不同,如圖 6-21 所示。
使用 RabbitMQ 實作簡單的發行方法
下列程式碼是簡化版本的 RabbitMQ 事件匯流排實作,目的是展示整個情節。 您並不會真的這樣處理連線。 若要查看完整的實作,請參閱 dotnet-architecture/eShopOnContainers 存放庫中的實際程式碼。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Member objects and other methods ...
// ...
public void Publish(IntegrationEvent @event)
{
var eventName = @event.GetType().Name;
var factory = new ConnectionFactory() { HostName = _connectionString };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: _brokerName,
type: "direct");
string message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: _brokerName,
routingKey: eventName,
basicProperties: null,
body: body);
}
}
}
eShopOnContainers 應用程式中發佈方法的實際程式碼可透過使用 Polly 重試原則來改進,該原則會在 RabbitMQ 容器未就緒時,重試數次工作。 當 docker-compose 正在啟動容器時,可能就會發生此情況;例如,RabbitMQ 容器的啟動速度可能會比其他容器更慢。
如前所述,RabbitMQ 中有許多可能的組態,因此這段程式碼只應該用於開發/測試環境。
使用 RabbitMQ API 實作訂閱程式碼
如同發行程式碼,下列程式碼是 RabbitMQ 之事件匯流排實作的一部分簡化。 同樣地,除非您想要改進,否則您通常不需要將它變更。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Member objects and other methods ...
// ...
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
}
}
_subsManager.AddSubscription<T, TH>();
}
}
每個事件類型會有相關的通道以從 RabbitMQ 取得事件。 之後,您可以視需要針對每個通道和事件類型,擁有許多事件處理常式。
Subscribe 方法接受 IIntegrationEventHandler 物件,就像是目前微服務及其相關 IntegrationEvent 物件中的回呼方法。 此程式碼接著會將該事件處理常式新增至事件處理常式清單,每個整合事件類型可根據每個用戶端微服務擁有這些事件處理常式。 如果用戶端程式碼尚未訂閱事件,程式碼會建立事件類型的通道,以便在從任何其他服務發行該事件時,可從 RabbitMQ 接收推送樣式的事件。
如上所述,由於 eShopOnContainers 中實作的事件匯流排僅會處理主要案例,尚未準備好投入生產環境,因此僅供教學使用。
如果是生產案例,請查看下方專適用於 RabbitMQ 的其他資源,以及實作微服務之間的事件通訊一節。
其他資源
支援 RabbitMQ 且已準備好投入生產環境的解決方案。
Peregrine 連線 - 使用有效率的設計、部署和管理應用程式、API 和工作流程,簡化您的整合
https://www.peregrineconnect.com/why-peregrine/rabbitmq-integrationNServiceBus:受完整支援的商業服務匯流排,其中包含可用於 .NET 的進階管理和監視工具
https://particular.net/EasyNetQ:適用於 RabbitMQ 的開放原始碼 .NET API 用戶端
https://easynetq.com/MassTransit:適用於 .NET 的免費開放原始碼分散式應用程式架構
https://masstransit-project.com/Rebus:開放原始碼 .NET 服務匯流排
https://github.com/rebus-org/Rebus