Freigeben über


Abonnieren von Ereignissen

Tipp

Dieser Inhalt ist ein Auszug aus dem eBook .NET Microservices Architecture for Containerized .NET Applications, verfügbar auf .NET Docs oder als kostenlose herunterladbare PDF, die offline gelesen werden kann.

.NET Microservices-Architektur für containerisierte .NET-Anwendungen eBook-Cover-Thumbnail.

Der erste Schritt für die Verwendung des Ereignisbus besteht darin, die Microservices für die Ereignisse zu abonnieren, die sie empfangen möchten. Diese Funktionalität sollte in den Empfänger-Microservices erfolgen.

Der folgende einfache Code zeigt, was jeder Empfänger-Microservice beim Starten des Diensts implementieren muss (d. h. in der Startup Klasse), damit er die benötigten Ereignisse abonniert. In diesem Fall muss der basket-api Microservice ProductPriceChangedIntegrationEvent und die OrderStartedIntegrationEvent Nachrichten abonnieren.

Wenn Sie z. B. das ProductPriceChangedIntegrationEvent Ereignis abonnieren, wird der Warenkorb-Microservice über Änderungen am Produktpreis informiert und ermöglicht es, den Benutzer über die Änderung zu warnen, wenn sich dieses Produkt im Warenkorb des Benutzers befindet.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Nachdem dieser Code ausgeführt wurde, wird der Abonnenten-Microservice über RabbitMQ-Kanäle lauschen. Wenn eine Nachricht vom Typ "ProductPriceChangedIntegrationEvent" eingeht, ruft der Code den Ereignishandler auf, der an ihn übergeben wird, und verarbeitet das Ereignis.

Veröffentlichen von Ereignissen über den Ereignisbus

Schließlich veröffentlicht der Absender der Nachricht (origin microservice) die Integrationsereignisse mit Code ähnlich dem folgenden Beispiel. (Dieser Ansatz ist ein vereinfachtes Beispiel, das die Atomität nicht berücksichtigt.) Sie würden ähnlichen Code implementieren, wenn ein Ereignis über mehrere Microservices verteilt werden muss, in der Regel direkt nach dem Commit von Daten oder Transaktionen vom Origin Microservice.

Zunächst würde das Ereignisbusimplementierungsobjekt (basierend auf RabbitMQ oder basierend auf einem Servicebus) an den Controllerkonstruktor eingefügt, wie im folgenden Code:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Danach verwenden Sie es wie in der UpdateProduct-Methode über die Methoden des Controllers:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

Da der Origin microservice ein einfacher CRUD-Microservice ist, wird dieser Code direkt in einen Web-API-Controller eingefügt.

In komplexeren Microservices, zum Beispiel bei Verwendung von CQRS-Ansätzen, kann sie in der CommandHandler Klasse innerhalb der Handle() Methode implementiert werden.

Entwerfen von Unteilbarkeit und Stabilität beim Veröffentlichen im Ereignisbus

Wenn Sie Integrationsereignisse über ein verteiltes Messaging-System wie Ihren Ereignisbus veröffentlichen, haben Sie das Problem, die ursprüngliche Datenbank atomisch zu aktualisieren und ein Ereignis zu veröffentlichen (d. a. entweder beide Vorgänge abgeschlossen oder keine davon). Im zuvor gezeigten vereinfachten Beispiel werden Daten in die Datenbank eingetragen, wenn der Produktpreis geändert wird, und anschließend wird eine ProductPriceChangedIntegrationEvent-Nachricht veröffentlicht. Zunächst könnte es wichtig sein, dass diese beiden Operationen atomisch durchgeführt werden. Wenn Sie jedoch eine verteilte Transaktion mit der Datenbank und dem Nachrichtenbroker verwenden, wie in älteren Systemen wie Microsoft Message Queuing (MSMQ) wird dieser Ansatz aus den vom CAP-Theorem beschriebenen Gründen nicht empfohlen.

Im Grunde verwenden Sie Microservices, um skalierbare und hoch verfügbare Systeme zu erstellen. Das CAP-Theorem besagt vereinfacht, dass Sie keine (verteilte) Datenbank (oder einen Mikroservice, der sein Modell besitzt) erstellen können, die ständig verfügbar, stark konsistent bleibt und tolerant gegenüber jeder Partition ist. Sie müssen zwei dieser drei Eigenschaften auswählen.

In mikroservicesbasierten Architekturen sollten Sie Verfügbarkeit und Toleranz priorisieren und starke Konsistenz weniger betonen. Daher sollten Sie in den meisten modernen Mikroservice-basierten Anwendungen in der Regel keine verteilten Transaktionen im Messaging verwenden, wie es der Fall ist, wenn Sie verteilte Transaktionen basierend auf dem Windows Distributed Transaction Coordinator (DTC) mit MSMQ implementieren.

Kehren wir zum ursprünglichen Problem und dem zugehörigen Beispiel zurück. Wenn der Dienst abstürzt, nachdem die Datenbank aktualisiert wurde (in diesem Fall direkt nach der Codezeile mit _context.SaveChangesAsync()), aber bevor das Integrationsereignis veröffentlicht wird, kann das gesamte System inkonsistent werden. Dieser Ansatz kann geschäftskritisch sein, abhängig von dem spezifischen Geschäftsvorgang, mit dem Sie sich beschäftigen.

Wie bereits im Abschnitt "Architektur" erwähnt, können Sie mehrere Ansätze für die Behandlung dieses Problems haben:

  • Verwenden des vollständigen Event Sourcing-Musters.

  • Verwendung der Transaktionsprotokollanalyse.

  • Sie können das Muster „Postausgang“ verwenden. Dies ist eine Transaktionstabelle zum Speichern der Integrationsereignisse (Erweitern der lokalen Transaktion).

Für dieses Szenario ist die Verwendung des vollständigen Event Sourcing (ES)-Musters eine der besten Ansätze, wenn nicht die beste. In vielen Anwendungsszenarien können Sie jedoch möglicherweise kein vollständiges ES-System implementieren. ES bedeutet, dass nur Domänenereignisse in Ihrer Transaktionsdatenbank gespeichert werden, anstatt aktuelle Zustandsdaten zu speichern. Das Speichern nur von Domänenereignissen kann große Vorteile haben, z. B. die Verfügbarkeit des Systemverlaufs und die Möglichkeit, den Zustand Ihres Systems zu einem beliebigen Zeitpunkt in der Vergangenheit zu bestimmen. Die Implementierung eines vollständigen ES-Systems erfordert jedoch, dass Sie den Großteil Ihres Systems neu erstellen und viele andere Komplexitäten und Anforderungen einführen. Beispielsweise möchten Sie eine Datenbank verwenden, die speziell für die Ereignisbeschaffung erstellt wurde, z. B. event store, oder eine dokumentorientierte Datenbank wie Azure Cosmos DB, MongoDB, Cassandra, CouchDB oder RavenDB. ES ist ein großartiger Ansatz für dieses Problem, aber nicht die einfachste Lösung, es sei denn, Sie sind bereits mit der Event Sourcing vertraut.

Die Option zum Verwenden des Transaktionsprotokoll-Minings sieht zunächst transparent aus. Um diesen Ansatz zu verwenden, muss der Microservice jedoch mit Ihrem RDBMS-Transaktionsprotokoll gekoppelt werden, z. B. dem SQL Server-Transaktionsprotokoll. Dieser Ansatz ist wahrscheinlich nicht wünschenswert. Ein weiterer Nachteil ist, dass die im Transaktionsprotokoll aufgezeichneten Aktualisierungen auf niedriger Ebene möglicherweise nicht auf der gleichen Ebene wie Ihre allgemeinen Integrationsereignisse sind. Wenn ja, kann der Prozess des Reverse Engineerings für diese Transaktionsprotokollvorgänge schwierig sein.

Ein ausgewogener Ansatz ist eine Mischung aus einer Transaktionsdatenbanktabelle und einem vereinfachten ES-Muster. Sie können einen Zustand wie "Bereit zum Veröffentlichen des Ereignisses" verwenden, den Sie im ursprünglichen Ereignis festlegen, wenn Sie ihn für die Integrationsereignissetabelle übernehmen. Dann können Sie das Ereignis im Ereignisbus veröffentlichen. Wenn die Veröffentlichungsereignisaktion erfolgreich ist, starten Sie eine weitere Transaktion im Ursprungsdienst und verschieben den Status von "Bereit zum Veröffentlichen des Ereignisses" in "Ereignis bereits veröffentlicht".

