Megosztás a következőn keresztül:


Feliratkozás eseményekre

Jótanács

Ez a tartalom egy részlet a '.NET Microservices Architecture for Containerized .NET Applications' című eBook-ból, amely elérhető a .NET Docs oldalon, vagy ingyenesen letölthető PDF formátumban, amely offline módban is olvasható.

.NET mikroszolgáltatások architektúrája konténerizált .NET alkalmazásokhoz e-könyv borító miniatűr.

Az eseménybusz használatának első lépése a mikroszolgáltatások előfizetése a fogadni kívánt eseményekre. Ezt a funkciót a fogadó mikroszolgáltatásokban kell elvégezni.

Az alábbi egyszerű kód bemutatja, mit kell megvalósítania az egyes fogadó mikroszolgáltatásoknak a szolgáltatás indításakor (vagyis az Startup osztályban), hogy feliratkozzon a szükséges eseményekre. Ebben az esetben a basket-api mikroszolgáltatásnak elő kell fizetnie a ProductPriceChangedIntegrationEvent és az OrderStartedIntegrationEvent üzenetekre.

Az eseményre való feliratkozáskor például a ProductPriceChangedIntegrationEvent kosár mikroszolgáltatása értesül a termék árában bekövetkezett változásokról, és lehetővé teszi, hogy figyelmeztesse a felhasználót a változásra, ha az adott termék szerepel a felhasználó kosárjában.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

A kód futtatása után az előfizető mikroszolgáltatás a RabbitMQ-csatornákon keresztül figyeli azokat. Amikor bármilyen ProductPriceChangedIntegrationEvent típusú üzenet érkezik, a kód meghívja a neki átadott eseménykezelőt, és feldolgozza az eseményt.

Események közzététele az eseménybuszon keresztül

Végül az üzenet feladója (forrás mikroszolgáltatás) az alábbi példához hasonló kóddal teszi közzé az integrációs eseményeket. (Ez a megközelítés egy egyszerűsített példa, amely nem veszi figyelembe az atomiságot.) Hasonló kódot akkor implementálna, amikor egy eseményt több mikroszolgáltatásban kell propagálni, általában közvetlenül azután, hogy véglegesíti az adatokat vagy tranzakciókat a forrás mikroszolgáltatásból.

Először is az event bus implementálási objektumát (a RabbitMQ vagy egy service bus alapján) injektáljuk a vezérlőkonstruktorba, ahogy az alábbi kódban is látható:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Ezután a vezérlő metódusaiból, például az UpdateProduct metódusból használja:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

Ebben az esetben, mivel a forrás mikroszolgáltatás egy egyszerű CRUD-mikroszolgáltatás, ez a kód közvetlenül egy webes API-vezérlőbe kerül.

A fejlettebb mikroszolgáltatásokban, például a CQRS-megközelítések használatakor, a metóduson belül implementálható az CommandHandlerHandle() osztályban.

Az atomiság és az ellenálló képesség tervezése az eseménybuszhoz való közzétételkor

Ha az integrációs eseményeket egy elosztott üzenetkezelő rendszeren, például az eseménybuszon keresztül teszi közzé, azzal a problémával szembesül, hogy atomilag frissíti az eredeti adatbázist, és közzétesz egy eseményt (vagyis mindkét művelet befejeződött, vagy egyik sem). A korábban bemutatott egyszerűsített példában például a kód véglegesíti az adatokat az adatbázisba a termékár módosításakor, majd közzétesz egy ProductPriceChangedIntegrationEvent üzenetet. Kezdetben alapvető fontosságúnak tűnhet, hogy ezt a két műveletet atomi módon hajtsuk végre. Ha azonban az adatbázist és az üzenetközvetítőt érintő elosztott tranzakciót használ, ahogyan azt a régebbi rendszerekben, például a Microsoft Message Queuingben (MSMQ) végzi, ez a megközelítés nem ajánlott a CAP-tételben leírt okok miatt.

Alapvetően mikroszolgáltatások használatával hozhat létre méretezhető és magas rendelkezésre állású rendszereket. Némileg leegyszerűsítve a CAP-tétel azt mondja, hogy nem lehet olyan (elosztott) adatbázist (vagy a modelljét tulajdonoló mikroszolgáltatást) létrehozni, amely folyamatosan elérhető, erősen konzisztens és toleráns bármilyen partícióhoz. A három tulajdonság közül kettőt kell választania.

