Tranzakciós kimenő üzenetek mintája az Azure Cosmos DB-vel

Azure Cosmos DB
Azure Service Bus
Azure Functions

A megbízható üzenetkezelés megvalósítása az elosztott rendszerekben kihívást jelenthet. Ez a cikk bemutatja, hogyan használható a tranzakciós kimenő üzenetek mintája a megbízható üzenetkezeléshez és az események garantált kézbesítéséhez, amely fontos része az idempotens üzenetek feldolgozásának támogatásának. Ehhez azure Cosmos DB tranzakciós kötegeket fog használni, és az Azure Service Bus használatával együtt módosítja a hírcsatornát.

Áttekintés

A mikroszolgáltatás-architektúrák egyre népszerűbbek, és ígéretet mutatnak olyan problémák megoldásában, mint a méretezhetőség, a karbantarthatóság és az agilitás, különösen a nagy alkalmazásokban. Ez az architekturális minta azonban kihívásokat is jelent az adatkezelés során. Az elosztott alkalmazásokban minden szolgáltatás egymástól függetlenül tartja karban a dedikált szolgáltatás tulajdonában lévő adattárban való működéshez szükséges adatokat. Egy ilyen forgatókönyv támogatásához általában egy olyan üzenetkezelési megoldást használ, mint a RabbitMQ, a Kafka vagy az Azure Service Bus, amely adatokat (eseményeket) oszt ki egy szolgáltatásból egy üzenetkezelő buszon keresztül az alkalmazás más szolgáltatásai között. A belső vagy külső felhasználók ezután feliratkozhatnak ezekre az üzenetekre, és az adatok kezelése után értesítést kaphatnak a változásokról.

Ezen a területen jól ismert példa egy rendelési rendszer: amikor egy felhasználó megrendelést szeretne létrehozni, a Ordering szolgáltatás egy REST-végponton keresztül fogadja az adatokat egy ügyfélalkalmazásból. A hasznos adatokat egy Order objektum belső reprezentációjára képezi le az adatok ellenőrzéséhez. Miután sikeresen véglegesíti az adatbázist, közzétesz egy eseményt OrderCreated egy üzenetbuszon. Az új megrendelések iránt érdeklődő bármely más szolgáltatás (például egy Inventory vagy Invoicing szolgáltatás) feliratkozna az üzenetekre OrderCreated , feldolgozná őket, és tárolná őket a saját adatbázisában.

Az alábbi pszeudokód bemutatja, hogyan néz ki ez a folyamat általában a Ordering szolgáltatás szempontjából:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Ez a módszer mindaddig jól működik, amíg hiba nem lép fel a rendelésobjektum mentése és a megfelelő esemény közzététele között. Az esemény küldése ezen a ponton számos okból meghiúsulhat:

  • Hálózati hibák
  • Üzenetszolgáltatás kimaradása
  • Gazdagép hibája

Bármi is a hiba, az eredmény az, hogy az OrderCreated esemény nem tehető közzé az üzenetbuszon. Más szolgáltatások nem kapnak értesítést a rendelés létrejöttéről. A Ordering szolgáltatásnak most már gondoskodnia kell a különböző dolgokról, amelyek nem kapcsolódnak a tényleges üzleti folyamathoz. Nyomon kell követnie azokat az eseményeket, amelyeket még fel kell venni az üzenetbuszra, amint újra online állapotba kerül. Még a legrosszabb eset is előfordulhat: adatkonkonzisztenciák az alkalmazásban az elveszett események miatt.

Diagram that shows event handling without the Transactional Outbox pattern.

Megoldás

Létezik egy jól ismert tranzakciós kimenő üzenetek nevű minta, amely segít elkerülni ezeket a helyzeteket. Ez biztosítja, hogy az eseményeket egy adattárba (általában az adatbázis Kimenő üzenetek táblájába) mentsük, mielőtt végül elküldené őket egy üzenetközvetítőnek. Ha az üzleti objektumot és a kapcsolódó eseményeket ugyanabban az adatbázistranzakcióban menti a rendszer, garantáltan nem vesznek el adatok. Minden véglegesítésre kerül, vagy hiba esetén minden vissza fog gördülni. Az esemény végleges közzétételéhez egy másik szolgáltatás- vagy feldolgozófolyamat lekérdezi a Postázatlan bejegyzések táblát, közzéteszi az eseményeket, és feldolgozottként jelöli meg őket. Ez a minta biztosítja, hogy az üzleti objektumok létrehozása vagy módosítása után ne vesszenek el az események.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Töltse le az architektúra Visio-fájlját.

Egy relációs adatbázisban a minta megvalósítása egyszerű. Ha a szolgáltatás például az Entity Framework Core-t használja, egy Entity Framework-környezet használatával hoz létre egy adatbázis-tranzakciót, menti az üzleti objektumot és az eseményt, és véglegesíti a tranzakciót, vagy visszaállítja azt. Az eseményeket feldolgozó feldolgozó szolgáltatás is könnyen implementálható: rendszeresen lekérdezi a Postázandó üzenetek táblát az új bejegyzésekhez, közzéteszi az újonnan beszúrt eseményeket az üzenetbuszon, és végül feldolgozottként jelöli meg ezeket a bejegyzéseket.

A gyakorlatban a dolgok nem olyan egyszerűek, mint amilyennek elsőre néznek. A legfontosabb, hogy meg kell győződnie arról, hogy az események sorrendje megmarad, hogy egy OrderUpdated esemény ne legyen közzétéve egy OrderCreated esemény előtt.

Implementáció az Azure Cosmos DB-ben

Ez a szakasz bemutatja, hogyan implementálhatja a tranzakciós kimenő üzenetek mintáját az Azure Cosmos DB-ben a különböző szolgáltatások közötti megbízható, sorrendben történő üzenetküldés érdekében az Azure Cosmos DB változáscsatorna és a Service Bus segítségével. Egy objektumokat Contact (FirstName, , LastNameinformációkat EmailCompany stb.) kezelő mintaszolgáltatást mutat be. A parancs- és lekérdezésfelelősségi elkülönítés (CQRS) mintát használja, és az alapvető tartományalapú tervezési fogalmakat követi. A GitHubon megtalálja az implementáció mintakódját.

A Contact mintaszolgáltatás egyik objektuma a következő struktúrával rendelkezik:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

A létrehozás vagy frissítés után Contact olyan eseményeket bocsát ki, amelyek információkat tartalmaznak az aktuális változásról. A tartományesemények többek között a következőek lehetnek:

  • ContactCreated. Névjegy hozzáadásakor merül fel.
  • ContactNameUpdated. Az emelt, ha FirstName módosítják vagy LastName módosítják.
  • ContactEmailUpdated. Az e-mail-cím frissítésekor aktiválódik.
  • ContactCompanyUpdated. A vállalati tulajdonságok bármelyikének módosításakor történik.

Tranzakciós kötegek

A minta implementálásához meg kell győződnie arról, hogy az Contact üzleti objektum és a kapcsolódó események ugyanabban az adatbázis-tranzakcióban lesznek mentve. Az Azure Cosmos DB-ben a tranzakciók másképp működnek, mint a relációs adatbázisrendszerekben. Az Azure Cosmos DB-tranzakciók, úgynevezett tranzakciós kötegek egyetlen logikai partíción működnek, így garantálják az atomiság, a konzisztencia, az elkülönítés és a tartósság (ACID) tulajdonságait. Nem menthet két dokumentumot tranzakciós kötegműveletben különböző tárolókban vagy logikai partíciókban. A mintaszolgáltatás esetében ez azt jelenti, hogy az üzleti objektum és az esemény vagy események ugyanabban a tárolóban és logikai partícióban lesznek elhelyezve.

Környezet, adattárak és UnitOfWork

A minta implementáció magja egy tárolókörnyezet , amely nyomon követi azokat az objektumokat, amelyeket ugyanabban a tranzakciós kötegben mentettek. Fenntartja a létrehozott és módosított objektumok listáját, és egyetlen Azure Cosmos DB-tárolón működik. A felület a következőképpen néz ki:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

A tárolókörnyezet összetevőjének listája nyomon követi Contact és DomainEvent megjeleníti az objektumokat. Mindkettő ugyanabban a tárolóban lesz elhelyezve. Ez azt jelenti, hogy a rendszer több objektumtípust tárol ugyanabban az Azure Cosmos DB-tárolóban, és egy Type tulajdonság használatával tesz különbséget egy üzleti objektum és egy esemény között.

Minden típushoz tartozik egy dedikált adattár, amely meghatározza és implementálja az adathozzáférést. Az Contact adattár felülete a következő módszereket biztosítja:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Az Event adattár hasonlónak tűnik, kivéve, hogy csak egy metódus létezik, amely új eseményeket hoz létre az áruházban:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Mindkét adattár-interfész implementációi függőséginjektáláson keresztül kapnak referenciát egyetlen IContainerContext példányhoz, hogy mindkettő ugyanazon az Azure Cosmos DB-környezetben működjön.

Az utolsó összetevő az UnitOfWork, amely véglegesíti a példányban tárolt módosításokat az IContainerContext Azure Cosmos DB-ben:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Eseménykezelés: Létrehozás és közzététel

A szolgáltatás minden alkalommal, amikor Contact létrehoz, módosít vagy (helyreállítható) töröl egy objektumot, a szolgáltatás létrehoz egy megfelelő eseményt. A megoldás lényege a tartományalapú tervezés (DDD) és a Jimmy Bogard által javasolt mediátori minta kombinációja. Javasolja, hogy a tartományobjektum módosítása miatt történt események listáját tartsa karban, és tegye közzé ezeket az eseményeket, mielőtt a tényleges objektumot az adatbázisba menti.

A módosítások listája a tartományi objektumban marad, így egyetlen más összetevő sem módosíthatja az eseményláncot. A tartományi objektumban lévő események (IEvent példányok) fenntartásának viselkedése egy interfészen IEventEmitter<IEvent> keresztül van definiálva, és egy absztrakt DomainEntity osztályban van implementálva:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Az Contact objektum tartományi eseményeket vet fel. Az Contact entitás alapvető DDD-fogalmakat követ, és privátként konfigurálja a tartománytulajdonságok beállítóit. Az osztályban nincsenek nyilvános beállítók. Ehelyett metódusokat kínál a belső állapot módosítására. Ezekben a módszerekben egy bizonyos módosításhoz (például ContactNameUpdated vagy ContactEmailUpdated) megfelelő események hozhatók létre.