Wenn die Veröffentlichungsereignisaktion im Ereignisbus fehlschlägt, sind die Daten immer noch nicht innerhalb des Ursprungs-Microservice inkonsistent – sie ist weiterhin als "bereit für die Veröffentlichung des Ereignisses" gekennzeichnet, und im Hinblick auf die restlichen Dienste wird sie schließlich konsistent sein. Sie können immer Hintergrundaufträge haben, die den Status der Transaktionen oder Integrationsereignisse überprüfen. Wenn der Auftrag ein Ereignis mit dem Status „Ereignis bereit zu Veröffentlichung“ findet, kann dieser versuchen, das Ereignis erneut im Ereignisbus zu veröffentlichen.

Beachten Sie, dass Sie bei diesem Ansatz nur die Integrationsereignisse für jeden Ursprungs-Microservice und nur die Ereignisse beibehalten, die Sie mit anderen Microservices oder externen Systemen kommunizieren möchten. Im Gegensatz dazu speichern Sie in einem vollständigen ES-System auch alle Domänenereignisse.

Daher ist dieser ausgewogene Ansatz ein vereinfachtes ES-System. Sie benötigen eine Liste der Integrationsereignisse mit ihrem aktuellen Status ("bereit für die Veröffentlichung" im Vergleich zu "veröffentlicht"). Diese Status müssen Sie aber nur für die Integrationsereignisse implementieren. Und bei diesem Ansatz müssen Sie nicht alle Ihre Domänendaten als Ereignisse in der Transaktionsdatenbank speichern, wie in einem vollständigen ES-System.

Wenn Sie bereits eine relationale Datenbank verwenden, können Sie eine Transaktionstabelle verwenden, um Integrationsereignisse zu speichern. Um die Atomität in Ihrer Anwendung zu erreichen, verwenden Sie einen zweistufigen Prozess basierend auf lokalen Transaktionen. Im Grunde haben Sie eine IntegrationEvent-Tabelle in derselben Datenbank, in der Sie über Ihre Domänenentitäten verfügen. Diese Tabelle stellt sicher, dass Sie die Unteilbarkeit gewährleisten können, damit Sie gespeicherte Integrationsereignisse in den gleichen Transaktionen einbeziehen können, die Ihre Domänendateien committen.

Schritt für Schritt sieht der Prozess wie folgt aus:

  1. Die Anwendung beginnt eine lokale Datenbanktransaktion.

  2. Anschließend wird der Status Ihrer Domänenentitäten aktualisiert und ein Ereignis in die Integrationsereignistabelle eingefügt.

  3. Schließlich wird die Transaktion von der Anwendung committet, und die gewünschte Unteilbarkeit wird erreicht.

  4. Sie veröffentlichen das Ereignis irgendwie (weiter).

Bei der Implementierung der Schritte zum Veröffentlichen der Ereignisse haben Sie folgende Möglichkeiten:

  • Veröffentlichen Sie das Integrationsereignis direkt nach dem Commit der Transaktion, und verwenden Sie eine andere lokale Transaktion, um die Ereignisse in der Tabelle als veröffentlicht zu markieren. Verwenden Sie dann die Tabelle als Artefakt, um die Integrationsereignisse bei Problemen in den Remote-Microservices nachzuverfolgen und Ausgleichsaktionen basierend auf den gespeicherten Integrationsereignissen auszuführen.

  • Verwenden Sie die Tabelle als Art von Warteschlange. Ein separater Anwendungsthread oder -prozess fragt die Integrationsereignistabelle ab, veröffentlicht die Ereignisse im Ereignisbus und verwendet dann eine lokale Transaktion, um die Ereignisse als veröffentlicht zu markieren.

Abbildung 6-22 zeigt die Architektur für die erste dieser Ansätze.

Diagramm der Unteilbarkeit beim Veröffentlichen ohne einen Workermicroservice.

Abbildung 6-22. Unteilbarkeit beim Veröffentlichen von Ereignissen im Ereignisbus

Im Ansatz, der in Abbildung 6-22 dargestellt ist, fehlt ein zusätzlicher Worker-Microservice, der für die Überprüfung und Bestätigung des Erfolgs der veröffentlichten Integrationsereignisse zuständig ist. Im Falle eines Fehlers kann dieser zusätzliche Prüfer-Worker-Microservice Ereignisse aus der Tabelle lesen und erneut veröffentlichen, d. h. Schritt 2 wiederholen.

Beim zweiten Ansatz verwenden Sie die EventLog-Tabelle als Warteschlange und zusätzlich immer einen Workermicroservice zum Veröffentlichen von Meldungen. In diesem Fall ist der Prozess wie in Abbildung 6-23 dargestellt. Dies zeigt einen zusätzlichen Microservice an, und die Tabelle ist die einzige Quelle beim Veröffentlichen von Ereignissen.

Diagramm der Unteilbarkeit beim Veröffentlichen mit einem Workermicroservice.

Abbildung 6-23. Unteilbarkeit beim Veröffentlichen von Ereignissen im Ereignisbus mit einem Workermicroservice

Der Einfachheit halber verwendet das eShopOnContainers-Beispiel den ersten Ansatz (ohne zusätzliche Prozesse oder Prüfer-Mikrodienste) sowie den Ereignisbus. Das Beispiel "eShopOnContainers" behandelt jedoch nicht alle möglichen Fehlerfälle. In einer echten Anwendung, die in der Cloud bereitgestellt wird, müssen Sie die Tatsache berücksichtigen, dass Probleme letztendlich auftreten, und Sie müssen diese Überprüfungs- und Erneutes Senden-Logik implementieren. Das Verwenden der Tabelle als Warteschlange kann effektiver sein als der erste Ansatz, wenn Sie diese Tabelle als einzige Ereignisquelle beim Veröffentlichen von Ereignissen (mit dem Worker) im Ereignisbus verwenden.

Implementieren von Unteilbarkeit beim Veröffentlichen von Integrationsereignissen im Ereignisbus

Der folgende Code zeigt, wie Sie eine einzelne Transaktion mit mehreren DbContext-Objekten erstellen können– einen Kontext im Zusammenhang mit den ursprünglichen Daten, die aktualisiert werden, und den zweiten Kontext im Zusammenhang mit der IntegrationEventLog-Tabelle.

Die Transaktion im folgenden Beispielcode ist nicht widerstandsfähig, wenn Verbindungen mit der Datenbank zu dem Zeitpunkt, zu dem der Code ausgeführt wird, probleme haben. Dies kann in cloudbasierten Systemen wie Azure SQL DB geschehen, wodurch Datenbanken auf Server verschoben werden können. Informationen zum Implementieren robuster Transaktionen über mehrere Kontexte hinweg finden Sie im Abschnitt "Implementieren robuster Entity Framework Core SQL-Verbindungen " weiter unten in diesem Handbuch.

Aus Gründen der Übersichtlichkeit zeigt das folgende Beispiel den gesamten Prozess in einem einzigen Codeabschnitt. Die Implementierung von eShopOnContainers wird jedoch umgestaltet und teilt diese Logik in mehrere Klassen auf, sodass es einfacher zu verwalten ist.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Nachdem das ProductPriceChangedIntegrationEvent-Integrationsereignis erstellt wurde, enthält die Transaktion, die den ursprünglichen Domänenvorgang speichert (aktualisieren Sie das Katalogelement), auch die Persistenz des Ereignisses in der EventLog-Tabelle. Dadurch wird sie zu einer einzelnen Transaktion, und Sie können immer überprüfen, ob Ereignisnachrichten gesendet wurden.

Die Ereignisprotokolltabelle wird atomisch mit dem ursprünglichen Datenbankvorgang aktualisiert, wobei eine lokale Transaktion für dieselbe Datenbank verwendet wird. Wenn eine der Vorgänge fehlschlägt, wird eine Ausnahme ausgelöst, und die Transaktion führt einen Rollback eines abgeschlossenen Vorgangs durch, wodurch die Konsistenz zwischen den Domänenvorgängen und den ereignismeldungen beibehalten wird, die in der Tabelle gespeichert sind.

Empfangen von Nachrichten von Abonnements: Ereignishandler in Empfänger-Microservices

Zusätzlich zur Ereignisabonnementlogik müssen Sie den internen Code für die Integrationsereignishandler (z. B. eine Rückrufmethode) implementieren. Der Ereignishandler gibt an, wo die Ereignismeldungen eines bestimmten Typs empfangen und verarbeitet werden.

Ein Ereignishandler empfängt zunächst eine Ereignisinstanz vom Ereignisbus. Anschließend findet er die Komponente, die verarbeitet werden soll und die mit diesem Integrationsereignis verknüpft ist. Dann gibt er das Ereignis als Statusänderung des empfangenden Microservice weiter und speichert es als solche. Wenn beispielsweise ein ProductPriceChanged-Ereignis aus dem Katalog-Microservice stammt, wird es im Korb-Microservice behandelt und ändert auch den Zustand in diesem Empfängerkorb-Microservice, wie im folgenden Code dargestellt.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Der Ereignishandler muss überprüfen, ob das Produkt in einer Warenkorbinstanz vorhanden ist. Außerdem wird der Preis für jede zugehörige Warenkorbposition aktualisiert. Schließlich wird eine Warnung erstellt, die dem Benutzer über die Preisänderung angezeigt wird, wie in Abbildung 6-24 dargestellt.

Screenshot eines Browsers mit der Preisänderungsbenachrichtigung im Benutzerwagen.

Abbildung 6-24. Preisänderung in einem Warenkorb wie von Integrationsereignissen gemeldet

Idempotenz bei Updatemeldungsereignissen

Ein wichtiger Aspekt der Aktualisierungsnachrichtenereignisse ist, dass ein Fehler an einem beliebigen Punkt in der Kommunikation dazu führen sollte, dass die Nachricht erneut versucht wird. Andernfalls kann es passieren, dass eine Hintergrundaufgabe versucht, ein bereits veröffentlichtes Ereignis erneut zu veröffentlichen, wodurch eine Wettlaufbedingung entsteht. Stellen Sie sicher, dass die Updates entweder idempotent sind oder dass sie genügend Informationen bereitstellen, um sicherzustellen, dass Sie ein Duplikat erkennen, verwerfen und nur eine Antwort zurücksenden können.

Wie bereits erwähnt, bedeutet Idempotency, dass eine Operation mehrmals ausgeführt werden kann, ohne das Ergebnis zu ändern. In einer Messagingumgebung, wie bei der Kommunikation von Ereignissen, ist ein Ereignis idempotent, wenn es mehrmals übermittelt werden kann, ohne das Ergebnis für den Empfänger-Microservice zu ändern. Dies kann nötig werden, sowohl wegen der Natur des Ereignisses selbst als auch wegen der Art und Weise, wie das System das Ereignis behandelt. Die Nachrichten-Idempotenz ist in jeder Anwendung wichtig, die Messaging verwendet, nicht nur in Anwendungen, die das Ereignisbusmuster implementieren.

Ein Beispiel für einen idempotenten Vorgang ist eine SQL-Anweisung, die Daten nur dann in eine Tabelle einfügt, wenn diese Daten nicht bereits in der Tabelle enthalten sind. Es spielt keine Rolle, wie oft Sie diese SQL-Anweisung einfügen. Das Ergebnis ist identisch – die Tabelle enthält diese Daten. Derartige Idempotenz kann auch nötig sein, wenn Sie Meldungen behandeln müssen und diese Meldungen theoretisch mehrmals gesendet und deshalb auch mehrmals verarbeitet werden können. Wenn eine Wiederholungslogik beispielsweise bewirkt, dass ein Absender genau dieselbe Nachricht mehrmals sendet, müssen Sie sicherstellen, dass es idempotent ist.

Es ist möglich, idempotente Nachrichten zu entwerfen. Sie können z. B. ein Ereignis erstellen, das besagt, dass der Produktpreis auf 25 $ festgelegt wird, anstatt "5 $ zum Produktpreis hinzuzufügen". Sie könnten die erste Nachricht beliebig oft verarbeiten, und das Ergebnis ist identisch. Das gilt nicht für die zweite Nachricht. Aber auch im ersten Fall möchten Sie das erste Ereignis möglicherweise nicht verarbeiten, da das System auch ein neueres Preisänderungsereignis gesendet haben könnte und Sie den neuen Preis überschreiben würden.

Ein weiteres Beispiel ist ein Ereignis zum Abschluss der Bestellung, das an mehrere Abonnenten übermittelt wird. Die App muss sicherstellen, dass die Bestellinformationen nur einmal in anderen Systemen aktualisiert werden, auch wenn für das gleiche Bestellung-abgeschlossen-Ereignis duplizierte Nachrichtenmeldungen vorhanden sind.

Es ist praktisch, eine Art von Identität pro Ereignis zu haben, damit Sie Logik erstellen können, die erzwingt, dass jedes Ereignis nur einmal pro Empfänger verarbeitet wird.

Einige Nachrichtenverarbeitungen sind inhärent idempotent. Wenn z. B. ein System Bildminiaturansichten generiert, spielt es möglicherweise keine Rolle, wie oft die Nachricht über die generierte Miniaturansicht verarbeitet wird. Das Ergebnis ist, dass die Miniaturansichten generiert werden und sie jedes Mal gleich sind. Andererseits können Vorgänge wie das Aufrufen eines Zahlungsgateways zum Aufladen einer Kreditkarte überhaupt nicht idempotent sein. In diesen Fällen müssen Sie sicherstellen, dass die Verarbeitung einer Nachricht mehrmals den erwarteten Effekt hat.

Weitere Ressourcen

Deduplizieren von Integrationsereignismeldungen

Sie können sicherstellen, dass Nachrichtenereignisse nur einmal pro Abonnent auf unterschiedlichen Ebenen gesendet und verarbeitet werden. Eine Möglichkeit besteht darin, ein Deduplizierungsfeature zu verwenden, das von der von Ihnen verwendeten Messaginginfrastruktur angeboten wird. Eine andere besteht darin, benutzerdefinierte Logik in Ihrem Ziel-Microservice zu implementieren. Überprüfungen auf Transportebene und Anwendungsebene sind Ihre beste Wahl.

Deduplizieren von Nachrichtenereignissen auf Ereignishandlerebene

Eine Möglichkeit, sicherzustellen, dass ein Ereignis nur einmal von einem Empfänger verarbeitet wird, besteht darin, bestimmte Logik beim Verarbeiten der Nachrichtenereignisse in Ereignishandlern zu implementieren. Dies ist beispielsweise der Ansatz, der in der eShopOnContainers-Anwendung verwendet wird, wie Sie im Quellcode der UserCheckoutAcceptedIntegrationEventHandler-Klasse sehen können, wenn es ein UserCheckoutAcceptedIntegrationEvent Integrationsereignis empfängt. (In diesem Fall wird das CreateOrderCommand mit einem IdentifiedCommand umschlossen, wobei eventMsg.RequestId als Bezeichner verwendet wird, bevor es an den Befehlshandler gesendet wird).

Deduplizieren von Nachrichten bei Verwendung von RabbitMQ

Wenn zeitweilige Netzwerkfehler auftreten, können Nachrichten dupliziert werden, und der Nachrichtenempfänger muss bereit sein, diese duplizierten Nachrichten zu verarbeiten. Wenn möglich sollten Empfänger Meldungen auf idempotente Weise verarbeiten. Dies ist sinnvoller als das explizite Verarbeiten anhand der Deduplizierung.

In der RabbitMQ-Dokumentation steht sinngemäß: Wenn eine Meldung an einen Consumer gesendet und anschließend erneut in die Warteschlange eingereiht wird (weil sie nicht bestätigt wurde, bevor die Verbindung zum Consumer abgebrochen ist), legt RabbitMQ das Flag „Redelivered“ (Erneut zugestellt) für diese fest, wenn die Meldung erneut gesendet wird (egal ob an den gleichen Consumer oder nicht).

Wenn das "redelivered"-Flag gesetzt ist, muss der Empfänger dies berücksichtigen, weil die Nachricht möglicherweise bereits verarbeitet wurde. Aber das ist nicht garantiert; Die Nachricht hat den Empfänger möglicherweise nie erreicht, nachdem er den Nachrichtenbroker verlassen hat, möglicherweise aufgrund von Netzwerkproblemen. Andererseits wurde die Meldung garantiert nicht mehr als einmal gesendet, wenn das Flag „Redelivered“ nicht festgelegt wurde. Deshalb muss der Empfänger die Meldungen nur dann deduplizieren oder idempotent verarbeiten, wenn das Flag „Redelivered“ in der Meldung festgelegt wurde.

Weitere Ressourcen