A mikroszolgáltatás-alapú architektúrákban a rendelkezésre állást és a tűrést kell választania, és ki kell emelnie az erős konzisztenciát. Ezért a legtöbb modern mikroszolgáltatás-alapú alkalmazásban általában nem szeretne elosztott tranzakciókat használni az üzenetkezelésben, mint amikor elosztott tranzakciókat implementál a Windows Elosztott tranzakció koordinátora (DTC) alapján az MSMQ-val.

Térjünk vissza a kezdeti problémára és annak példájára. Ha a szolgáltatás összeomlik az adatbázis frissítése után (ebben az esetben közvetlenül a kódsor _context.SaveChangesAsync()után), de az integrációs esemény közzététele előtt a teljes rendszer inkonzisztenssé válhat. Ez a megközelítés üzleti szempontból kritikus lehet, attól függően, hogy milyen üzleti művelettel foglalkozik.

Ahogy az architektúra szakaszban korábban említettük, a probléma kezelésére többféle megközelítéssel is rendelkezhet:

  • A teljes Event Sourcing minta használata.

  • Tranzakciós napló bányászatának használata.

  • A Beérkezett üzenetek minta használata. Ez egy tranzakciós tábla az integrációs események tárolására (a helyi tranzakció kiterjesztése).

Ebben a forgatókönyvben a teljes Event Sourcing (ES) minta használata az egyik legjobb módszer, ha nem a legjobb. Sok alkalmazásforgatókönyvben azonban előfordulhat, hogy nem tud teljes ES-rendszert implementálni. Az ES azt jelenti, hogy az aktuális állapotadatok tárolása helyett csak a tartományi eseményeket tárolja a tranzakciós adatbázisban. A csak tartományi események tárolása nagy előnyökkel járhat, például a rendszer előzményeinek rendelkezésre állása és a rendszer állapotának meghatározása a múlt bármely pillanatában. A teljes ES-rendszer implementálásához azonban újra kell terveznie a rendszer nagy részét, és számos egyéb összetettséget és követelményt is be kell vezetnie. Például olyan adatbázist szeretne használni, amely kifejezetten eseménybeszerzéshez készült, például az Event Store-hoz vagy egy dokumentumorientált adatbázishoz, például az Azure Cosmos DB-hez, a MongoDB-hez, a Cassandra-hoz, a CouchDB-hez vagy a RavenDB-hez. Az ES nagyszerű módszer erre a problémára, de nem a legegyszerűbb megoldás, hacsak nem ismeri már az esemény-beszerzést.

Lehetőség van tranzakciónaplók elemzésére, ami kezdetben átláthatónak tűnik. Ennek a megközelítésnek a használatához azonban a mikroszolgáltatást hozzá kell adni az RDBMS tranzakciónaplójához, például az SQL Server tranzakciónaplójához. Ez a megközelítés valószínűleg nem kívánatos. Egy másik hátránya, hogy a tranzakciónaplóban rögzített alacsony szintű frissítések nem feltétlenül azonosak a magas szintű integrációs eseményekkel. Ha igen, a tranzakciónapló-műveletek visszafejtési folyamata nehéz lehet.

A kiegyensúlyozott megközelítés egy tranzakciós adatbázistábla és egy egyszerűsített ES-minta keveréke. Használhat olyan állapotot, mint a "készen áll az esemény közzétételére", amelyet az eredeti eseményben állít be, amikor véglegesíti azt az integrációs események táblájában. Ezután próbálja meg közzétenni az eseményt az eseménybuszon. Ha a közzétételi esemény művelete sikeres, elindít egy másik tranzakciót a forrásszolgáltatásban, és áthelyezi az állapotot a "készen áll az esemény közzétételére" állapotról a "már közzétett eseményre".

Ha az eseménybusz közzétételi eseményművelete meghiúsul, az adatok továbbra sem lesznek inkonzisztensek a forrás mikroszolgáltatáson belül – továbbra is "készen áll az esemény közzétételére", és a többi szolgáltatás tekintetében az adatok végül konzisztensek lesznek. Mindig lehet háttérfeladatokkal ellenőrizni a tranzakciók vagy az integrációs események állapotát. Ha a feladat "készen áll az esemény közzétételére" állapotban talál egy eseményt, megpróbálhatja újból közzétenni az eseményt az eseménybuszon.

Figyelje meg, hogy ezzel a megközelítéssel csak az egyes forrás-mikroszolgáltatások integrációs eseményeit, és csak azokat az eseményeket tartja fenn, amelyeket más mikroszolgáltatásokkal vagy külső rendszerekkel szeretne kommunikálni. Ezzel szemben egy teljes ES-rendszerben az összes tartományi eseményt is tárolhatja.

Ezért ez a kiegyensúlyozott megközelítés egy egyszerűsített ES-rendszer. Szüksége van az aktuális állapotú integrációs események listájára ("közzétételre kész" és "közzétett"). Ezeket az állapotokat azonban csak az integrációs eseményekhez kell implementálnia. Ebben a megközelítésben nem kell minden tartományadatot eseményként tárolnia a tranzakciós adatbázisban, mint egy teljes ES-rendszerben.

Ha már használ relációs adatbázist, tranzakciós táblával tárolhatja az integrációs eseményeket. Az atomiság elérése érdekében az alkalmazásban egy helyi tranzakciókon alapuló kétlépéses folyamatot kell használnia. Alapvetően egy IntegrationEvent táblával rendelkezik ugyanabban az adatbázisban, ahol a tartományentitások találhatók. Ez a tábla az atomitás elérésének biztosításaként működik, így a tárolt integrációs eseményeket is belefoglalhatja ugyanabba a tranzakcióba, amely a tartomány adatait véglegesíti.

A folyamat lépésről lépésre a következőhöz hasonló:

  1. Az alkalmazás elindít egy helyi adatbázis-tranzakciót.

  2. Ezután frissíti a tartományi entitások állapotát, és beszúr egy eseményt az integrációs eseménytáblába.

  3. Végül véglegesíti a tranzakciót, így megkapja a kívánt atomicitást, majd folytatódik a folyamat.

  4. Valahogy közzéteszi az eseményt (következő lépésben).

Az események közzétételének lépéseinek végrehajtásakor az alábbi lehetőségek közül választhat:

  • Tegye közzé az integrációs eseményt közvetlenül a tranzakció véglegesítése után, és használjon egy másik helyi tranzakciót a táblában lévő események közzétételének megjelöléséhez. Ezután használja a táblát összetevőként az integrációs események nyomon követéséhez a távoli mikroszolgáltatások problémái esetén, és kompenzációs műveleteket hajthat végre a tárolt integrációs események alapján.

  • A táblázatot egyfajta üzenetsorként használhatja. Egy külön alkalmazásszál vagy folyamat lekérdezi az integrációs eseménytáblát, közzéteszi az eseményeket az eseménybuszon, majd egy helyi tranzakcióval megjelöli az eseményeket közzétettként.

A 6–22. ábra az első megközelítés architektúráját mutatja be.

Ábra az atomiságról feldolgozói mikroszolgáltatás nélkül történő közzétételkor.

6–22. ábra. Események eseménybuszra való közzétételének atomisága

A 6–22. ábrán bemutatott megközelítésből hiányzik egy további feldolgozói mikroszolgáltatás, amely a közzétett integrációs események sikerességének ellenőrzéséért és megerősítéséért felelős. Hiba esetén a további ellenőrző feldolgozó mikroszolgáltatás képes beolvasni az eseményeket a táblából, és újra közzéteheti őket, azaz ismételje meg a 2. lépést.

A második megközelítés: az EventLog-táblát használja üzenetsorként, és mindig egy feldolgozói mikroszolgáltatást használ az üzenetek közzétételéhez. Ebben az esetben a folyamat a 6–23. ábrán láthatóhoz hasonló. Ez egy további mikroszolgáltatást jelenít meg, és a tábla az egyetlen forrás az események közzétételekor.

A munkavégző mikroszolgáltatással végzett közzététel atomitásának diagramja.

6–23. ábra. Atomiság, amikor eseményeket tesz közzé az eseménybuszon egy feldolgozói mikroszolgáltatással

Az egyszerűség kedvéért az eShopOnContainers minta az első megközelítést használja (további folyamatok és ellenőrző mikroszolgáltatások nélkül), valamint az eseménybuszt. Az eShopOnContainers minta azonban nem kezeli az összes lehetséges hibaesetet. A felhőben üzembe helyezett valódi alkalmazásokban fel kell vennie a figyelmet arra, hogy a problémák végül felmerülnek, és implementálnia kell ezt az ellenőrzési és újraküldési logikát. A táblát sorként használni hatékonyabb lehet, mint az első megközelítés, ha a tábla az események közzétételéhez egyetlen eseményforrásként szolgál (a munkavállalóval együtt) az eseménybusz használatával.

Az atomiság megvalósítása az integrációs események eseménybuszon keresztüli közzétételekor

Az alábbi kód bemutatja, hogyan hozhat létre egyetlen tranzakciót, amely több DbContext objektumot is magában foglal – az egyik az eredeti adatok frissítésével kapcsolatos, a második pedig az IntegrationEventLog táblához kapcsolódó környezet.

Az alábbi példakódban szereplő tranzakció nem lesz rugalmas, ha az adatbázishoz való csatlakozáskor probléma merül fel a kód futtatásakor. Ez olyan felhőalapú rendszerekben fordulhat elő, mint az Azure SQL DB, amely az adatbázisokat kiszolgálók között mozgathatja. A rugalmas tranzakciók több környezetben történő implementálásához tekintse meg az útmutató későbbi, rugalmas Entity Framework Core SQL-kapcsolatok implementálásával foglalkozó szakaszát.

Az egyértelműség kedvéért az alábbi példa egyetlen kódrészletben mutatja be az egész folyamatot. Az eShopOnContainers implementáció azonban újra van bontva, és több osztályra osztja ezt a logikát, így könnyebben karbantartható.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

A ProductPriceChangedIntegrationEvent integrációs esemény létrehozása után az eredeti tartományi műveletet tároló tranzakció (a katalóguselem frissítése) az Eseménynapló táblában is tartalmazza az esemény megőrzését. Ez egyetlen tranzakcióvá teszi, és mindig ellenőrizheti, hogy az eseményüzenetek el lettek-e küldve.

Az eseménynapló-tábla atomi módon frissül az eredeti adatbázisművelettel, egy helyi tranzakcióval ugyanazon az adatbázison. Ha a műveletek bármelyike sikertelen, a rendszer kivételt okoz, és a tranzakció visszaállítja a befejezett műveletet, így fenntartja a tartományműveletek és a táblába mentett eseményüzenetek közötti konzisztenciát.

Üzenetek fogadása előfizetésekből: eseménykezelők a fogadó mikroszolgáltatásokban

Az esemény-előfizetés logikája mellett implementálnia kell az integrációs eseménykezelők belső kódját (például egy visszahívási módszert). Az eseménykezelőben adja meg, hogy egy adott típusú eseményüzenetek hol lesznek fogadva és feldolgozva.

Az eseménykezelő először egy eseménypéldányt kap az eseménybusztól. Ezután megkeresi a feldolgozandó összetevőt az integrációs eseményhez kapcsolódóan, propagálja és megőrzi az eseményt a fogadó mikroszolgáltatás állapotváltozásaként. Ha például egy ProductPriceChanged esemény a katalógus mikroszolgáltatásából származik, az a kosár mikroszolgáltatásban lesz kezelve, és az ebben a fogadókosár mikroszolgáltatásban lévő állapotot is módosítja, ahogyan az az alábbi kódban is látható.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Az eseménykezelőnek ellenőriznie kell, hogy a termék megtalálható-e bármelyik kosárpéldányban. Emellett frissíti az egyes kapcsolódó kosársorelemek cikkárat is. Végül létrehoz egy riasztást, amely a 6–24. ábrán látható módon megjelenik a felhasználó számára az árváltozásról.

Képernyőkép egy böngészőről, amelyen az árváltozásról szóló értesítés látható a felhasználói kosárban.

6–24. ábra. Elemárváltozás megjelenítése egy kosárban az integrációs események által közölt módon

Idempotencia frissítési üzeneteseményekben

A frissítési üzenet eseményeinek fontos eleme, hogy a kommunikáció bármely pontján bekövetkező hiba miatt az üzenet újrapróbálkozásra kerül. Ellenkező esetben egy háttérfeladat megpróbálhat közzétenni egy már közzétett eseményt, ezzel létrehozva egy versenyfeltételt. Győződjön meg arról, hogy a frissítések idempotensek, vagy elegendő információt biztosítanak ahhoz, hogy észlelni tudja a duplikált elemeket, elvethesse őket, és csak egy választ küldjön vissza.

Ahogy korábban említettük, az idempotencia azt jelenti, hogy egy művelet többször is végrehajtható az eredmény módosítása nélkül. Üzenetkezelési környezetben, mint az események kommunikálásakor, az esemény idempotens, ha többször is kézbesíthető a fogadó mikroszolgáltatás eredményének módosítása nélkül. Erre az esemény természete vagy az esemény kezelése miatt lehet szükség. Az üzenet idempotencia minden olyan alkalmazásban fontos, amely üzenetküldést használ, nem csak az eseménybusz-mintát megvalósító alkalmazásokban.

Idempotens műveletre példa egy SQL-utasítás, amely csak akkor szúr be adatokat egy táblába, ha az adatok még nincsenek a táblában. Nem számít, hogy hányszor futtatja az SQL-utasítás beszúrását; az eredmény ugyanaz lesz – a tábla tartalmazni fogja az adatokat. Az ilyen idempotencia akkor is szükséges lehet az üzenetek kezelésekor, ha az üzeneteket esetleg elküldhetik, és ezért többször is feldolgozhatják. Ha például az újrapróbálkozás logikája miatt a feladó többször is pontosan ugyanazt az üzenetet küldi el, győződjön meg arról, hogy idempotens.

Idempotens üzeneteket is tervezhet. Létrehozhat például egy eseményt, amely "a termék árát 25 usd-ra állítja" szöveg helyett "5 USD hozzáadása a termék árához". Az első üzenetet tetszőleges számú alkalommal biztonságosan feldolgozhatja, és az eredmény ugyanaz lesz. Ez nem igaz a második üzenetre. De még az első esetben sem érdemes feldolgozni az első eseményt, mert a rendszer egy újabb árváltozási eseményt is küldhetett volna, és felülírná az új árat.

Egy másik példa lehet egy befejezett rendelés esemény, amelyet több előfizető részére továbbítanak. Az alkalmazásnak meg kell győződnie arról, hogy a rendelési adatok csak egyszer frissülnek más rendszerekben, még akkor is, ha ugyanahhoz a megrendeléshez ismétlődő üzenetesemények tartoznak.

Eseményenként kényelmes valamilyen identitással rendelkezni, hogy olyan logikát hozzon létre, amely kényszeríti az egyes események feldolgozását fogadónként csak egyszer.

Egyes üzenetek feldolgozása eredendően idempotens. Ha például egy rendszer képminiatűröket hoz létre, nem számít, hogy hányszor dolgozzák fel a létrehozott miniatűrről szóló üzenetet; Az eredmény az, hogy a miniatűrök létre lesznek hozva, és minden alkalommal ugyanazok. Másfelől előfordulhat, hogy az olyan műveletek, mint például egy fizetési átjáró meghívása a hitelkártya díjának felszámítására, egyáltalán nem lehetnek idempotensek. Ezekben az esetekben gondoskodnia kell arról, hogy az üzenetek többszöri feldolgozása a várt hatást érje el.

További erőforrások

Integrációs eseményüzenetek deduplikálása

Győződjön meg arról, hogy az üzeneteseményeket előfizetőnként csak egyszer küldi el és dolgozza fel különböző szinteken. Ennek egyik módja a használt üzenetkezelési infrastruktúra által kínált deduplikációs funkció használata. A másik az egyéni logika implementálása a cél mikroszolgáltatásban. A legjobb megoldás az átvitel és az alkalmazás szintjén történő érvényesítés.

Üzenetesemények deduplikálása az EventHandler szintjén

Az egyik módja annak, hogy egy eseményt egy fogadó csak egyszer dolgoz fel, ha implementál bizonyos logikát az üzenetesemények eseménykezelőkben való feldolgozásakor. Ez például az eShopOnContainers alkalmazásban használt módszer, ahogy a UserCheckoutAcceptedIntegrationEventHandler osztály forráskódjában is látható, amikor integrációs eseményt UserCheckoutAcceptedIntegrationEvent kap. (Ebben az esetben az CreateOrderCommand be van csomagolva egy IdentifiedCommand-ba, eventMsg.RequestId-t azonosítóként használva, mielőtt a parancskezelőnek elküldenék).

Üzenetek deduplikálása a RabbitMQ használatakor

Időszakos hálózati hibák esetén az üzenetek duplikálhatók, és az üzenet fogadójának készen kell állnia az ismétlődő üzenetek kezelésére. Ha lehetséges, a fogadóknak idempotens módon kell kezelniük az üzeneteket, ami jobb, mint explicit módon kezelni őket deduplikációval.

A RabbitMQ dokumentációja szerint", "Ha egy üzenetet egy fogyasztónak kézbesítenek, majd újra lekérdezik (mert nem nyugtázták, mielőtt a fogyasztói kapcsolat megszakadt, például), akkor a RabbitMQ beállítja az újramegfelelt jelzőt az ismételt kézbesítéskor (akár ugyanarra a fogyasztóra, akár egy másikra).

Ha a "kézbesített" jelző be van állítva, a fogadónak ezt figyelembe kell vennie, mert lehetséges, hogy az üzenet már feldolgozásra került. De ez nem garantált; Előfordulhat, hogy az üzenet soha nem érte el a fogadót, miután elhagyta az üzenetközvetítőt, talán hálózati problémák miatt. Ha viszont a "visszaküldött" jelző nincs beállítva, akkor garantált, hogy az üzenet többször nem lett elküldve. Ezért a fogadónak csak akkor kell deduplikálnia az üzeneteket, vagy idempotens módon kell feldolgoznia az üzeneteket, ha a "redelivered" jelző be van állítva az üzenetben.

További erőforrások