Íme egy példa egy partner nevének frissítésére. (Az esemény a metódus végén lesz előállítva.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

A megfelelő ContactNameUpdatedEvent, amely nyomon követi a változásokat, a következőképpen néz ki:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Eddig az események csak a tartományi objektumba vannak naplózva, és semmit sem mentenek az adatbázisba, vagy akár közzé sem tesznek egy üzenetközvetítőben. A javaslatot követve az események listája közvetlenül az üzleti objektum adattárba való mentése előtt lesz feldolgozva. Ebben az esetben a SaveChangesAsync példány metódusában IContainerContext történik, amely privát RaiseDomainEvents metódusban van implementálva. (dObjs a tárolókörnyezet nyomon követett entitásainak listája.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Az utolsó sorban a MediatR-csomag , a mediátori minta implementációja a C#-ban, egy esemény közzétételére szolgál az alkalmazásban. Ez azért lehetséges, mert minden esemény, például ContactNameUpdatedEvent a INotification MediatR-csomag felületének implementálása.

Ezeket az eseményeket egy megfelelő kezelőnek kell feldolgoznia. Itt a IEventsRepository megvalósítás kerül előtérbe. Az eseménykezelő mintája NameUpdated :

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

A IEventRepository példány a konstruktoron keresztül kerül be a kezelőosztályba. A szolgáltatásban való közzétételt követően ContactNameUpdatedEvent a metódust meghívja Handle a program, és az eseménytár-példány használatával létrehoz egy értesítési objektumot. Ez az értesítési objektum be lesz szúrva az objektum nyomon követett objektumainak listájába IContainerContext , és összekapcsolja azokat az objektumokat, amelyeket ugyanabban a tranzakciós kötegben mentettek az Azure Cosmos DB-be.

A tárolókörnyezet eddig tudja, hogy mely objektumokat kell feldolgozni. A nyomon követett objektumok Azure Cosmos DB-ben való végleges megőrzéséhez az IContainerContext implementáció létrehozza a tranzakciós köteget, hozzáadja az összes releváns objektumot, és futtatja a műveletet az adatbázison. A leírt folyamatot a SaveInTransactionalBatchAsync metódus kezeli, amelyet a SaveChangesAsync metódus hív meg.

A tranzakciós köteg létrehozásához és futtatásához az implementáció fontos részeit kell létrehoznia és futtatnia:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Íme egy áttekintés a folyamat eddigi működéséről (a név névjegyobjektumon való frissítéséhez):

  1. Az ügyfél frissíteni szeretné egy partner nevét. A SetName rendszer meghívja a metódust a partnerobjektumon, és a tulajdonságok frissülnek.
  2. Az ContactNameUpdated esemény hozzáadódik a tartományobjektum eseménylistájához.
  3. A rendszer meghívja a partneradattár metódusát Update , amely hozzáadja a tartományobjektumot a tárolókörnyezethez. Az objektum nyomon lett követve.
  4. CommitAsync a rendszer meghívja a UnitOfWork példányt, amely viszont meghívja SaveChangesAsync a tárolókörnyezetet.
  5. A SaveChangesAsynctartományobjektum listájában szereplő összes eseményt egy MediatR példány közzéteszi, és az eseménytáron keresztül egyazon tárolókörnyezethez adja hozzá.
  6. Ebben SaveChangesAsynca fájlban létrejön egy TransactionalBatch . A névjegyobjektumot és az eseményt is megtartja.
  7. A TransactionalBatch futtatások és az adatok elkötelezettek az Azure Cosmos DB-ben.
  8. SaveChangesAsync és CommitAsync sikeresen visszatér.

Kitartás

Ahogy az előző kódrészletekben látható, az Azure Cosmos DB-be mentett összes objektum egy DataObject példányba van csomagolva. Ez az objektum általános tulajdonságokkal rendelkezik:

  • ID.
  • PartitionKey.
  • Type.
  • State. Updated Például Createdaz Azure Cosmos DB-ben nem fog megmaradni.
  • Etag. Optimista zároláshoz.
  • TTL. Time To Live tulajdonság a régi dokumentumok automatikus törléséhez.
  • Data. Általános adatobjektum.

Ezek a tulajdonságok egy általános felületen vannak definiálva, IDataObject amelyet az adattárak és a tárolókörnyezet használ:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

A példányba DataObject burkolt és az adatbázisba mentett objektumok így fognak kinézni:ContactContactNameUpdatedEvent

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Láthatja, hogy a (típus) és ContactNameUpdatedEvent a Contact (típusdomainEvent) dokumentumok ugyanazt a partíciókulcsot kapják, és mindkét dokumentum ugyanabban a logikai partícióban lesz megőrizve.

Változáscsatorna feldolgozása

Az események streamjének olvasásához és üzenetközvetítőnek való elküldéséhez a szolgáltatás az Azure Cosmos DB változáscsatornáját fogja használni.

A változáscsatorna a tároló módosításainak állandó naplója. A háttérben működik, és nyomon követi a módosításokat. Egy logikai partíción belül a módosítások sorrendje garantált. A változáscsatorna olvasásának legkényelmesebb módja egy Azure-függvény használata egy Azure Cosmos DB-eseményindítóval. Egy másik lehetőség a változáscsatorna processzortárának használata. Lehetővé teszi a változáscsatorna-feldolgozás integrálását a webes API-ban háttérszolgáltatásként (a IHostedService felületen keresztül). Az itt látható minta egy egyszerű konzolalkalmazást használ, amely az absztrakt BackgroundService osztályt implementálja a hosszú ideig futó háttérfeladatok üzemeltetéséhez a .NET Core-alkalmazásokban.

Ha az Azure Cosmos DB változáscsatornájából szeretné megkapni a módosításokat, példányosítania kell egy ChangeFeedProcessor objektumot, regisztrálnia kell egy kezelőmetódust az üzenetfeldolgozáshoz, és el kell kezdenie a módosítások figyelését:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Ezután egy kezelőmetódus (HandleChangesAsync itt) dolgozza fel az üzeneteket. Ebben a mintában az események közzé lesznek téve egy Service Bus-témakörben, amely particionálva van a méretezhetőség érdekében, és engedélyezve van a duplikálás megszüntetése funkció. Az objektumok módosítása Contact iránt érdeklődő bármely szolgáltatás feliratkozhat erre a Service Bus-témakörre, és saját környezetében fogadhatja és feldolgozhatja a módosításokat.

A létrehozott Service Bus-üzenetek tulajdonsága SessionId van. Amikor munkameneteket használ a Service Busban, garantáltan megőrzi az üzenetek sorrendjét (FIFO). Ehhez a használati esethez meg kell őrizni a sorrendet.

Az alábbi kódrészlet kezeli a változáscsatorna üzeneteit:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Hibakezelés

Ha hiba történik a módosítások feldolgozása közben, a változáscsatorna-kódtár újraindítja az üzenetek olvasását abban a helyen, ahol sikeresen feldolgozta az utolsó köteget. Ha például az alkalmazás sikeresen feldolgozta a 10 000 üzenetet, most már a 10 001–10 025-ös kötegen dolgozik, és hiba történik, újraindulhat, és a 10 001 pozícióban folytathatja a munkáját. A kódtár automatikusan nyomon követi az Azure Cosmos DB-ben egy Leases tárolóban mentett adatokon keresztül feldolgozott adatokat.

Lehetséges, hogy a szolgáltatás már elküldte az újrafeldolgozott üzenetek egy részét a Service Busnak. Ez a forgatókönyv általában ismétlődő üzenetfeldolgozáshoz vezetne. Ahogy korábban említettük, a Service Bus rendelkezik egy olyan funkcióval, amellyel ismétlődő üzenetészlelést kell engedélyeznie ehhez a forgatókönyvhöz. A szolgáltatás ellenőrzi, hogy az üzenet alkalmazás által vezérelt MessageId tulajdonsága alapján már hozzáadtak-e üzenetet egy Service Bus-témakörhöz (vagy üzenetsorhoz). Ez a tulajdonság az ID eseménydokumentumra van állítva. Ha ugyanazt az üzenetet ismét elküldi a Service Busnak, a szolgáltatás figyelmen kívül hagyja és elveti.

Egyéb folyamatok

A tranzakciós kimenő üzenetek tipikus implementációjában a szolgáltatás frissíti a kezelt eseményeket, és beállít egy Processed tulajdonságot true, amely azt jelzi, hogy egy üzenet közzététele sikeresen megtörtént. Ez a viselkedés manuálisan implementálható a kezelőmetódusban. A jelenlegi forgatókönyvben nincs szükség ilyen folyamatra. Az Azure Cosmos DB nyomon követi a változáscsatorna használatával feldolgozott eseményeket (a Leases tárolóval együtt).

Utolsó lépésként időnként törölnie kell az eseményeket a tárolóból, hogy csak a legfrissebb rekordokat/dokumentumokat tárolja. A karbantartás rendszeres végrehajtásához az implementáció az Azure Cosmos DB egy másik funkcióját alkalmazza: Az élettartam (TTL) a dokumentumokon. Az Azure Cosmos DB automatikusan törölheti a dokumentumokat egy TTL dokumentumhoz hozzáadható tulajdonság alapján: másodpercek alatt eltelt idő alapján. A szolgáltatás folyamatosan ellenőrzi a tárolót, hogy vannak-e tulajdonságú TTL dokumentumok. Amint egy dokumentum lejár, az Azure Cosmos DB eltávolítja azt az adatbázisból.

Ha az összes összetevő a várt módon működik, az események feldolgozása és közzététele gyorsan megtörténik: másodperceken belül. Ha hiba történik az Azure Cosmos DB-ben, a rendszer nem küld eseményeket az üzenetbusznak, mert az üzleti objektum és a kapcsolódó események nem menthetők az adatbázisba. Az egyetlen dolog, amit figyelembe kell venni, hogy megfelelő TTL értéket kell beállítani a DomainEvent dokumentumokon, ha a háttérmunkás (változáscsatorna-feldolgozó) vagy a service bus nem érhető el. Éles környezetben a legjobb, ha több napos időtartamot választ. Például 10 nap. Ezután minden érintett összetevőnek elegendő ideje lesz a módosítások feldolgozására/közzétételére az alkalmazásban.

Összesítés

A tranzakciós kimenő üzenetek mintája megoldja a tartományesemények elosztott rendszerekben való megbízható közzétételének problémáját. Ha az üzleti objektum állapotát és eseményeit ugyanabban a tranzakciós kötegben véglegesíti, és egy háttérprocesszort használ az üzenettovábbítóhoz, gondoskodik arról, hogy a többi, belső vagy külső szolgáltatás végül megkapja azokat az információkat, amelyektől függenek. Ez a minta nem a tranzakciós kimenő üzenetek mintájának hagyományos implementációja. Olyan funkciókat használ, mint az Azure Cosmos DB változáscsatornája és a Time To Live, amelyek egyszerű és tiszta dolgokat tartalmaznak.

Az alábbiakban összefoglaljuk az ebben a forgatókönyvben használt Azure-összetevőket:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Töltse le az architektúra Visio-fájlját.

Ennek a megoldásnak az előnyei a következők:

  • Megbízható üzenetkezelés és az események garantált kézbesítése.
  • Az események sorrendjének megőrzése és az üzenetek duplikációjának megszüntetése a Service Buson keresztül.
  • Nem kell olyan extra Processed tulajdonságot fenntartania, amely egy eseménydokumentum sikeres feldolgozását jelzi.
  • Események törlése az Azure Cosmos DB-ből TTL-en keresztül. A folyamat nem használja fel a felhasználói/alkalmazáskérések kezeléséhez szükséges kérelemegységeket. Ehelyett "maradék" kérésegységeket használ egy háttérfeladatban.
  • Az üzenetek hibamentes feldolgozása (vagy egy Azure-függvény).ChangeFeedProcessor
  • Nem kötelező: Több változáscsatorna-processzor, mindegyik megtartja a saját mutatóját a változáscsatornában.

Considerations

A cikkben tárgyalt mintaalkalmazás bemutatja, hogyan implementálhatja a tranzakciós kimenő üzenetek mintáját az Azure-ban az Azure Cosmos DB és a Service Bus használatával. Vannak más módszerek is, amelyek NoSQL-adatbázisokat használnak. Annak érdekében, hogy az üzleti objektum és események megbízhatóan legyenek mentve az adatbázisban, beágyazhatja az események listáját az üzleti objektum dokumentumába. Ennek a megközelítésnek a hátránya, hogy a törlési folyamatnak frissítenie kell az eseményeket tartalmazó összes dokumentumot. Ez nem ideális, különösen a kérelemegységek költségei szempontjából, a TTL-hez képest.

Ne feledje, hogy az itt megadott mintakódot nem érdemes éles üzemre kész kódként figyelembe venni. Bizonyos korlátozások vonatkoznak a többszálúságra, különösen az események osztályban való kezelésére és az DomainEntity objektumok nyomon követésére az CosmosContainerContext implementációkban. Használja kiindulási pontként saját implementációihoz. Másik lehetőségként érdemes lehet olyan meglévő kódtárakat használni, amelyekbe már beépítettük ezt a funkciót, például az NServiceBust vagy a MassTransitet.

A forgatókönyv üzembe helyezése

Megtalálhatja a forráskódot, az üzembehelyezési fájlokat és az utasításokat, hogy tesztelje ezt a forgatókönyvet a GitHubon: https://github.com/mspnp/transactional-outbox-pattern.

Közreműködők

Ezt a cikket a Microsoft tartja karban. Eredetileg a következő közreműködők írták.

Fő szerző:

A nem nyilvános LinkedIn-profilok megtekintéséhez jelentkezzen be a LinkedInbe.

További lépések

További információért tekintse át ezeket a cikkeket: