Azure Cosmos DB ile İşlem Giden Kutusu düzeni

Azure Cosmos DB
Azure Service Bus
Azure Functions

Dağıtılmış sistemlerde güvenilir mesajlaşma uygulamak zor olabilir. Bu makalede, etkili ileti işlemeyi desteklemenin önemli bir parçası olan olayların güvenilir mesajlaşması ve garantili teslimi için İşlem Giden Kutusu düzeninin nasıl kullanılacağı açıklanmaktadır. Bunu başarmak için Azure Cosmos DB işlem toplu işlemlerini ve değişiklik akışını Azure Service Bus ile birlikte kullanacaksınız.

Genel Bakış

Mikro hizmet mimarileri giderek daha popüler hale geliyor ve özellikle büyük uygulamalarda ölçeklenebilirlik, bakım ve çeviklik gibi sorunların çözümünde söz veriyor. Ancak bu mimari desen, veri işleme konusunda da güçlükler ortaya çıkar. Dağıtılmış uygulamalarda her hizmet, ayrılmış bir hizmete ait veri deposunda çalışması için gereken verileri bağımsız olarak tutar. Böyle bir senaryoyu desteklemek için genellikle bir hizmetten uygulamanın diğer hizmetlerine veri (olaylar) dağıtan RabbitMQ, Kafka veya Azure Service Bus gibi bir mesajlaşma çözümü kullanırsınız. İç veya dış tüketiciler daha sonra bu iletilere abone olabilir ve veriler işlendiği anda değişikliklerden haberdar olabilir.

Bu alandaki iyi bilinen bir örnek bir sipariş sistemidir: Kullanıcı sipariş oluşturmak istediğinde, Ordering bir hizmet rest uç noktası aracılığıyla bir istemci uygulamasından veri alır. Verileri doğrulamak için yükü bir Order nesnenin iç gösterimiyle eşler. Veritabanına başarılı bir işlemeden sonra, ileti veri yolu için bir OrderCreated olay yayımlar. Yeni siparişlerle (örneğin bir Inventory veya Invoicing hizmet) ilgilenen diğer tüm hizmetler iletilere OrderCreated abone olur, bunları işler ve kendi veritabanında depolar.

Aşağıdaki sahte kod, bu işlemin genellikle hizmet açısından nasıl göründüğünü Ordering gösterir:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Bu yaklaşım, order nesnesini kaydetme ve karşılık gelen olayı yayımlama arasında bir hata oluşana kadar düzgün çalışır. Olay gönderme işlemi şu noktada birçok nedenden dolayı başarısız olabilir:

  • Ağ hataları
  • İleti hizmeti kesintisi
  • Konak hatası

Hata her ne olursa olsun, sonuç olayın ileti veri yolu üzerinde yayımlanamaz olmasıdır OrderCreated . Diğer hizmetlere bir siparişin oluşturulduğu bildirilmeyecek. Hizmetin Ordering artık gerçek iş süreciyle ilgili olmayan çeşitli işlemleri gerçekleştirmesi gerekiyor. Yeniden çevrimiçi olur olmaz ileti veri yoluna koyulması gereken olayları izlemesi gerekir. En kötü durum bile olabilir: kayıp olaylar nedeniyle uygulamadaki veri tutarsızlıkları.

Diagram that shows event handling without the Transactional Outbox pattern.

Çözüm

Bu durumlardan kaçınmanıza yardımcı olabilecek İşlemSel Giden Kutusu adlı iyi bilinen bir desen vardır. Olayların bir ileti aracısına gönderilmeden önce bir veri deposuna (genellikle veritabanınızdaki bir Giden Kutusu tablosuna) kaydedilmesini sağlar. İş nesnesi ve buna karşılık gelen olaylar aynı veritabanı işlemi içinde kaydedilirse hiçbir verinin kaybolmayacağı garanti edilir. Her şey işlenir veya hata olduğunda her şey geri alınır. Sonunda olayı yayımlamak için farklı bir hizmet veya çalışan işlemi, işlenmeyen girdiler için Giden Kutusu tablosunu sorgular, olayları yayımlar ve işlendi olarak işaretler. Bu düzen, bir iş nesnesi oluşturulduktan veya değiştirildikten sonra olayların kaybolmamasını sağlar.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Bu mimarinin bir Visio dosyasını indirin.

İlişkisel bir veritabanında, desenin uygulanması basittir. Örneğin hizmet Entity Framework Core kullanıyorsa, veritabanı işlemi oluşturmak, iş nesnesini ve olayı kaydetmek ve işlemi işlemek için bir Entity Framework bağlamı kullanır veya geri alma işlemi yapar. Ayrıca, olayları işleyen çalışan hizmetinin uygulanması kolaydır: Yeni girdiler için Giden Kutusu tablosunu düzenli aralıklarla sorgular, yeni eklenen olayları ileti veri yolu için yayımlar ve son olarak bu girdileri işlendi olarak işaretler.

Pratikte, işler ilk bakışta göründüğü kadar kolay değildir. En önemlisi, bir olayın bir olaydan önce OrderCreated yayımlanmaması için olayların sırasının korundığından OrderUpdated emin olmanız gerekir.

Azure Cosmos DB'de uygulama

Bu bölümde, Azure Cosmos DB değişiklik akışı ve Service Bus yardımıyla farklı hizmetler arasında güvenilir ve sıralı mesajlaşma elde etmek için Azure Cosmos DB'de İşlem Giden Kutusu düzeninin nasıl uygulaneceği gösterilmektedir. Nesneleri (, , Email, Company bilgileri vb.) yöneten Contact örnek LastNamebir hizmet gösterir.FirstName Komut ve Sorgu Sorumluluğu Ayrım (CQRS) desenini kullanır ve etki alanı odaklı temel tasarım kavramlarını izler. Uygulamanın örnek kodunu GitHub'da bulabilirsiniz.

Örnek hizmetteki bir Contact nesne aşağıdaki yapıya sahiptir:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Bir Contact oluşturulur veya güncelleştirilir oluşturulmaz, geçerli değişiklik hakkında bilgi içeren olayları yayar. Etki alanı olayları arasında aşağıdakiler de olabilir:

  • ContactCreated. Kişi eklendiğinde oluşturulur.
  • ContactNameUpdated. veya LastName değiştirildiğinde FirstName yükseltilir.
  • ContactEmailUpdated. E-posta adresi güncelleştirildiğinde oluşturulur.
  • ContactCompanyUpdated. Şirket özelliklerinden herhangi biri değiştirildiğinde oluşturulur.

İşlem toplu işlemleri

Bu düzeni uygulamak için iş nesnesinin Contact ve ilgili olayların aynı veritabanı işlemine kaydedildiğinden emin olmanız gerekir. Azure Cosmos DB'de işlemler ilişkisel veritabanı sistemlerinde olduğundan farklı çalışır. İşlem toplu işlemleri olarak adlandırılan Azure Cosmos DB işlemleri tek bir mantıksal bölümde çalışır, böylece Bölünmezlik, Tutarlılık, Yalıtım ve Dayanıklılık (ACID) özelliklerini garanti eder. İşlem toplu işlemindeki iki belgeyi farklı kapsayıcılara veya mantıksal bölümlere kaydedemezsiniz. Örnek hizmet için bu, hem iş nesnesinin hem de olayın veya olayların aynı kapsayıcıya ve mantıksal bölüme yerleştirileceği anlamına gelir.

Bağlam, depolar ve UnitOfWork

Örnek uygulamanın temeli, aynı işlem toplu işlemine kaydedilen nesneleri izleyen bir kapsayıcı bağlamıdır . Oluşturulan ve değiştirilen nesnelerin listesini tutar ve tek bir Azure Cosmos DB kapsayıcısı üzerinde çalışır. Arabirimi şöyle görünür:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Kapsayıcı bağlamı bileşenindeki liste ve DomainEvent nesneleri izlerContact. Her ikisi de aynı kapsayıcıya yerleştirilir. Bu, birden çok nesne türünün aynı Azure Cosmos DB kapsayıcısında depolandığı ve bir Type iş nesnesi ile olayı ayırt etmek için bir özellik kullandığı anlamına gelir.

Her tür için veri erişimini tanımlayan ve uygulayan ayrılmış bir depo vardır. Contact Depo arabirimi şu yöntemleri sağlar:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Event Depo benzer görünür, ancak depoda yeni olaylar oluşturan tek bir yöntem vardır:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Her iki depo arabiriminin de uygulamaları, her ikisinin de aynı Azure Cosmos DB bağlamında çalıştığından emin olmak için tek IContainerContext bir örneğe bağımlılık ekleme yoluyla bir başvuru alır.

Son bileşen, örnekte tutulan değişiklikleri Azure Cosmos DB'ye IContainerContext işleyen bileşenidirUnitOfWork:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Olay işleme: Oluşturma ve yayımlama

Bir Contact nesne her oluşturulduğunda, değiştirildiğinde veya (geçici-) silindiğinde, hizmet karşılık gelen bir olayı tetikler. Sağlanan çözümün temeli, etki alanı odaklı tasarım (DDD) ve Jimmy Bogard tarafından önerilen aracı deseninin bir bileşimidir. Asıl nesneyi veritabanına kaydetmeden önce etki alanı nesnesinde yapılan değişiklikler nedeniyle gerçekleşen olayların listesinin tutulmasını ve bu olayların yayımlanmasını önerir.

Değişikliklerin listesi etki alanı nesnesinde tutulur, böylece başka hiçbir bileşen olay zincirini değiştiremez. Etki alanı nesnesinde olayları (IEvent örnekleri) koruma davranışı bir arabirim IEventEmitter<IEvent> aracılığıyla tanımlanır ve soyut DomainEntity bir sınıfta uygulanır:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact nesnesi etki alanı olaylarını oluşturur. Varlık Contact , etki alanı özelliklerinin ayarlayıcılarını özel olarak yapılandırarak temel DDD kavramlarını izler. Sınıfında genel ayarlayıcı yok. Bunun yerine iç durumu işlemek için yöntemler sunar. Bu yöntemlerde, belirli bir değişiklik (örneğin ContactNameUpdated veya ContactEmailUpdated) için uygun olaylar oluşturulabilir.

Bir kişinin adıyla ilgili güncelleştirmeler için bir örnek aşağıda verilmiştır. (Olay, yöntemin sonunda oluşturulur.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Değişiklikleri izleyen karşılık gelen ContactNameUpdatedEventaşağıdaki gibi görünür:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Şu ana kadar olaylar yalnızca etki alanı nesnesine kaydedilir ve hiçbir şey veritabanına kaydedilmez ve hatta ileti aracısında yayımlanır. Önerinin ardından, olay listesi iş nesnesi veri deposuna kaydedilmeden hemen önce işlenir. Bu durumda, özel bir yöntemde SaveChangesAsync uygulanan örneğin yönteminde IContainerContext gerçekleşir.RaiseDomainEvents (dObjs kapsayıcı bağlamının izlenen varlıklarının listesidir.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Son satırda , C# dilinde aracı düzeninin bir uygulaması olan MediatR paketi, uygulama içinde bir olay yayımlamak için kullanılır. MediatR paketinin arabirimini INotification uygulamak gibi ContactNameUpdatedEvent tüm olaylar nedeniyle bunu yapmak mümkündür.

Bu olayların ilgili işleyici tarafından işlenmesi gerekir. IEventsRepository Burada uygulama devreye girer. Olay işleyicisinin örneği aşağıda verilmişti NameUpdated :

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Oluşturucu aracılığıyla işleyici sınıfına bir IEventRepository örnek eklenir. hizmette Handle bir ContactNameUpdatedEvent yayımlanır yayımlanmaz yöntemi çağrılır ve bir bildirim nesnesi oluşturmak için olay deposu örneğini kullanır. Bu bildirim nesnesi de nesnedeki izlenen nesneler IContainerContext listesine eklenir ve aynı işlem toplu işleminde kaydedilen nesneleri Azure Cosmos DB'ye ekler.

Kapsayıcı bağlamı şu ana kadar hangi nesnelerin işlenmek üzere olduğunu biliyor. Sonunda izlenen nesneleri Azure Cosmos DB'de IContainerContext kalıcı hale getirmek için uygulama işlem toplu işlemini oluşturur, tüm ilgili nesneleri ekler ve işlemi veritabanına karşı çalıştırır. Açıklanan işlem yöntemi tarafından çağrılan yönteminde SaveChangesAsync işlenirSaveInTransactionalBatchAsync.

uygulamanın işlem toplu işlemini oluşturmak ve çalıştırmak için ihtiyaç duyduğunuz önemli bölümleri şunlardır:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Burada, işlemin şu ana kadar nasıl çalıştığına ilişkin genel bir bakış bulabilirsiniz (kişi nesnesinde adı güncelleştirmek için):

  1. İstemci bir kişinin adını güncelleştirmek istiyor. yöntemi SetName kişi nesnesinde çağrılır ve özellikler güncelleştirilir.
  2. Olay ContactNameUpdated , etki alanı nesnesindeki olaylar listesine eklenir.
  3. Kişi deposunun Update yöntemi çağrılır ve bu da etki alanı nesnesini kapsayıcı bağlamlarına ekler. Nesne artık izlenir.
  4. CommitAsync örneğinde çağrılır UnitOfWork ve kapsayıcı bağlamını da çağırır SaveChangesAsync .
  5. içindeSaveChangesAsync, etki alanı nesnesi listesindeki tüm olaylar bir MediatR örnek tarafından yayımlanır ve olay deposu aracılığıyla aynı kapsayıcı bağlamı içine eklenir.
  6. içinde SaveChangesAsyncbir TransactionalBatch oluşturulur. Hem kişi nesnesini hem de olayı barındıracaktır.
  7. TransactionalBatch Çalıştırmalar ve veriler Azure Cosmos DB'ye işlenir.
  8. SaveChangesAsync ve CommitAsync başarıyla geri dönün.

Kalıcılık

Yukarıdaki kod parçacıklarında görebileceğiniz gibi Azure Cosmos DB'ye kaydedilen tüm nesneler bir DataObject örnekte sarmalanmıştır. Bu nesne ortak özellikler sağlar:

Bu özellikler, depolar ve kapsayıcı bağlamı tarafından çağrılan IDataObject ve kullanılan genel bir arabirimde tanımlanır:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Bir DataObject örnekte sarmalanan ve veritabanına kaydedilen nesneler şu örneğe (Contact ve ContactNameUpdatedEvent) benzer olacaktır:

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

ve ContactNameUpdatedEvent (tür domainEvent) belgelerinin Contact aynı bölüm anahtarına sahip olduğunu ve her iki belgenin de aynı mantıksal bölümde kalıcı olacağını görebilirsiniz.

Değişiklik akışı işleme

Olay akışını okumak ve bir ileti aracısına göndermek için hizmet, Azure Cosmos DB değişiklik akışını kullanır.

Değişiklik akışı, kapsayıcınızdaki değişikliklerin kalıcı bir günlüğüdür. Arka planda çalışır ve değişiklikleri izler. Tek bir mantıksal bölümde değişikliklerin sırası garanti edilir. Değişiklik akışını okumanın en kullanışlı yolu, Azure Cosmos DB tetikleyicisiyle bir Azure işlevi kullanmaktır. Bir diğer seçenek de değişiklik akışı işlemci kitaplığını kullanmaktır. Web API'nizdeki değişiklik akışı işlemeyi arka plan hizmeti olarak tümleştirmenize olanak tanır (arabirim aracılığıyla IHostedService ). Buradaki örnek, .NET Core uygulamalarında uzun süre çalışan arka plan görevlerini barındırmak için BackgroundService soyut sınıfını uygulayan basit bir konsol uygulaması kullanır.

Değişiklikleri Azure Cosmos DB değişiklik akışından almak için bir ChangeFeedProcessor nesne örneği oluşturmanız, ileti işleme için bir işleyici yöntemi kaydetmeniz ve değişiklikleri dinlemeye başlamanız gerekir:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

bir işleyici yöntemi (HandleChangesAsync burada) iletileri işler. Bu örnekte olaylar, ölçeklenebilirlik için bölümlenmiş ve yinelenenleri kaldırma özelliğinin etkinleştirildiği bir Service Bus konusuna yayımlanır. Nesnelerdeki değişikliklerle Contact ilgilenen tüm hizmetler bu Service Bus konusuna abone olabilir ve değişiklikleri kendi bağlamı için alıp işleyebilir.

Oluşturulan Service Bus iletilerinin bir SessionId özelliği vardır. Service Bus'ta oturumları kullandığınızda, iletilerin sırasının korunduğunu (FIFO) garanti edersiniz. Bu kullanım örneği için siparişin korunması gerekir.

Değişiklik akışındaki iletileri işleyen kod parçacığı aşağıdadır:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Hata işleme

Değişiklikler işlenirken bir hata oluşursa, değişiklik akışı kitaplığı son toplu işlemi başarıyla işlediği konumda okuma iletilerini yeniden başlatır. Örneğin, uygulama 10.000 iletiyi başarıyla işlediyse, şimdi 10.001 ile 10.025 arasında toplu işlem üzerinde çalışıyorsa ve bir hata oluşursa, yeniden başlatılabilir ve 10.001 konumundan çalışmasını alabilir. Kitaplık, Azure Cosmos DB'de bir Leases kapsayıcıya kaydedilen bilgiler aracılığıyla işlenen işlemleri otomatik olarak izler.

Hizmet, Service Bus'a yeniden işlenmiş iletilerin bazılarını zaten göndermiş olabilir. Normalde, bu senaryo yinelenen ileti işlemeye yol açar. Daha önce belirtildiği gibi, Service Bus'ın bu senaryo için etkinleştirmeniz gereken yinelenen ileti algılama özelliği vardır. Hizmet, iletinin uygulama denetimli MessageId özelliğine göre bir iletinin Service Bus konusuna (veya kuyruğuna) zaten eklenip eklenmediğini denetler. Bu özellik, olay belgesinin değerine ayarlanır ID . Aynı ileti Service Bus'a yeniden gönderilirse, hizmet bu iletiyi yoksayar ve bırakır.

Bakım ve temizlik

Tipik bir İşlem Giden Kutusu uygulamasında hizmet, işlenen olayları güncelleştirir ve bir iletinin başarıyla yayımlandığını belirten bir Processed özelliği olarak trueayarlar. Bu davranış işleyici yönteminde el ile uygulanabilir. Geçerli senaryoda, böyle bir işleme gerek yoktur. Azure Cosmos DB, değişiklik akışı kullanılarak işlenen olayları izler (kapsayıcıyla birlikte Leases ).

Son adım olarak, yalnızca en son kayıtları/belgeleri tutmak için zaman zaman olayları kapsayıcıdan silmeniz gerekir. Düzenli aralıklarla temizleme yapmak için uygulama, Azure Cosmos DB'nin başka bir özelliğini uygular: Belgelere Yaşam Süresi (TTL). Azure Cosmos DB, belgeye eklenebilen bir TTL özelliğe göre belgeleri otomatik olarak silebilir: saniye olarak bir zaman aralığı. Hizmet, özelliği olan TTL belgeler için kapsayıcıyı sürekli denetler. Belgenin süresi dolduğunda Azure Cosmos DB belgeyi veritabanından kaldırır.

Tüm bileşenler beklendiği gibi çalıştığında olaylar hızla işlenir ve yayımlanır: saniyeler içinde. Azure Cosmos DB'de bir hata varsa, hem iş nesnesi hem de ilgili olaylar veritabanına kaydedilemediğinden olaylar ileti veri yolu'na gönderilmez. Dikkate alınması gereken tek şey, arka plan çalışanı (değişiklik akışı işlemcisi) veya hizmet veri yolu kullanılabilir olmadığında belgelerde uygun TTL bir değer DomainEvent ayarlamaktır. Bir üretim ortamında, birden çok günlük bir zaman aralığı seçmek en iyisidir. Örneğin, 10 gün. Daha sonra ilgili tüm bileşenler uygulama içindeki değişiklikleri işlemek/yayımlamak için yeterli zamana sahip olur.

Özet

İşlem Giden Kutusu düzeni, etki alanı olaylarını dağıtılmış sistemlerde güvenilir bir şekilde yayımlama sorununu çözer. İş nesnesinin durumunu ve olaylarını aynı işlem toplu işleminde işleyerek ve ileti geçişi olarak bir arka plan işlemcisi kullanarak, iç veya dış diğer hizmetlerin sonunda bağımlı oldukları bilgileri aldığından emin olursunuz. Bu örnek, İşlem Giden Kutusu düzeninin geleneksel bir uygulaması değildir. Azure Cosmos DB değişiklik akışı ve Yaşam Süresi gibi işleri basit ve temiz tutan özellikleri kullanır.

Bu senaryoda kullanılan Azure bileşenlerinin özeti aşağıdadır:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Bu mimarinin bir Visio dosyasını indirin.

Bu çözümün avantajları şunlardır:

  • Güvenilir mesajlaşma ve olayların garantili teslimi.
  • Service Bus aracılığıyla olayların ve ileti yinelenenleri kaldırmanın korunan sırası.
  • Olay belgesinin başarılı bir şekilde işlendiğini gösteren ek Processed bir özellik bulundurmanıza gerek yoktur.
  • TTL aracılığıyla Azure Cosmos DB'den olay silme. İşlem, kullanıcı/uygulama isteklerini işlemek için gereken istek birimlerini kullanmaz. Bunun yerine, bir arka plan görevinde "kalan" istek birimlerini kullanır.
  • İletilerin (veya bir Azure işlevi) aracılığıyla ChangeFeedProcessor hataya dayanıklı işlenmesi.
  • İsteğe bağlı: Her biri değişiklik akışında kendi işaretçisini koruyan birden çok değişiklik akışı işlemcisi.

Dikkat edilmesi gereken noktalar

Bu makalede ele alınan örnek uygulama, Azure Cosmos DB ve Service Bus ile Azure'da İşlem Giden Kutusu düzenini nasıl uygulayabileceğinizi gösterir. NoSQL veritabanlarını kullanan başka yaklaşımlar da vardır. İş nesnesinin ve olayların veritabanına güvenilir bir şekilde kaydedileceğini garanti etmek için, olay listesini iş nesnesi belgesine ekleyebilirsiniz. Bu yaklaşımın dezavantajı, temizleme işleminin olay içeren her belgeyi güncelleştirmesi gerekeceğidir. Bu, TTL kullanımına kıyasla özellikle İstek Birimi maliyeti açısından ideal değildir.

Üretime hazır kodda sağlanan örnek kodu dikkate almamanız gerektiğini unutmayın. Çok iş parçacığı kullanımıyla ilgili bazı sınırlamaları vardır, özellikle de DomainEntity olayların sınıfta işlenme şekli ve uygulamalarda nesnelerin nasıl izlendiği CosmosContainerContext . Kendi uygulamalarınız için başlangıç noktası olarak kullanın. Alternatif olarak, NServiceBus veya MassTransit gibi bu işlevlere sahip mevcut kitaplıkları kullanmayı göz önünde bulundurun.

Bu senaryoyu dağıtın

Bu senaryoyu test etmek için kaynak kodunu, dağıtım dosyalarını ve yönergeleri GitHub'da bulabilirsiniz: https://github.com/mspnp/transactional-outbox-pattern.

Katkıda Bulunanlar

Bu makale Microsoft tarafından yönetilir. Başlangıçta aşağıdaki katkıda bulunanlar tarafından yazılmıştır.

Asıl yazar:

Genel olmayan LinkedIn profillerini görmek için LinkedIn'de oturum açın.

Sonraki adımlar

Daha fazla bilgi edinmek için şu makaleleri gözden geçirin: