我們應該先說明,如果您基於在容器中運行的 RabbitMQ 建立您自己的自定義事件總線,就像 eShopOnContainers 應用程式那樣,它應該僅用於開發和測試環境。 請勿將它用於生產環境,除非您將它建置為生產就緒服務總線的一部分,如 下列其他資源一節所述。 簡單的自定義事件總線可能缺少商業服務總線擁有的許多生產就緒關鍵功能。
eShopOnContainers 中的其中一個事件匯流排的自訂實作基本上是使用 RabbitMQ API 的函式庫。 (還有另一個以 Azure 服務總線為基礎的實作。
使用RabbitMQ的事件總線實作可讓微服務訂閱事件、發佈事件和接收事件,如圖 6-21 所示。
圖 6-21。 事件總線的RabbitMQ實作
RabbitMQ 可作為訊息發行者和訂閱者之間的媒介,以處理散發。 在程序代碼中,EventBusRabbitMQ 類別會實作泛型 IEventBus 介面。 此實作是以相依性插入為基礎,因此您可以將此開發/測試版本交換至生產版本。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Implementation using RabbitMQ API
//...
}
範例開發/測試事件總線的RabbitMQ實作是重複使用的程序代碼。 它必須處理 RabbitMQ 伺服器的連線,並提供將訊息事件發佈至佇列的程式代碼。 它也必須針對每個事件類型實作整合事件處理程式集合的字典;這些事件類型可以針對每個接收者微服務有不同的具現化和不同的訂用帳戶,如圖 6-21 所示。
使用RabbitMQ實作簡單的發行方法
下列程式代碼是RabbitMQ事件總線實作的 簡化 版本,可展示整個案例。 您真的不會以這種方式處理連線。 若要查看完整的實作,請參閱 dotnet-architecture/eShopOnContainers 存放庫中的實際程序代碼。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Member objects and other methods ...
// ...
public void Publish(IntegrationEvent @event)
{
var eventName = @event.GetType().Name;
var factory = new ConnectionFactory() { HostName = _connectionString };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: _brokerName,
type: "direct");
string message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: _brokerName,
routingKey: eventName,
basicProperties: null,
body: body);
}
}
}
eShopOnContainers 應用程式中的 Publish 方法 的實際程式代碼 已透過使用 Polly 重試策略得到改善。當 RabbitMQ 容器尚未就緒時,這個策略會多次重試該任務。 當 docker-compose 正在啟動容器時,就可能發生此案例;例如,RabbitMQ 容器的啟動速度可能會比其他容器慢。
如先前所述,RabbitMQ 中有許多可能的組態,因此此程式代碼應該只用於開發/測試環境。
使用RabbitMQ API實作訂用帳戶程式代碼
如同發行程序代碼,下列程式代碼會簡化RabbitMQ事件總線實作的一部分。 同樣地,您通常不需要變更它,除非您正在改善它。
public class EventBusRabbitMQ : IEventBus, IDisposable
{
// Member objects and other methods ...
// ...
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
}
}
_subsManager.AddSubscription<T, TH>();
}
}
每個事件類型都有相關通道,可從 RabbitMQ 取得事件。 然後,就通道和事件類型而言,您可以擁有所需數量的事件處理程式。
Subscribe 方法接受 IIntegrationEventHandler 物件,就像目前微服務中的回呼方法,再加上其相關的 IntegrationEvent 物件。 然後,程式代碼會將那個事件處理器新增至每個用戶端微服務的每個整合事件類型可以具有的事件處理程式清單。 如果客戶端程式代碼尚未訂閱該事件,則程式代碼會為該事件類型建立一個管道,以便當事件從任何其他服務發佈時,可以以推送方式從 RabbitMQ 接收事件。
如上所述,eShopOnContainers 中實作的事件總線只有教育用途,因為它只會處理主要案例,因此尚未準備好生產環境。
在生產環境中,請檢閱下方的額外資源,包括 RabbitMQ 的特定資源,以及微服務間的事件型通訊實踐一節。
其他資源
支援RabbitMQ的生產就緒解決方案。
Peregrine Connect - 使用有效率的設計、部署和管理應用程式、API 和工作流程,簡化整合
https://www.peregrineconnect.com/why-peregrine/rabbitmq-integrationNServiceBus - 完全支援的商業服務總線,具有適用於 .NET 的進階管理和監視工具
https://particular.net/EasyNetQ - 適用於 RabbitMQ 的開放原始碼 .NET API 用戶端
https://easynetq.com/MassTransit - 適用於 .NET 的免費開放原始碼分散式應用程式架構
https://masstransit-project.com/Rebus - 開放原始碼 .NET 服務總線
https://github.com/rebus-org/Rebus