使用事件總線的第一個步驟是將微服務訂閱到他們想要接收的事件。 該功能應在接收者微服務中完成。
下列簡單程式碼顯示啟動服務時,每個接收者微服務需要在 Startup 類別中實作的部分,以便訂閱所需的事件。 在此情況下, basket-api 微服務必須訂閱 ProductPriceChangedIntegrationEvent 和 OrderStartedIntegrationEvent 訊息。
例如,當訂閱 ProductPriceChangedIntegrationEvent 事件時,購物籃微服務可以知道產品價格的變動,並且如果該產品位於使用者的購物籃中,即可警告使用者有變更。
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
ProductPriceChangedIntegrationEventHandler>();
eventBus.Subscribe<OrderStartedIntegrationEvent,
OrderStartedIntegrationEventHandler>();
執行此程式代碼之後,訂閱者微服務會透過RabbitMQ通道接聽。 當 ProductPriceChangedIntegrationEvent 類型的任何訊息送達時,程式代碼會叫用傳遞給它的事件處理程式,並處理事件。
透過事件總線發佈事件
最後,訊息傳送者(原始微服務)會以類似下列範例的程式代碼發佈整合事件。 這種方法是一個簡化的範例,未考慮到原子性。每當事件必須傳播到多個微服務時,通常是在源微服務認可數據或交易之後,您都會撰寫類似的程式碼。
首先,事件總線實作對象(基於 RabbitMQ 或服務總線)會注入至控制器的建構子,如下列程式碼所示:
[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
private readonly CatalogContext _context;
private readonly IOptionsSnapshot<Settings> _settings;
private readonly IEventBus _eventBus;
public CatalogController(CatalogContext context,
IOptionsSnapshot<Settings> settings,
IEventBus eventBus)
{
_context = context;
_settings = settings;
_eventBus = eventBus;
}
// ...
}
然後,您可以從控制器的各個方法使用它,例如 UpdateProduct 方法:
[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
var item = await _context.CatalogItems.SingleOrDefaultAsync(
i => i.Id == product.Id);
// ...
if (item.Price != product.Price)
{
var oldPrice = item.Price;
item.Price = product.Price;
_context.CatalogItems.Update(item);
var @event = new ProductPriceChangedIntegrationEvent(item.Id,
item.Price,
oldPrice);
// Commit changes in original transaction
await _context.SaveChangesAsync();
// Publish integration event to the event bus
// (RabbitMQ or a service bus underneath)
_eventBus.Publish(@event);
// ...
}
// ...
}
在此情況下,因為原始微服務是簡單的 CRUD 微服務,該程式代碼會直接放在 Web API 控制器中。
在更進階的微服務中,例如使用 CQRS 方法時,它可以在 CommandHandler 類別的 Handle() 方法內實作。
在發佈至事件總線時設計原子性和韌性
當您透過像事件總線這樣的分散式訊息系統發佈整合事件時,您會面臨原始資料庫的原子性更新以及事件發佈的問題(也就是說,兩項作業必須要麼全部完成,要麼全部未完成)。 例如,在稍早所示的簡化範例中,程式代碼會在產品價格變更后,將數據認可至資料庫,然後發佈 ProductPriceChangedIntegrationEvent 訊息。 一開始,這兩項作業可能看起來很必要以原子性地執行。 不過,如果您使用涉及資料庫和訊息代理程式的分散式交易,就像在舊版系統中所做的一樣,例如 Microsoft消息佇列 (MSMQ),不建議使用此方法,因為 CAP 定理所描述的原因。
基本上,您會使用微服務來建置可調整且高可用性的系統。 CAP 定理稍微簡化來說,表示您無法建置一個(分散式)資料庫(或擁有其模型的微服務)同時具備持續可用、強一致性且能夠容忍任何分割的。 您必須選擇這三個屬性中的兩個。
在微服務架構中,您應該選擇可用性和容錯,而且應該取消強調強式一致性。 因此,在大部分的新式微服務型應用程式中,您通常不想在傳訊中使用分散式交易,就像使用 MSMQ 實作以 Windows 分散式交易協調器 (DTC) 為基礎的分散式交易時所做的一樣。
讓我們回到初始問題及其範例。 如果服務在資料庫更新后當機(在此案例中,在程式 _context.SaveChangesAsync()代碼行後面加上 ),但在發佈整合事件之前,整體系統可能會變得不一致。 根據您處理的特定商務作業而定,這種方法可能是業務關鍵。
如先前的架構一節所述,您可以有數種方法來處理此問題:
在此案例中,使用完整的事件來源(ES)模式是其中一個最佳方法,甚至可能是最佳方法。 不過,在許多應用程式案例中,您可能無法實作完整的ES系統。 ES 表示只儲存交易資料庫中的網域事件,而不是儲存目前的狀態數據。 只儲存網域事件可能會有極大的好處,例如擁有您系統可用的歷程記錄,以及能夠判斷您系統在過去任何時刻的狀態。 不過,實作完整的ES系統需要重新架構大部分的系統,並導入許多其他複雜度和需求。 例如,您想要使用特別針對事件來源所建立的資料庫,例如 事件存放區,或文件導向資料庫,例如 Azure Cosmos DB、MongoDB、Cassandra、CouchDB 或 RavenDB。 事件溯源是解決此問題的非常好的方法,但除非您已經熟悉事件溯源,否則它不是最簡單的解決方案。
使用事務歷史記錄採礦的選項一開始看起來是透明的。 不過,若要使用此方法,微服務必須結合至 RDBMS 事務歷史記錄,例如 SQL Server 事務歷史記錄。 這種方法可能不理想。 另一個缺點是,事務歷史記錄中記錄的低層級更新可能與高階整合事件不相同。 如果是,則反向工程這些交易記錄作業可能會很困難。
平衡的方法混合了交易資料庫數據表和簡化的 ES 模式。 您可以使用例如「準備好發佈事件」這樣的狀態,並在將事件提交到整合事件表時在原始事件中設定。 然後,您嘗試將事件發佈至事件總線。 如果 publish-event 動作成功,您會在源服務中啟動另一個交易,並將狀態從「準備好發佈事件」移至「已發行的事件」。
如果事件總線中的 publish-event 動作失敗,原始微服務中的數據仍然不會不一致,它仍然標示為「已準備好發佈事件」,而至於其餘的服務,它最終會保持一致。 您隨時都可以有背景工作程序檢查交易或整合項目的狀態。 如果作業在「準備好發佈事件」狀態中找到事件,它可以嘗試將該事件重新發佈至事件總線。
請注意,使用此方法時,您只會保存每個原始微服務的整合事件,而只保存您想要與其他微服務或外部系統通訊的事件。 相反地,在完整的 ES 系統中,您也會儲存所有網域事件。
因此,這種平衡的方法是簡化的ES系統。 您需要包含其目前狀態的整合事件清單(「已準備好發佈」與「已發佈」)。 但您只需要針對整合事件實作這些狀態。 在這種方法中,您不需要將所有網域數據儲存為交易式資料庫中的事件,就像在完整的ES系統中一樣。
如果您已經使用關係資料庫,您可以使用交易數據表來儲存整合事件。 若要在應用程式中達到原子性,您可以使用以本地交易為基礎的兩步驟過程。 基本上,您在擁有網域實體的相同資料庫中有 IntegrationEvent 數據表。 該資料表作為保證交易原子性的手段,確保您能在提交網域數據的同一交易中包含已持久化的整合事件。
逐步執行流程如下:
應用程式開始進行本機資料庫交易。
然後,它會更新網域實體的狀態,並將事件插入整合事件數據表中。
最後,它會認可交易,因此您可以確保所需的原子性,然後
您以某種方式發佈事件(下一步)。
實作發佈事件的步驟時,您有下列選擇:
提交交易後立即發佈集成事件,並使用另一個本地交易將資料表中的事件標示為已發佈。 然後,使用數據表就像成品一樣,在遠端微服務中發生問題時追蹤整合事件,並根據預存整合事件執行補償動作。
使用數據表做為佇列類型。 個別的應用程式線程或進程會查詢整合事件數據表、將事件發佈至事件總線,然後使用本機交易將事件標示為已發佈。
圖 6-22 顯示這些方法的第一個架構。
圖 6-22。 發佈事件至事件總線時的原子性
圖 6-22 中說明的方法遺漏了額外的背景工作微服務,負責檢查和確認已發佈整合事件的成功。 如果失敗,該額外的檢查工作微服務可以從資料表讀取事件,並重新發布它們,也就是重複步驟 2。
關於第二種方法:您會使用 EventLog 表作為佇列,並且總是使用工作者微服務來發佈消息。 在此情況下,程式就像如圖 6-23 所示。 這會顯示額外的微服務,而數據表是發佈事件時的單一來源。
圖 6-23。 發布事件至事件總線的原子性及使用工作者微服務的考量
為了簡單起見,eShopOnContainers 範例會使用第一種方法(沒有額外的進程或檢查程式微服務),再加上事件總線。 不過,eShopOnContainers 範例不會處理所有可能的失敗案例。 在部署至雲端的實際應用程式中,您必須接受最終會發生問題的事實,而且您必須實作該檢查和重新傳送邏輯。 如果您將這個數據表作為單一事件來源,並透過工作者透過事件總線發佈事件,那麼使用數據表作為佇列可能會比第一種方法更有效。
透過事件總線發佈整合事件時實作原子性
下列程式代碼示範如何建立涉及多個 DbContext 物件的單一交易,其中一個內容與要更新的原始數據相關,以及與 IntegrationEventLog 數據表相關的第二個內容。
如果資料庫連接在程式碼執行期間出現任何問題,下列範例程式碼中的交易將無法保持彈性。 這可能發生在 Azure SQL DB 之類的雲端系統中,可能會跨伺服器遷移資料庫。 如需在多個環境中實作具韌性的交易,請在本指南稍後的實作具韌性 Entity Framework Core SQL 連線一節中查看。
為了清楚起見,下列範例會在單一程式代碼中顯示整個程式。 不過,eShopOnContainers 實作會重構,並將此邏輯分割成多個類別,因此更容易維護。
// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
var catalogItem =
await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
productToUpdate.Id);
if (catalogItem == null) return NotFound();
bool raiseProductPriceChangedEvent = false;
IntegrationEvent priceChangedEvent = null;
if (catalogItem.Price != productToUpdate.Price)
raiseProductPriceChangedEvent = true;
if (raiseProductPriceChangedEvent) // Create event if price has changed
{
var oldPrice = catalogItem.Price;
priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
productToUpdate.Price,
oldPrice);
}
// Update current product
catalogItem = productToUpdate;
// Just save the updated product if the Product's Price hasn't changed.
if (!raiseProductPriceChangedEvent)
{
await _catalogContext.SaveChangesAsync();
}
else // Publish to event bus only if product price changed
{
// Achieving atomicity between original DB and the IntegrationEventLog
// with a local transaction
using (var transaction = _catalogContext.Database.BeginTransaction())
{
_catalogContext.CatalogItems.Update(catalogItem);
await _catalogContext.SaveChangesAsync();
await _integrationEventLogService.SaveEventAsync(priceChangedEvent);
transaction.Commit();
}
// Publish the integration event through the event bus
_eventBus.Publish(priceChangedEvent);
_integrationEventLogService.MarkEventAsPublishedAsync(
priceChangedEvent);
}
return Ok();
}
建立 ProductPriceChangedIntegrationEvent 整合事件之後,儲存原始網域作業的交易(更新目錄項目)也會將該事件保存至 EventLog 數據表中。 這會使它成為單一交易,而且您一律能夠檢查是否已傳送事件訊息。
事件日志表會隨原始資料庫操作進行原子更新,並針對相同的資料庫使用本地交易。 如果有任何作業失敗,則會擲回例外狀況,而交易會回復任何已完成的作業,因此會維持網域作業與儲存至數據表的事件訊息之間的一致性。
從訂用帳戶接收訊息:接收者微服務中的事件處理程式
除了事件訂閱邏輯之外,您還需要實作整合事件處理程式的內部程式代碼(例如回呼方法)。 事件處理程式是您指定接收和處理特定類型之事件訊息的位置。
事件處理程式會先從事件總線接收事件實例。 然後,它會找出要處理與該整合事件相關的元件,將事件傳播並保存為接收者微服務中狀態的變更。 例如,如果 ProductPriceChanged 事件源自類別目錄微服務,則會在購物籃微服務中處理,並變更此接收者購物籃微服務的狀態,如下列程式代碼所示。
namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
public class ProductPriceChangedIntegrationEventHandler :
IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
{
private readonly IBasketRepository _repository;
public ProductPriceChangedIntegrationEventHandler(
IBasketRepository repository)
{
_repository = repository;
}
public async Task Handle(ProductPriceChangedIntegrationEvent @event)
{
var userIds = await _repository.GetUsers();
foreach (var id in userIds)
{
var basket = await _repository.GetBasket(id);
await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
}
}
private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
CustomerBasket basket)
{
var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
productId).ToList();
if (itemsToUpdate != null)
{
foreach (var item in itemsToUpdate)
{
if(item.UnitPrice != newPrice)
{
var originalPrice = item.UnitPrice;
item.UnitPrice = newPrice;
item.OldUnitPrice = originalPrice;
}
}
await _repository.UpdateBasket(basket);
}
}
}
}
事件處理程式必須確認產品是否存在於任何購物籃實例中。 它還會更新每個相關購物籃項目的價格。 最後,它會建立警示,向用戶顯示價格變更的相關信息,如圖 6-24 所示。
圖 6-24。 在購物籃中顯示項目價格變更,如整合事件所傳達
更新訊息事件的等冪性
更新訊息情形的一個重要層面是,在通訊過程中的任何一個時間點發生失敗,都應導致訊息重試。 否則,背景工作可能會嘗試發佈已發行的事件,以建立競爭條件。 請確定更新是等冪的,或提供足夠的資訊,以確保您可以偵測到重複項、捨棄重複項,並僅傳回一個回應。
如先前所述,等冪表示可以多次執行作業,而不需要變更結果。 在傳訊環境中,如同在通訊事件時,如果事件可以傳遞多次,則事件具有等冪性,而不會變更接收者微服務的結果。 這可能是必要的,因為事件本身的性質,或因為系統處理事件的方式。 訊息等冪性在任何使用傳訊的應用程式中都很重要,而不只是實作事件總線模式的應用程式。
等冪運算的範例是 SQL 語句,只有在數據表中還沒有數據時,才會將數據插入數據表中。 執行插入 SQL 語句的次數並不重要;結果會相同,數據表將包含該數據。 在處理訊息時,如果訊息可能會被傳送並因此可能會被處理多次,那麼像這樣的等冪性可能是必要的。 例如,如果重試邏輯導致傳送者多次傳送完全相同的訊息,您必須確定它是等冪的。
可以設計等冪訊息。 例如,您可以建立事件,指出「將產品價格設定為 $25」,而不是「將 $5 新增至產品價格」。您可以安全地處理第一則訊息的次數,結果會相同。 第二則訊息並非如此。 但是,即使在第一個案例中,您可能不想處理第一個事件,因為系統可能也會傳送較新的價格變更事件,而您會覆寫新的價格。
另一個範例可能是傳播至多個訂閱者的訂單完成事件。 應用程式必須確定其他系統中只會更新訂單資訊一次,即使相同訂單完成事件有重複的訊息事件也一樣。
讓每個事件有某種身分識別,讓您可以建立邏輯,強制每個接收者只處理每個事件一次。
某些訊息處理本質上是等冪的。 例如,如果系統產生影像縮圖,則處理所產生縮圖的訊息次數可能無關;結果就是產生縮圖,而且每次都相同。 另一方面,呼叫付款網關來收取信用卡費用等作業可能完全不具等冪性。 在這些情況下,您必須確保處理訊息多次具有預期的效果。
其他資源
消除重複整合事件訊息
您可以確定每個訂閱者在不同層級只會傳送和處理一次訊息事件。 其中一種方式是使用您所使用之傳訊基礎結構所提供的重複數據刪除功能。 另一個是在目的地微服務中實作自定義邏輯。 在傳輸層級和應用層級進行驗證是最佳策略。
訊息事件在 EventHandler 層級進行去重處理
若要確保事件只會由任何接收者處理一次,就是在事件處理程式中處理訊息事件時實作特定邏輯。 例如,這是在 eShopOnContainers 應用程式中所使用的方法,如在 UserCheckoutAcceptedIntegrationEventHandler 類別的原始程式碼中所見,當它收到整合事件時。 (在此情況下,CreateOrderCommand 先用 IdentifiedCommand 作為識別符號包裝在 eventMsg.RequestId 中,再將它傳送至命令處理程式。)
使用 RabbitMQ 時進行訊息去重處理
發生間歇性網路失敗時,訊息可能會重複,而且訊息接收者必須準備好處理這些重複的訊息。 如果可能的話,接收者應該以等冪方式(即使重複處理也不影響結果)來處理訊息,這比使用資料去重來明確處理訊息更好。
根據 RabbitMQ 文件,「如果訊息傳送給消費者後重新排入佇列(例如,因為在消費者連線卸載之前未被確認),則 RabbitMQ 會在再次送達時,為該訊息設定重新送達的旗標(無論是送達給相同的消費者或不同的消費者)。
如果已設定「重新傳遞」標誌,接收者必須注意,因為訊息可能已經被處理過。 但這是不保證的:訊息在離開訊息代理程序之後可能從未到達接收者,可能是因為網路問題。 另一方面,如果未設定「重新傳遞」旗標,則保證訊息未多次傳送。 因此,只有在訊息中設定「redelivered」旗標時,接收者才需要將訊息去重或以等冪方式處理訊息。
其他資源
使用 NServiceBus 派生的 eShopOnContainers(Particular Software)
https://go.particular.net/eShopOnContainers事件驅動傳訊
https://patterns.arcitura.com/soa-patterns/design_patterns/event_driven_messaging吉米·博加德 重構以增強韌性:評估耦合
https://jimmybogard.com/refactoring-towards-resilience-evaluating-coupling/Publish-Subscribe 通道
https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html在限定內容之間通訊
https://learn.microsoft.com/previous-versions/msp-n-p/jj591572(v=pandp.10)菲力浦·布朗 整合限界上下文的策略
https://www.culttt.com/2014/11/26/strategies-integrating-bounded-contexts/克裡斯·理查森 使用匯總、事件來源和 CQRS 開發交易微服務 - 第 2 部分
https://www.infoq.com/articles/microservices-aggregates-events-cqrs-part-2-richardson克裡斯·理查森 事件來源模式
https://microservices.io/patterns/data/event-sourcing.html事件溯源介紹
https://learn.microsoft.com/previous-versions/msp-n-p/jj591559(v=pandp.10)事件存放區資料庫。 官方網站。
https://geteventstore.com/派翠克·諾明森 微服務 Event-Driven 數據管理
https://dzone.com/articles/event-driven-data-management-for-microservices-1什麼是 CAP 定理?
https://www.quora.com/What-Is-CAP-Theorem-1數據一致性入門
https://learn.microsoft.com/previous-versions/msp-n-p/dn589800(v=pandp.10)裡克·薩林 CAP 定理:為什麼一切皆不同與雲端和互聯網
https://learn.microsoft.com/archive/blogs/rickatmicrosoft/the-cap-theorem-why-everything-is-different-with-the-cloud-and-internet/埃裡克·布魯爾 CAP 十二年後:「規則」是如何改變的
https://www.infoq.com/articles/cap-twelve-years-later-how-the-rules-have-changedCAP、PACELC 和微服務
https://ardalis.com/cap-pacelc-and-microservices/Azure 服務匯流排: 代理傳訊:重複偵測
https://github.com/microsoftarchive/msdn-code-gallery-microsoft/tree/master/Windows%20Azure%20Product%20Team/Brokered%20Messaging%20Duplicate%20Detection可靠性指南 (RabbitMQ 檔)
https://www.rabbitmq.com/reliability.html#consumer