Bearbeiten

Transaktionsausgangsmuster mit Azure Cosmos DB

Azure Cosmos DB
Azure-Servicebus
Azure-Funktionen

Die Implementierung von zuverlässigem Messaging in verteilten Systemen kann eine Herausforderung sein. In diesem Artikel erfahren Sie, wie Sie das Transaktionsausgangsmuster für zuverlässiges Messaging und garantierte Ereigniszustellung verwenden – eine wichtige Komponente für die Unterstützung einer idempotenten Nachrichtenverarbeitung. Hierzu verwenden Sie Azure Cosmos DB-Transaktionsbatches und Änderungsfeeds in Kombination mit Azure Service Bus.

Übersicht

Microservicearchitekturen werden immer beliebter, gerade auch aufgrund ihrer vielversprechenden Lösungen bei Problemen insbesondere großer Anwendungen wie Skalierbarkeit, Wartbarkeit und Agilität. In puncto Datenverarbeitung zeigt sich dieses Architekturmuster jedoch auch herausfordernd. In verteilten Anwendungen verwaltet jeder Dienst unabhängig die Daten, die er benötigt, um in einem dedizierten diensteigenen Datenspeicher zu funktionieren. Zur Unterstützung eines solchen Szenarios verwenden Sie in der Regel eine Messaginglösung wie RabbitMQ, Kafka oder Azure Service Bus, die Daten (Ereignisse) von einem Dienst über einen Messagingbus an andere Dienste der Anwendung verteilt. Interne oder externe Benutzer können diese Nachrichten dann abonnieren und über Änderungen benachrichtigt werden, sobald Daten bearbeitet werden.

Ein typisches Beispiel in diesem Bereich ist ein Bestellsystem: Sobald ein Benutzer eine Bestellung aufgibt, empfängt ein Ordering-Dienst Daten von einer Clientanwendung über einen REST-Endpunkt. Er ordnet die Nutzlast einer internen Darstellung eines Order-Objekts zu, um die Daten zu überprüfen. Nach einem erfolgreichen Commit in die Datenbank veröffentlicht der Dienst ein OrderCreated-Ereignis in einem Nachrichtenbus. Jeder andere Dienst, der an neuen Bestellungen interessiert ist (z. B. ein Inventory- oder Invoicing-Dienst), würde OrderCreated-Nachrichten abonnieren, verarbeiten und in seiner eigenen Datenbank speichern.

Der folgende Pseudocode zeigt, wie dieser Prozess aus Perspektive des Ordering-Diensts in der Regel aussieht:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Dieser Ansatz funktioniert gut, bis zwischen dem Speichern des Bestellobjekts und dem Veröffentlichen des entsprechenden Ereignisses ein Fehler auftritt. Das Senden eines Ereignisses kann an diesem Punkt aus vielen Gründen fehlschlagen:

  • Netzwerkfehler
  • Ausfall des Nachrichtendiensts
  • Hostfehler

Ungeachtet der Art des Fehlers kann das OrderCreated-Ereignis nicht im Messagingbus veröffentlicht werden. Andere Dienste werden nicht benachrichtigt, dass eine Bestellung erstellt wurde. Der Ordering-Dienst muss sich nun um verschiedene Dinge kümmern, die nicht im Zusammenhang mit dem eigentlichen Geschäftsprozess stehen. Er muss sämtliche Ereignisse nachverfolgen, die noch in den Messagingbus gestellt werden müssen, sobald dieser wieder online verfügbar ist. Im schlechtesten Fall kann es aufgrund verloren gegangener Ereignisse zu Dateninkonsistenzen in der Anwendung kommen.

Diagram that shows event handling without the Transactional Outbox pattern.

Lösung

Solche Situationen lassen sich durch das bewährte Muster Transactional Outbox (Transaktionsausgang) vermeiden. Dieses Muster stellt sicher, dass Ereignisse vor der Weiterleitung an einen Nachrichtenbroker in einem Datenspeicher gespeichert werden. In der Regel ist dies die Tabelle „Outbox“ (Ausgang) in Ihrer Datenbank. Wenn das Geschäftsobjekt und die zugehörigen Ereignisse innerhalb der gleichen Datenbanktransaktion gespeichert werden, gehen keine Daten verloren. Alles wird mit einem Commit festgeschrieben bzw. bei einem Fehler per Rollback zurückgesetzt. Für die letztliche Veröffentlichung des Ereignisses fragt ein anderer Dienst oder ein Workerprozess die Ausgangstabelle nach nicht verarbeiteten Einträgen ab, veröffentlicht die Ereignisse und markiert sie danach als verarbeitet. Durch dieses Muster wird sichergestellt, dass nach der Erstellung oder Änderung eines Geschäftsobjekts keine Ereignisse verloren gehen.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Laden Sie eine Visio-Datei dieser Architektur herunter.

In einer relationalen Datenbank ist die Implementierung des Musters einfach. Bei Verwendung von Entity Framework Core beispielsweise nutzt der Dienst einen Entity Framework-Kontext zur Erstellung der Datenbanktransaktion, zum Speichern von Geschäftsobjekt und Ereignis sowie für das Commit oder Rollback der Transaktion. Auch der Workerdienst für die Verarbeitung der Ereignisse ist einfach zu implementieren: Er fragt die Ausgangstabelle in regelmäßigen Abständen nach neuen Einträgen ab, veröffentlicht diese Ereignisse im Messagingbus und markiert die Einträge letztlich als verarbeitet.

In der Praxis sind die Dinge aber oft nicht so einfach. In erster Linie ist hier sicherzustellen, dass die Reihenfolge der Ereignisse beibehalten wird. Denn ein OrderUpdated-Ereignis darf nicht vor einem OrderCreated-Ereignis veröffentlicht werden.

Implementierung in Azure Cosmos DB

In diesem Abschnitt erfahren Sie, wie Sie das Transaktionsausgangsmuster in Azure Cosmos DB implementieren, um zuverlässiges, in der richtigen Reihenfolge ausgeführtes Messaging zwischen verschiedenen Diensten mithilfe des Änderungsfeeds und des Service Bus von Azure Cosmos DB zu erhalten. Zur Veranschaulichung dient ein Beispieldienst, der Contact-Objekte verwaltet (z. B. FirstName-, LastName-, Email- und Company-Informationen). Der Dienst verwendet das CQRS-Muster (Command and Query Responsibility Segregation) und folgt den grundlegenden domänengesteuerten Designkonzepten. Den Beispielcode der Implementierung finden Sie auf GitHub.

Im Beispieldienst hat ein Contact-Objekt die folgende Struktur:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Sobald ein Contact erstellt oder aktualisiert ist, gibt der Dienst Ereignisse mit Informationen zur aktuellen Änderung aus. Domänenereignisse können unter anderem Folgendes sein:

  • ContactCreated. Ausgelöst beim Hinzufügen eines Kontakts.
  • ContactNameUpdated. Ausgelöst bei einer Änderung vonFirstName oder LastName.
  • ContactEmailUpdated. Ausgelöst bei einer Aktualisierung der E-Mail-Adresse.
  • ContactCompanyUpdated. Ausgelöst bei einer Änderung der Unternehmenseigenschaften.

Transaktionsbatches

Zur Implementierung dieses Musters müssen Sie sicherstellen, dass das Contact-Geschäftsobjekt und die zugehörigen Ereignisse in derselben Datenbanktransaktion gespeichert werden. In Azure Cosmos DB funktionieren Transaktionen anders als in relationalen Datenbanksystemen. Azure Cosmos DB-Transaktionen, auch als Transaktionsbatches bezeichnet, arbeiten auf einer einzigen logischen Partition. Damit sind für sie die ACID-Eigenschaften Atomarität, Konsistenz, Isolation und Dauerhaftigkeit gewährleistet. Zwei Dokumente des gleichen Transaktionsbatchvorgangs können nicht in verschiedenen Containern oder logischen Partitionen gespeichert werden. Für den Beispieldienst bedeutet dies, dass sowohl das Geschäftsobjekt als auch das bzw. die Ereignisse im selben Container und der gleichen logischen Partition enthalten sind.

Kontext, Repositorys und Arbeitseinheit

Der Kern der Beispielimplementierung ist ein Containerkontext, der im gleichen Transaktionsbatch gespeicherte Objekte nachverfolgt. Er verwaltet eine Liste der erstellten und geänderten Objekte eines einzelnen Azure Cosmos DB-Containers. Seine Schnittstelle sieht wie folgt aus:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Die Liste in der Containerkontextkomponente überwacht Contact- und DomainEvent-Objekte. Beide Objekte werden im gleichen Container gespeichert. Im gleichen Azure Cosmos DB-Container werden also verschiedene Objekttypen gespeichert, wobei die Eigenschaft Type zwischen Geschäftsobjekt und Ereignis unterscheidet.

Für jeden Objekttyp gibt es ein dediziertes Repository, das den Datenzugriff definiert und implementiert. Die Schnittstelle des Contact-Repositorys stellt die folgenden Methoden bereit:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Das Event-Repository sieht ähnlich aus. Allerdings gibt es bei diesem Repository nur eine Methode, die neue Ereignisse im Speicher erstellt:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Die Implementierungen beider Repositoryschnittstellen erhalten per Abhängigkeitsinjektion einen Verweis auf eine einzelne IContainerContext-Instanz. Dadurch wird sichergestellt, dass beide Schnittstellen im gleichen Azure Cosmos DB-Kontext ausgeführt werden.

Die letzte Komponente ist UnitOfWork. Sie schreibt die in der IContainerContext-Instanz gespeicherten Änderungen per Commit in Azure Cosmos DB fest:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Ereignisbehandlung: Erstellung und Veröffentlichung

Bei jedem Erstellen, Ändern oder (vorläufigen) Löschen eines Contact-Objekts löst der Dienst ein entsprechendes Ereignis aus. Der Kern der bereitgestellten Lösung ist eine Kombination aus domänengesteuertem Design (Domain-Driven Design, DDD) und dem von Jimmy Bogard vorgeschlagenen Vermittlermuster. Er schlägt vor, eine Liste der Ereignisse zu verwalten, die aufgrund von Änderungen am Domänenobjekt auftreten, und diese Ereignisse zu veröffentlichen, bevor das eigentliche Objekt in der Datenbank gespeichert wird.

Die Liste der Änderungen wird im Domänenobjekt selbst gespeichert, sodass keine andere Komponente die Ereigniskette ändern kann. Das Verhalten der Ereignisverwaltung (IEvent-Instanzen) im Domänenobjekt wird über eine IEventEmitter<IEvent>-Schnittstelle definiert und in einer abstrakten DomainEntity-Klasse implementiert:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Das Contact-Objekt löst Domänenereignisse aus. Die Contact-Entität folgt den grundlegenden DDD-Konzepten und konfiguriert die Setter der Domäneneigenschaften als privat. Die Klasse beinhaltet keine öffentlichen Setter. Stattdessen bietet sie Methoden zum Bearbeiten des internen Zustands. In diesen Methoden können für bestimmte Änderungen (z. B. ContactNameUpdated oder ContactEmailUpdated) entsprechende Ereignisse ausgelöst werden.

Im Folgenden finden Sie ein Beispiel für die Aktualisierung des Namens eines Kontakts. (Das Ereignis wird am Ende der Methode ausgelöst.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Die entsprechende ContactNameUpdatedEvent, welche die Änderungen verfolgt, sieht wie folgt aus:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Bislang wurden die Ereignisse nur im Domänenobjekt protokolliert und nichts wurde in der Datenbank gespeichert oder sogar in einem Nachrichtenbroker veröffentlicht. Entsprechend der Empfehlung wird die Liste der Ereignisse verarbeitet, bevor das Geschäftsobjekt im Datenspeicher gespeichert wird. In diesem Fall geschieht dies in der SaveChangesAsync-Methode der IContainerContext-Instanz, die in einer privaten RaiseDomainEvents-Methode implementiert ist. (dObjs ist die Liste der nachverfolgten Entitäten des Containerkontexts.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

In der letzten Zeile wird das MediatR-Paket, eine Implementierung des Vermittlermusters in C#, zur Veröffentlichung eines Ereignisses in der Anwendung verwendet. Dies ist möglich, da alle Ereignisse wie ContactNameUpdatedEvent die INotification-Schnittstelle des MediatR-Pakets implementieren.

Diese Ereignisse müssen von einem entsprechenden Handler verarbeitet werden. Hier kommt die IEventsRepository-Implementierung ins Spiel. Nachfolgend ein Beispiel für den NameUpdated-Ereignishandler:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Eine IEventRepository-Instanz wird über den Konstruktor in die Handlerklasse eingefügt. Sobald ein ContactNameUpdatedEvent im Dienst veröffentlicht ist, wird die Handle-Methode aufgerufen. Diese verwendet die Ereignisrepositoryinstanz zur Erstellung eines Benachrichtigungsobjekts. Dieses Benachrichtigungsobjekt wird wiederum in die Liste der nachverfolgten Objekte im IContainerContext-Objekt eingefügt und verbindet die im gleichen Transaktionsbatch gespeicherten Objekte mit Azure Cosmos DB.

Bisher weiß der Containerkontext, welche Objekte zu verarbeiten sind. Um die nachverfolgten Objekte schließlich in Azure Cosmos DB zu speichern, erstellt die IContainerContext-Implementierung den Transaktionsbatch, fügt alle relevanten Objekte hinzu und führt den Vorgang an der Datenbank aus. Der beschriebene Prozess wird in der SaveInTransactionalBatchAsync-Methode behandelt, die von der SaveChangesAsync-Methode aufgerufen wird.

Die folgenden Teile der Implementierung benötigen Sie zum Erstellen und Ausführen des Transaktionsbatches:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Im Folgenden finden Sie eine Übersicht über die bisherige Funktionsweise des Prozesses (Aktualisieren des Namens eines Kontaktobjekts):

  1. Ein Kunde möchte den Namen eines Kontakts aktualisieren. Die SetName-Methode wird für das Kontaktobjekt aufgerufen, und die Eigenschaften werden aktualisiert.
  2. Das ContactNameUpdated-Ereignis wird der Liste der Ereignisse im Domänenobjekt hinzugefügt.
  3. Die Update-Methode des Kontaktrepositorys wird aufgerufen, wodurch das Domänenobjekt dem Containerkontext hinzugefügt wird. Das Objekt wird jetzt nachverfolgt.
  4. CommitAsync wird für die UnitOfWork-Instanz aufgerufen, die wiederum SaveChangesAsync im Containerkontext aufruft.
  5. In SaveChangesAsync werden alle Ereignisse in der Liste des Domänenobjekts von einerMediatR-Instanz veröffentlicht und über das Ereignisrepository dem gleichen Containerkontext hinzugefügt.
  6. In SaveChangesAsyncwird ein TransactionalBatch erstellt. Es wird sowohl das Kontaktobjekt als auch das Ereignis enthalten.
  7. Die TransactionalBatch wird ausgeführt, und die Daten werden in Azure Cosmos DB per Commit festgeschrieben.
  8. SaveChangesAsync und CommitAsync werden erfolgreich zurückgegeben.

Persistenz

Wie Sie den obigen Codeausschnitten entnehmen, werden alle in Azure Cosmos DB gespeicherten Objekte in einer DataObject-Instanz umschlossen. Dieses Objekt stellt allgemeine Eigenschaften bereit:

  • ID.
  • PartitionKey.
  • Type.
  • State. Wie Created wird auch Updated in Azure Cosmos DB nicht beibehalten.
  • Etag. Für eine optimistische Sperre.
  • TTL. Time To Live-Eigenschaft für die automatische Bereinigung alter Dokumente.
  • Data. Generisches Datenobjekt.

Diese Eigenschaften werden in der von den Repositorys und dem Containerkontext verwendeten generischen Schnittstelle IDataObject definiert:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Objekte, die in einer DataObject-Instanz umschlossen und in der Datenbank gespeichert werden, sehen dann wie in diesem Beispiel aus (Contact und ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Sie sehen, dass die Contact- und ContactNameUpdatedEvent (Typ domainEvent)-Dokumente denselben Partitionsschlüssel aufweisen und beide Dokumente in derselben logischen Partition beibehalten werden.

Änderungsfeedverarbeitung

Um den Ereignisdatenstrom zu lesen und an einen Nachrichtenbroker zu senden, verwendet der Dienst den Azure Cosmos DB-Änderungsfeed.

Der Änderungsfeed ist ein permanentes Protokoll der Änderungen in Ihrem Container. Er arbeitet im Hintergrund und verfolgt Änderungen. Innerhalb einer logischen Partition ist die Reihenfolge der Änderungen garantiert. Am einfachsten lässt sich der Änderungsfeed mit einer Azure-Funktion mit einem Azure Cosmos DB-Trigger lesen. Aber auch die Änderungsfeedprozessor-Bibliothek kann hierfür verwendet werden. Mit ihr können Sie die Änderungsfeedverarbeitung als Hintergrunddienst in Ihre Web-API integrieren (über die IHostedService-Schnittstelle). Im folgenden Beispiel wird eine einfache Konsolenanwendung verwendet, die die abstrakte Klasse BackgroundService implementiert, um lang andauernde Hintergrundaufgaben in .NET Core-Anwendungen zu hosten.

Um die Änderungen aus dem Azure Cosmos DB-Änderungsfeed zu empfangen, müssen Sie ein ChangeFeedProcessor-Objekt instanziieren, eine Handlermethode für die Nachrichtenverarbeitung registrieren und die Überwachung der Änderungen starten:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Die Nachrichten werden dann durch eine Handlermethode (hier HandleChangesAsync) verarbeitet. In diesem Beispiel werden Ereignisse in einem Service Bus-Thema mit aktiviertem Deduplizierungsfeature veröffentlicht, das aus Gründen der Skalierbarkeit partitioniert ist. Jeder Dienst, der an Änderungen an Contact-Objekten interessiert ist, kann dieses Service Bus-Thema dann abonnieren und die Änderungen für seinen eigenen Kontext empfangen und verarbeiten.

Die generierten Service Bus-Nachrichten weisen eine SessionId-Eigenschaft auf. Wenn Sie in Service Bus Sitzungen verwenden, bleibt die Reihenfolge der Nachrichten nach der FIFO-Regel erhalten. Für diesen Anwendungsfall ist die Beibehaltung der Reihenfolge erforderlich.

Hier ist der Codeausschnitt, der die Nachrichten aus dem Änderungsfeed verarbeitet:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Fehlerbehandlung

Wenn während der Verarbeitung der Änderungen ein Fehler auftritt, startet die Änderungsfeedbibliothek das Lesen von Nachrichten an der Position neu, an der der letzte Batch erfolgreich verarbeitet wurde. Die Anwendung hat beispielsweise erfolgreich 10.000 Nachrichten verarbeitet und arbeitet nun an Batch 10.001 bis 10.025. Wenn hier ein Fehler auftritt, kann die Anwendung neu gestartet werden und ihre Arbeit an Position 10.001 wiederaufnehmen. Die Bibliothek verfolgt automatisch über Informationen, die in einem Leases-Container in Azure Cosmos DB gespeichert werden, was bereits verarbeitet wurde.

Möglicherweise hat der Dienst auch einige der Nachrichten bereits gesendet, die nun zur erneuten Verarbeitung an Service Bus geleitet werden. Ein solches Szenario würde normalerweise zu einer doppelten Nachrichtenverarbeitung führen. Wie bereits erwähnt, verfügt Service Bus aber über ein Feature zur Erkennung doppelter Nachrichten, das Sie für dieses Szenario aktivieren müssen. Der Dienst überprüft anhand der anwendungsgesteuerten MessageId-Eigenschaft der Nachricht, ob eine Nachricht bereits einem Service Bus-Thema (oder einer Warteschlange) hinzugefügt wurde. Diese Eigenschaft wird auf ID des Ereignisdokuments gesetzt. Wenn eine gleiche Nachricht erneut an Service Bus gesendet wird, ignoriert und verwirft der Dienst sie.

Housekeeping

In einer typischen Transaktionsausgangsimplementierung aktualisiert der Dienst die behandelten Ereignisse und setzt die Processed-Eigenschaft auf true. Dieser Wert gibt an, dass die Nachricht erfolgreich veröffentlicht wurde. Dieses Verhalten kann manuell in der Handlermethode implementiert werden. Im aktuellen Szenario ist ein solcher Prozess nicht erforderlich. Azure Cosmos DB verfolgt Ereignisse nach, die mithilfe des Änderungsfeeds (in Kombination mit dem Leases-Container) verarbeitet wurden.

Gelegentlich müssen die Ereignisse aus dem Container in einem letzten Schritt gelöscht werden, damit nur die neuesten Datensätze bzw. Dokumente erhalten bleiben. Für regelmäßige Bereinigungen wendet die Implementierung ein weiteres Feature von Azure Cosmos DB an: Time To Live bzw. die Gültigkeitsdauer (TTL) der Dokumente. Azure Cosmos DB kann Dokumente auch automatisch basierend auf einer TTL-Eigenschaft löschen, die den Dokumenten hinzugefügt werden kann: eine Zeitspanne in Sekunden. Der Dienst überprüft den Container fortlaufend auf Dokumente, die eine TTL-Eigenschaft aufweisen. Sobald ein Dokument abläuft, entfernt Azure Cosmos DB das Dokument aus der Datenbank.

Wenn alle Komponenten erwartungsgemäß funktionieren, werden Ereignisse schnell verarbeitet und veröffentlicht: innerhalb von Sekunden. Bei einem Fehler in Azure Cosmos DB werden die Ereignisse nicht an den Nachrichtenbus gesendet, da sowohl das Geschäftsobjekt als auch die zugehörigen Ereignisse nicht in der Datenbank gespeichert werden können. Für den Fall, dass der Hintergrundworker (Änderungsfeedprozessor) oder der Service Bus nicht verfügbar ist, müssen Sie einzig beachten, einen geeigneten TTL-Wert für die DomainEvent-Dokumente festzulegen. In einer Produktionsumgebung wählen Sie am besten einen Zeitraum von mehreren Tagen. Beispiel: 10 Tage. Allen beteiligten Komponenten bleibt so genügend Zeit, um Änderungen innerhalb der Anwendung zu verarbeiten bzw. zu veröffentlichen.

Zusammenfassung

Das Transaktionsausgangsmuster löst das Problem der zuverlässigen Veröffentlichung von Domänenereignissen in verteilten Systemen. Durch ein Commit des Zustands des Geschäftsobjekts sowie seiner Ereignisse im gleichen Transaktionsbatch und Verwendung eines Hintergrundprozessors als Nachrichtenrelay wird sichergestellt, dass andere interne oder externe Dienste die von ihnen benötigten Informationen zuverlässig erhalten. Dieses Beispiel ist keine herkömmliche Implementierung des Transaktionsausgangsmusters. Vielmehr vereinfacht es das herkömmliche Konzept durch Features wie den Änderungsfeed von Azure Cosmos DB und die Gültigkeitsdauer von Dokumenten.

Im Folgenden finden Sie eine Zusammenfassung der in diesem Szenario verwendeten Azure-Komponenten:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Laden Sie eine Visio-Datei dieser Architektur herunter.

Die Vorteile dieser Lösung sind:

  • Zuverlässiges Messaging und garantierte Ereigniszustellung.
  • Beibehaltene Ereignisreihenfolge und Nachrichtendeduplizierung via Service Bus.
  • Keine zusätzliche Processed-Eigenschaft zur Kennzeichnung der erfolgreichen Verarbeitung eines Ereignisdokuments erforderlich.
  • Löschen von Ereignissen aus Azure Cosmos DB auf Basis der Gültigkeitsdauer (TTL). Der Prozess verbraucht keine Anforderungseinheiten, die für die Verarbeitung von Benutzer- und Anwendungsanforderungen erforderlich sind. Stattdessen werden in einer Hintergrundaufgabe „übrig gebliebene“ Anforderungseinheiten genutzt.
  • Fehlersichere Verarbeitung von Nachrichten über ChangeFeedProcessor (oder eine Azure-Funktion).
  • Optional: Mehrere Änderungsfeedprozessoren mit jeweils eigenem Zeiger im Änderungsfeed.

Überlegungen

Die in diesem Artikel erläuterte Beispielanwendung veranschaulicht, wie Sie das Transaktionsausgangsmuster in Azure mit Azure Cosmos DB und Service Bus implementieren können. Es gibt auch andere Ansätze, die NoSQL-Datenbanken verwenden. Um sicherzustellen, dass das Geschäftsobjekt und die Ereignisse zuverlässig in der Datenbank gespeichert werden, können Sie die Liste der Ereignisse in das Geschäftsobjektdokument einbetten. Der Nachteil dieses Ansatzes besteht darin, dass beim Bereinigungsprozess jedes Dokument aktualisiert werden muss, das Ereignisse enthält. Dies ist im Vergleich zur Verwendung der Gültigkeitsdauer (TTL) nicht ideal, insbesondere im Hinblick auf die Kosten für Anforderungseinheiten.

Allerdings ist der hier bereitgestellte Beispielcode keinesfalls mit einem produktionsbereiten Code gleichzusetzen. Hinsichtlich Multithreading ist der Code keinesfalls ideal. Dies gilt insbesondere für die Art und Weise, wie Ereignisse in der DomainEntity-Klasse behandelt werden und wie Objekte in den CosmosContainerContext-Implementierungen nachverfolgt werden. Betrachten Sie diesen Beispielcode lediglich als Ausgangspunkt für eigene Implementierungen. Alternativ können Sie auch vorhandene Bibliotheken verwenden, die bereits über diese Funktionalität verfügen, z. B. NServiceBus oder MassTransit.

Bereitstellen dieses Szenarios

Quellcode, Bereitstellungsdateien und Anweisungen zum Testen dieses Szenarios finden Sie auf GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Beitragende

Dieser Artikel wird von Microsoft gepflegt. Er wurde ursprünglich von folgenden Mitwirkenden geschrieben:

Hauptautor:

Melden Sie sich bei LinkedIn an, um nicht öffentliche LinkedIn-Profile anzuzeigen.

Nächste Schritte

Weitere Informationen finden Sie in den folgenden Artikeln: