使用 RabbitMQ 实现事件总线以用于开发或测试环境

小窍门

此内容摘自电子书《适用于容器化 .NET 应用程序的 .NET 微服务体系结构》,可以在 .NET Docs 上获取,也可以下载免费的 PDF 以供离线阅读。

适用于容器化 .NET 应用程序的 .NET 微服务体系结构电子书封面缩略图。

我们应该首先说,如果你基于在容器中运行的 RabbitMQ 创建自定义事件总线,正如 eShopOnContainers 应用程序所做的那样,它应该仅用于开发和测试环境。 请勿将其用于生产环境,除非要将其构建为生产就绪服务总线的一部分,如 下面的“其他资源”部分所述。 简单的自定义事件总线可能缺少商业服务总线具有的许多生产就绪关键功能。

eShopOnContainers 中的事件总线自定义实现之一基本上是一个使用 RabbitMQ API 的库。 (还有另一个基于 Azure 服务总线的实现。

使用 RabbitMQ 的事件总线实现允许微服务订阅事件、发布事件和接收事件,如图 6-21 所示。

显示消息发送方和消息接收方之间的 RabbitMQ 的关系图。

图 6-21. 事件总线的 RabbitMQ 实现

RabbitMQ 充当消息发布服务器和订阅服务器之间的中介,用于处理分发。 在代码中,EventBusRabbitMQ 类实现泛型 IEventBus 接口。 此实现基于依赖关系注入,以便你可以从此开发/测试版本交换到生产版本。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Implementation using RabbitMQ API
    //...
}

示例开发/测试事件总线的 RabbitMQ 实现是样本代码。 它必须处理与 RabbitMQ 服务器的连接,并提供用于将消息事件发布到队列的代码。 它还必须为每个事件类型实现集成事件处理程序集合的字典;这些事件类型可以为每个接收方微服务使用不同的实例化和不同的订阅,如图 6-21 所示。

使用 RabbitMQ 实现简单的发布方法

以下代码是 RabbitMQ 事件总线实现的 简化 版本,用于展示整个方案。 你真的不必以这种方式处理连接。 若要查看完整的实现,请参阅 dotnet-architecture/eShopOnContainers 存储库中的实际代码。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Publish(IntegrationEvent @event)
    {
        var eventName = @event.GetType().Name;
        var factory = new ConnectionFactory() { HostName = _connectionString };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: _brokerName,
                type: "direct");
            string message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: _brokerName,
                routingKey: eventName,
                basicProperties: null,
                body: body);
       }
    }
}

eShopOnContainers 应用程序中 Publish 方法 的实际代码 通过使用 Polly 重试策略进行了改进,当 RabbitMQ 容器未就绪时进行多次重试。 docker-compose 正在启动容器时,可能会出现这种情况;例如,RabbitMQ 容器的启动速度可能比其他容器慢。

如前所述,RabbitMQ 中有许多可能的配置,因此此代码应仅用于开发/测试环境。

使用 RabbitMQ API 实现订阅代码

与发布代码一样,以下代码是 RabbitMQ 事件总线实现的一部分简化。 同样,通常不需要更改它,除非你正在改进它。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }
}

每个事件类型都有一个相关通道,用于从 RabbitMQ 获取事件。 然后,可以根据需要为每个通道和事件类型拥有任意数量的事件处理程序。

Subscribe 方法接受 IIntegrationEventHandler 对象,该对象类似于当前微服务中的回调方法及其相关的 IntegrationEvent 对象。 然后,该代码将该事件处理程序添加到每个集成事件类型的事件处理程序列表中,每个事件类型可对应于不同的客户端微服务。 如果客户端代码尚未订阅事件,则代码会为事件类型创建通道,以便在从任何其他服务发布事件时从 RabbitMQ 接收推送样式的事件。

如上所述,在 eShopOnContainers 中实现的事件总线仅用于教育目的,因为它只处理主要场景,尚不可用于生产环境。

对于生产环境,请查看以下额外资源,其中包括 RabbitMQ 特定的资源以及在微服务之间实现基于事件的通信的部分。

其他资源

支持 RabbitMQ 的生产就绪解决方案。