Wzorzec transakcyjnej skrzynki wychodzącej z usługą Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Implementacja niezawodnej obsługi komunikatów w systemach rozproszonych może być trudna. W tym artykule opisano sposób używania wzorca transakcyjnej skrzynki wychodzącej na potrzeby niezawodnej obsługi komunikatów i gwarantowanego dostarczania zdarzeń, co jest ważną częścią obsługi przetwarzania komunikatów idempotentnych. W tym celu użyjesz partii transakcyjnych i zestawienia zmian usługi Azure Cosmos DB w połączeniu z usługą Azure Service Bus.

Omówienie

Architektury mikrousług stają się coraz bardziej popularne i pokazują obietnicę rozwiązywania problemów, takich jak skalowalność, łatwość konserwacji i elastyczność, szczególnie w dużych aplikacjach. Jednak ten wzorzec architektury wprowadza również wyzwania związane z obsługą danych. W aplikacjach rozproszonych każda usługa niezależnie utrzymuje dane, których potrzebuje do działania w dedykowanym magazynie danych należącym do usługi. Aby obsłużyć taki scenariusz, zazwyczaj używasz rozwiązania do obsługi komunikatów, takiego jak RabbitMQ, Kafka lub Azure Service Bus, które dystrybuuje dane (zdarzenia) z jednej usługi za pośrednictwem magistrali komunikatów do innych usług aplikacji. Użytkownicy wewnętrzni lub zewnętrzni mogą następnie subskrybować te komunikaty i otrzymywać powiadomienia o zmianach zaraz po manipulowaniu danymi.

Dobrze znanym przykładem w tym obszarze jest system zamawiania: gdy użytkownik chce utworzyć zamówienie, Ordering usługa odbiera dane z aplikacji klienckiej za pośrednictwem punktu końcowego REST. Mapuje ładunek na wewnętrzną reprezentację Order obiektu w celu zweryfikowania danych. Po pomyślnym zatwierdzeniu w bazie danych publikuje OrderCreated ono zdarzenie w magistrali komunikatów. Każda inna usługa zainteresowana nowymi zamówieniami (na przykład usługa Inventory lub Invoicing ) będzie subskrybować komunikaty, przetwarzać je i przechowywać OrderCreated w własnej bazie danych.

Poniższy pseudokod pokazuje, jak ten proces zwykle wygląda z Ordering perspektywy usługi:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Takie podejście działa dobrze do momentu wystąpienia błędu między zapisaniem obiektu zamówienia i opublikowaniem odpowiedniego zdarzenia. Wysłanie zdarzenia może zakończyć się niepowodzeniem z wielu powodów:

  • Błędy sieci
  • Awaria usługi komunikatów
  • Niepowodzenie hosta

Niezależnie od błędu, wynikiem jest to, że OrderCreated nie można opublikować zdarzenia w magistrali komunikatów. Inne usługi nie będą powiadamiane o utworzeniu zamówienia. Usługa Ordering musi teraz dbać o różne elementy, które nie odnoszą się do rzeczywistego procesu biznesowego. Musi śledzić zdarzenia, które nadal muszą być umieszczane w magistrali komunikatów zaraz po powrocie do trybu online. Nawet najgorszy przypadek może wystąpić: niespójności danych w aplikacji z powodu utraconych zdarzeń.

Diagram that shows event handling without the Transactional Outbox pattern.

Rozwiązanie

Istnieje dobrze znany wzorzec o nazwie Transakcyjna skrzynka nadawcza , która może pomóc uniknąć tych sytuacji. Gwarantuje to, że zdarzenia są zapisywane w magazynie danych (zazwyczaj w tabeli Skrzynka nadawcza w bazie danych), zanim zostaną one ostatecznie wypchnięte do brokera komunikatów. Jeśli obiekt biznesowy i odpowiednie zdarzenia są zapisywane w ramach tej samej transakcji bazy danych, gwarantowane jest, że żadne dane nie zostaną utracone. Wszystko zostanie zatwierdzone lub wszystko wycofa się, jeśli wystąpi błąd. Aby ostatecznie opublikować zdarzenie, inny proces usługi lub procesu roboczego wysyła zapytanie do tabeli Skrzynka odbiorcza dla nieobsługiwane wpisy, publikuje zdarzenia i oznacza je jako przetworzone. Ten wzorzec zapewnia, że zdarzenia nie zostaną utracone po utworzeniu lub zmodyfikowaniu obiektu biznesowego.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Pobierz plik programu Visio z tą architekturą.

W relacyjnej bazie danych implementacja wzorca jest prosta. Jeśli na przykład usługa używa platformy Entity Framework Core, użyje kontekstu programu Entity Framework do utworzenia transakcji bazy danych, zapisania obiektu biznesowego i zdarzenia oraz zatwierdzenia transakcji lub wycofania. Ponadto usługa procesu roboczego, która przetwarza zdarzenia, jest łatwa do zaimplementowania: okresowo wysyła zapytanie do tabeli Skrzynka nadawcza dla nowych wpisów, publikuje nowo wstawione zdarzenia do magistrali komunikatów i na koniec oznacza te wpisy jako przetworzone.

W praktyce rzeczy nie są tak proste, jak mogą wyglądać na początku. Co najważniejsze, należy upewnić się, że kolejność zdarzeń jest zachowywana, aby OrderUpdated zdarzenie nie zostało opublikowane przed zdarzeniem OrderCreated .

Implementacja w usłudze Azure Cosmos DB

W tej sekcji pokazano, jak zaimplementować wzorzec transakcyjnej skrzynki odbiorczej w usłudze Azure Cosmos DB w celu zapewnienia niezawodnej obsługi komunikatów w kolejności między różnymi usługami za pomocą zestawienia zmian usługi Azure Cosmos DB i usługi Service Bus. Przedstawia przykładową usługę, która zarządza obiektami Contact (FirstName, LastName, , EmailCompany i tak dalej). Używa wzorca segregacji odpowiedzialności poleceń i zapytań (CQRS) i jest zgodny z podstawowymi pojęciami projektowymi opartymi na domenie. Przykładowy kod implementacji można znaleźć w witrynie GitHub.

Obiekt Contact w przykładowej usłudze ma następującą strukturę:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Gdy tylko element Contact zostanie utworzony lub zaktualizowany, emituje zdarzenia zawierające informacje o bieżącej zmianie. Między innymi zdarzenia domeny mogą być następujące:

  • ContactCreated. Podniesione po dodaniu kontaktu.
  • ContactNameUpdated. Podniesione w przypadku FirstName zmiany lub LastName .
  • ContactEmailUpdated. Zgłaszane po zaktualizowaniu adresu e-mail.
  • ContactCompanyUpdated. Zgłaszane po zmianie dowolnego z właściwości firmy.

Partie transakcyjne

Aby zaimplementować ten wzorzec, należy upewnić się, że Contact obiekt biznesowy i odpowiednie zdarzenia zostaną zapisane w tej samej transakcji bazy danych. W usłudze Azure Cosmos DB transakcje działają inaczej niż w systemach relacyjnych baz danych. Transakcje usługi Azure Cosmos DB, nazywane partiami transakcyjnym, działają na jednej partycji logicznej, dzięki czemu gwarantują niepodzielność, spójność, izolację i trwałość (ACID). Nie można zapisać dwóch dokumentów w transakcyjnej operacji wsadowej w różnych kontenerach lub partycjach logicznych. W przypadku przykładowej usługi oznacza to, że zarówno obiekt biznesowy, jak i zdarzenie lub zdarzenia zostaną umieszczone w tym samym kontenerze i partycji logicznej.

Kontekst, repozytoria i UnitOfWork

Rdzeniem przykładowej implementacji jest kontekst kontenera, który śledzi obiekty zapisane w tej samej partii transakcyjnej. Utrzymuje listę utworzonych i zmodyfikowanych obiektów i działa w jednym kontenerze usługi Azure Cosmos DB. Interfejs dla niego wygląda następująco:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Lista w składniku kontekstu kontenera śledzi Contact i DomainEvent obiekty. Oba te elementy zostaną umieszczone w tym samym kontenerze. Oznacza to, że wiele typów obiektów jest przechowywanych w tym samym kontenerze usługi Azure Cosmos DB i używa Type właściwości do rozróżnienia między obiektem biznesowym a zdarzeniem.

Dla każdego typu istnieje dedykowane repozytorium, które definiuje i implementuje dostęp do danych. Interfejs Contact repozytorium udostępnia następujące metody:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Repozytorium Event wygląda podobnie, z wyjątkiem jednej metody, która tworzy nowe zdarzenia w magazynie:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Implementacje obu interfejsów repozytorium uzyskują odwołanie za pośrednictwem wstrzykiwania zależności do pojedynczego IContainerContext wystąpienia, aby upewnić się, że oba działają w tym samym kontekście usługi Azure Cosmos DB.

Ostatnim składnikiem jest UnitOfWork, który zatwierdza zmiany przechowywane w wystąpieniu IContainerContext w usłudze Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Obsługa zdarzeń: tworzenie i publikowanie

Za każdym razem, gdy Contact obiekt jest tworzony, modyfikowany lub usuwany nietrwale, usługa zgłasza odpowiednie zdarzenie. Podstawowym rozwiązaniem jest połączenie projektowania opartego na domenie (DDD) i wzorca mediatora proponowanego przez Jimmy'ego Bogarda. Sugeruje zachowanie listy zdarzeń, które wystąpiły z powodu modyfikacji obiektu domeny i opublikowania tych zdarzeń przed zapisaniem rzeczywistego obiektu w bazie danych.

Lista zmian jest przechowywana w samym obiekcie domeny, aby żaden inny składnik nie mógł modyfikować łańcucha zdarzeń. Zachowanie obsługi zdarzeń (IEvent wystąpień) w obiekcie domeny jest definiowane za pośrednictwem interfejsu IEventEmitter<IEvent> i implementowane w klasie abstrakcyjnej DomainEntity :

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Obiekt Contact zgłasza zdarzenia domeny. Jednostka jest zgodna Contact z podstawowymi pojęciami DDD, konfigurując zestawy właściwości domeny jako prywatne. W klasie nie istnieją żadne publiczne zestawy. Zamiast tego oferuje metody manipulowania stanem wewnętrznym. W tych metodach można zgłaszać odpowiednie zdarzenia dla określonej modyfikacji (na przykład ContactNameUpdated lub ContactEmailUpdated).

Oto przykład aktualizacji nazwy kontaktu. (Zdarzenie jest wywoływane na końcu metody).

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Odpowiedni ContactNameUpdatedEventelement , który śledzi zmiany, wygląda następująco:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Do tej pory zdarzenia są po prostu rejestrowane w obiekcie domeny i nic nie jest zapisywane w bazie danych, a nawet publikowane w brokerze komunikatów. Zgodnie z zaleceniem lista zdarzeń zostanie przetworzona bezpośrednio przed zapisaniem obiektu biznesowego w magazynie danych. W takim przypadku odbywa się to w SaveChangesAsync metodzie IContainerContext wystąpienia, która jest implementowana w metodzie prywatnej RaiseDomainEvents . (dObjs to lista śledzonych jednostek kontekstu kontenera).

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

W ostatnim wierszu pakiet MediatR , implementacja wzorca mediatora w języku C#, służy do publikowania zdarzenia w aplikacji. Jest to możliwe, ponieważ wszystkie zdarzenia, takie jak ContactNameUpdatedEvent implementowanie INotification interfejsu pakietu MediatR.

Te zdarzenia muszą być przetwarzane przez odpowiednią procedurę obsługi. W tym miejscu implementacja IEventsRepository wchodzi w grę. Oto przykład procedury obsługi zdarzeń NameUpdated :

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Wystąpienie IEventRepository jest wstrzykiwane do klasy programu obsługi za pośrednictwem konstruktora. Po ContactNameUpdatedEvent opublikowaniu elementu w usłudze Handle metoda jest wywoływana i używa wystąpienia repozytorium zdarzeń do utworzenia obiektu powiadomienia. Ten obiekt powiadomienia z kolei jest wstawiany na liście śledzonych obiektów w IContainerContext obiekcie i łączy obiekty zapisane w tej samej partii transakcyjnej w usłudze Azure Cosmos DB.

Do tej pory kontekst kontenera wie, które obiekty mają być przetwarzane. Aby ostatecznie utrwały śledzone obiekty do usługi Azure Cosmos DB, IContainerContext implementacja tworzy partię transakcyjną, dodaje wszystkie odpowiednie obiekty i uruchamia operację względem bazy danych. Opisany proces jest obsługiwany w metodzie SaveInTransactionalBatchAsync , która jest wywoływana przez metodę SaveChangesAsync .

Poniżej przedstawiono ważne elementy implementacji, które należy utworzyć i uruchomić transakcyjną partię:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Poniżej przedstawiono omówienie sposobu działania procesu (w celu zaktualizowania nazwy obiektu kontaktu):

  1. Klient chce zaktualizować nazwę kontaktu. Metoda SetName jest wywoływana w obiekcie kontaktowym, a właściwości są aktualizowane.
  2. Zdarzenie ContactNameUpdated jest dodawane do listy zdarzeń w obiekcie domeny.
  3. Wywoływana jest metoda repozytorium kontaktów Update , która dodaje obiekt domeny do kontekstu kontenera. Obiekt jest teraz śledzony.
  4. CommitAsync jest wywoływany w wystąpieniu UnitOfWork , które z kolei wywołuje SaveChangesAsync kontekst kontenera.
  5. W programie SaveChangesAsyncwszystkie zdarzenia na liście obiektu domeny są publikowane przez MediatR wystąpienie i są dodawane za pośrednictwem repozytorium zdarzeń do tego samego kontekstu kontenera.
  6. W SaveChangesAsyncpliku zostanie utworzony element TransactionalBatch . Będzie on przechowywać zarówno obiekt kontaktu, jak i zdarzenie.
  7. Przebiegi TransactionalBatch i dane są zatwierdzane w usłudze Azure Cosmos DB.
  8. SaveChangesAsync i CommitAsync pomyślnie powrócić.

Trwałość

Jak widać w poprzednich fragmentach kodu, wszystkie obiekty zapisane w usłudze Azure Cosmos DB są opakowane w wystąpieniu DataObject . Ten obiekt zawiera typowe właściwości:

  • ID.
  • PartitionKey.
  • Type.
  • State. Podobnie jak w usłudze Created, Updated nie będzie utrwalany w usłudze Azure Cosmos DB.
  • Etag. Dla optymistycznego blokowania.
  • TTL. Właściwość Time To Live w celu automatycznego czyszczenia starych dokumentów.
  • Data. Obiekt danych ogólnych.

Te właściwości są definiowane w interfejsie ogólnym, który jest wywoływany IDataObject i jest używany przez repozytoria i kontekst kontenera:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Obiekty opakowane w wystąpieniu DataObject i zapisane w bazie danych będą wyglądać podobnie do tego przykładu (Contact i ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Widać, że Contact dokumenty i ContactNameUpdatedEvent (typ domainEvent) mają ten sam klucz partycji i że oba dokumenty będą utrwalane w tej samej partycji logicznej.

Przetwarzanie zestawienia zmian

Aby odczytać strumień zdarzeń i wysłać je do brokera komunikatów, usługa będzie używać zestawienia zmian usługi Azure Cosmos DB.

Źródło zmian jest trwałym dziennikiem zmian w kontenerze. Działa w tle i śledzi modyfikacje. W ramach jednej partycji logicznej gwarantowana jest kolejność zmian. Najwygodniejszym sposobem odczytywania zestawienia zmian jest użycie funkcji platformy Azure z wyzwalaczem usługi Azure Cosmos DB. Inną opcją jest użycie biblioteki procesora zestawienia zmian. Umożliwia zintegrowanie przetwarzania zestawienia zmian w internetowym interfejsie API jako usługi w tle (za pośrednictwem interfejsu IHostedService ). W tym przykładzie użyto prostej aplikacji konsolowej, która implementuje abstrakcyjną klasę BackgroundService do hostowania długotrwałych zadań w tle w aplikacjach platformy .NET Core.

Aby otrzymywać zmiany ze źródła zmian usługi Azure Cosmos DB, należy utworzyć wystąpienie obiektu, zarejestrować metodę ChangeFeedProcessor obsługi na potrzeby przetwarzania komunikatów i rozpocząć nasłuchiwanie zmian:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Metoda obsługi (HandleChangesAsync tutaj) przetwarza komunikaty. W tym przykładzie zdarzenia są publikowane w temacie usługi Service Bus podzielonym na partycje pod kątem skalowalności i mają włączoną funkcję deduplikacji. Każda usługa zainteresowana zmianami w Contact obiektach może następnie subskrybować ten temat usługi Service Bus i odbierać i przetwarzać zmiany dla własnego kontekstu.

Wygenerowane komunikaty usługi Service Bus mają SessionId właściwość . W przypadku korzystania z sesji w usłudze Service Bus gwarantujesz, że kolejność komunikatów zostanie zachowana (FIFO). Zachowanie kolejności jest niezbędne dla tego przypadku użycia.

Oto fragment kodu, który obsługuje komunikaty ze źródła zmian:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Obsługa błędów

Jeśli wystąpi błąd podczas przetwarzania zmian, biblioteka zestawienia zmian uruchomi ponownie odczytywanie komunikatów w miejscu, w którym pomyślnie przetworzy ostatnią partię. Jeśli na przykład aplikacja pomyślnie przetworzyła 10 000 komunikatów, działa teraz w partii 10 001 do 10 025, a wystąpi błąd, może ponownie uruchomić i odebrać pracę na pozycji 10 001. Biblioteka automatycznie śledzi, co zostało przetworzone za pośrednictwem informacji zapisanych w kontenerze w Leases usłudze Azure Cosmos DB.

Istnieje możliwość, że usługa będzie już wysyłać niektóre komunikaty, które są ponownie przetwarzane do usługi Service Bus. Zwykle ten scenariusz może prowadzić do zduplikowanego przetwarzania komunikatów. Jak wspomniano wcześniej, usługa Service Bus ma funkcję wykrywania zduplikowanych komunikatów, którą należy włączyć w tym scenariuszu. Usługa sprawdza, czy komunikat został już dodany do tematu usługi Service Bus (lub kolejki) na podstawie właściwości kontrolowanej MessageId przez aplikację komunikatu. Ta właściwość jest ustawiona na ID dokument zdarzenia. Jeśli ten sam komunikat zostanie ponownie wysłany do usługi Service Bus, usługa zignoruje ją i upuści.

Konserwacja

W typowej implementacji transakcyjnej skrzynki odbiorczej usługa aktualizuje obsługiwane zdarzenia i ustawia Processed właściwość na true, wskazując, że komunikat został pomyślnie opublikowany. To zachowanie można zaimplementować ręcznie w metodzie obsługi. W bieżącym scenariuszu nie ma potrzeby takiego procesu. Usługa Azure Cosmos DB śledzi zdarzenia, które zostały przetworzone przy użyciu zestawienia zmian (w połączeniu z kontenerem Leases ).

W ostatnim kroku należy od czasu do czasu usunąć zdarzenia z kontenera, aby zachować tylko najnowsze rekordy/dokumenty. Aby okresowo wykonywać oczyszczanie, implementacja stosuje inną funkcję usługi Azure Cosmos DB: czas wygaśnięcia (TTL) w dokumentach. Usługa Azure Cosmos DB może automatycznie usuwać dokumenty na TTL podstawie właściwości, którą można dodać do dokumentu: przedział czasu w sekundach. Usługa stale sprawdza kontener pod kątem TTL dokumentów, które mają właściwość. Po wygaśnięciu dokumentu usługa Azure Cosmos DB usunie go z bazy danych.

Gdy wszystkie składniki działają zgodnie z oczekiwaniami, zdarzenia są przetwarzane i publikowane szybko: w ciągu kilku sekund. Jeśli wystąpi błąd w usłudze Azure Cosmos DB, zdarzenia nie zostaną wysłane do magistrali komunikatów, ponieważ zarówno obiekt biznesowy, jak i odpowiednie zdarzenia nie mogą być zapisywane w bazie danych. Jedyną rzeczą, którą należy wziąć pod uwagę, jest ustawienie odpowiedniej TTL wartości na DomainEvent dokumentach, gdy proces roboczy w tle (procesor zestawienia zmian) lub magistrala usług nie są dostępne. W środowisku produkcyjnym najlepiej wybrać przedział czasu obejmujący wiele dni. Na przykład 10 dni. Wszystkie zaangażowane składniki będą miały wystarczający czas na przetworzenie/opublikowanie zmian w aplikacji.

Podsumowanie

Wzorzec transakcyjnej skrzynki odbiorczej rozwiązuje problem niezawodnego publikowania zdarzeń domeny w systemach rozproszonych. Zatwierdzając stan obiektu biznesowego i jego zdarzenia w tej samej partii transakcyjnej i używając procesora w tle jako przekaźnika komunikatów, upewnij się, że inne usługi, wewnętrzne lub zewnętrzne, ostatecznie otrzymają informacje, od których zależą. Ten przykład nie jest tradycyjną implementacją wzorca transakcyjnej skrzynki odbiorczej. Używa ona takich funkcji, jak zestawienie zmian usługi Azure Cosmos DB i czas wygaśnięcia, które zapewniają proste i czyste rzeczy.

Oto podsumowanie składników platformy Azure używanych w tym scenariuszu:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Pobierz plik programu Visio z tą architekturą.

Zalety tego rozwiązania to:

  • Niezawodna obsługa komunikatów i gwarantowane dostarczanie zdarzeń.
  • Zachowana kolejność zdarzeń i deduplikacji komunikatów za pośrednictwem usługi Service Bus.
  • Nie trzeba utrzymywać dodatkowej Processed właściwości wskazującej pomyślne przetwarzanie dokumentu zdarzenia.
  • Usuwanie zdarzeń z usługi Azure Cosmos DB za pośrednictwem czasu wygaśnięcia. Proces nie korzysta z jednostek żądań wymaganych do obsługi żądań użytkowników/aplikacji. Zamiast tego w zadaniu w tle są używane jednostki żądań "leftover".
  • Przetwarzanie komunikatów przy użyciu sprawdzania błędów za pośrednictwem ChangeFeedProcessor (lub funkcji platformy Azure).
  • Opcjonalnie: Wiele procesorów zestawienia zmian, z których każda utrzymuje własny wskaźnik w zestawieniach zmian.

Kwestie wymagające rozważenia

Przykładowa aplikacja omówiona w tym artykule pokazuje, jak można zaimplementować wzorzec transakcyjnej skrzynki odbiorczej na platformie Azure przy użyciu usług Azure Cosmos DB i Service Bus. Istnieją również inne podejścia korzystające z baz danych NoSQL. Aby zagwarantować, że obiekt biznesowy i zdarzenia zostaną niezawodnie zapisane w bazie danych, można osadzić listę zdarzeń w dokumencie obiektu biznesowego. Wadą tego podejścia jest to, że proces oczyszczania będzie musiał zaktualizować każdy dokument zawierający zdarzenia. Nie jest to idealne rozwiązanie, zwłaszcza jeśli chodzi o koszt jednostkowy żądania, w porównaniu z użyciem czasu wygaśnięcia.

Pamiętaj, że nie należy brać pod uwagę przykładowego kodu dostarczonego tutaj w środowisku produkcyjnym. Ma pewne ograniczenia dotyczące wielowątkowości, zwłaszcza sposób obsługi zdarzeń w DomainEntity klasie i sposobu śledzenia obiektów w CosmosContainerContext implementacjach. Użyj go jako punktu wyjścia dla własnych implementacji. Alternatywnie rozważ użycie istniejących bibliotek, które mają już tę funkcję wbudowaną w te funkcje, takie jak NServiceBus lub MassTransit.

Wdrażanie tego scenariusza

Kod źródłowy, pliki wdrażania i instrukcje dotyczące testowania tego scenariusza można znaleźć w witrynie GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Współautorzy

Ten artykuł jest obsługiwany przez firmę Microsoft. Pierwotnie został napisany przez następujących współautorów.

Główny autor:

Aby wyświetlić niepubalne profile serwisu LinkedIn, zaloguj się do serwisu LinkedIn.

Następne kroki

Zapoznaj się z następującymi artykułami, aby dowiedzieć się więcej: