Pola Outbox Transaksional dengan Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Menerapkan pesan yang andal dalam sistem terdistribusi bisa menjadi sulit. Artikel ini menjelaskan cara menggunakan pola Transactional Outbox untuk pesan yang andal dan pengiriman peristiwa yang terjamin, bagian penting dari mendukung pemrosesan pesan idempotensi. Untuk mencapai hal ini, Anda akan menggunakan batch transaksional Azure Cosmos DB dan mengubah umpan dalam kombinasi dengan Azure Service Bus.

Gambaran Umum

Arsitektur layanan mikro menjadi semakin populer dan menjanjikan dalam hal memecahkan masalah seperti skalabilitas, pemeliharaan, dan kelincahan, terutama dalam aplikasi besar. Tetapi pola arsitektur ini juga memberikan tantangan dalam hal penanganan data. Dalam aplikasi terdistribusi, setiap layanan secara independen memelihara data yang dibutuhkan untuk beroperasi di datastore milik layanan khusus. Untuk mendukung skenario seperti itu, Anda biasanya menggunakan solusi olah pesan seperti RabbitMQ, Kafka, atau Azure Service Bus yang mendistribusikan data (kejadian) dari satu layanan melalui bus olah pesan ke layanan lain dari aplikasi. Konsumen internal atau eksternal kemudian dapat berlangganan pesan tersebut dan mendapatkan pemberitahuan tentang perubahan segera setelah data dimanipulasi.

Contoh terkenal di bidang itu adalah sistem pemesanan: ketika pengguna ingin membuat pesanan, layanan Ordering menerima data dari aplikasi klien melalui titik akhir REST. Ini memetakan payload ke representasi internal suatu objek Order untuk memvalidasi data. Setelah berhasil berkomitmen pada database, ini menerbitkan kejadian OrderCreated ke bus pesan. Setiap layanan lain yang tertarik pada pesanan baru (misalnya layanan Inventory atau Invoicing), akan berlangganan ke pesan OrderCreated, memprosesnya, dan menyimpannya dalam database sendiri.

Pseudocode berikut menunjukkan bagaimana proses ini biasanya terlihat dari Ordering perspektif layanan:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Pendekatan ini bekerja dengan baik hingga terjadi kesalahan antara menyimpan objek pesanan dan menerbitkan kejadian yang sesuai. Mengirim kejadian mungkin gagal pada saat ini karena berbagai alasan:

  • Kesalahan jaringan
  • Gangguan layanan pesan
  • Kegagalan host

Apa pun kesalahannya, hasilnya adalah kejadian OrderCreated tersebut tidak dapat diterbitkan ke bus pesan. Layanan lain tidak akan diberi tahu bahwa pesanan telah dibuat. Layanan Ordering sekarang harus mengurus berbagai hal yang tidak berhubungan dengan proses bisnis yang sebenarnya. Ini perlu melacak peristiwa yang masih perlu ditaruh di bus pesan segera setelah kembali online. Bahkan kasus terburuk pun bisa terjadi: inkonsistensi data dalam aplikasi karena kejadian yang hilang.

Diagram that shows event handling without the Transactional Outbox pattern.

Solution

Ada pola terkenal yang disebut Transactional Outbox yang dapat membantu Anda menghindari situasi ini. Ini memastikan peristiwa disimpan di datastore (biasanya dalam tabel Outbox di database Anda) sebelum akhirnya didorong ke broker pesan. Jika objek bisnis dan kejadian yang sesuai disimpan dalam transaksi database yang sama, dijamin tidak ada data yang akan hilang. Semuanya akan dilakukan, atau semuanya akan dibatalkan jika ada kesalahan. Untuk akhirnya menerbitkan peristiwa, proses pekerja atau layanan yang berbeda meminta tabel Outbox untuk entri yang tidak ditangani, menerbitkan kejadian, dan menandainya sebagai diproses. Pola ini memastikan peristiwa tidak akan hilang setelah objek bisnis dibuat atau dimodifikasi.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Unduh file Visio arsitektur ini.

Dalam database hubungan, implementasi pola mudah. Jika layanan menggunakan Entity Framework Core, misalnya, layanan akan menggunakan konteks Entity Framework untuk membuat transaksi database, menyimpan objek bisnis dan peristiwa, dan melakukan transaksi–atau melakukan rollback. Selain itu, layanan pekerja yang memproses peristiwa mudah diterapkan: ini secara berkala menanyakan tabel Outbox untuk entri baru, menerbitkan peristiwa yang baru dimasukkan ke bus pesan, dan akhirnya menandai entri ini sebagai diproses.

Dalam praktiknya, ini tidak semudah yang mereka lihat pertama kali. Yang terpenting, Anda perlu memastikan bahwa urutan peristiwa dijaga sehingga peristiwa OrderUpdated tidak diterbitkan sebelum peristiwa OrderCreated.

Implementasi di Azure Cosmos DB

Bagian ini menunjukkan cara menerapkan pola Outbox Transaksional di Azure Cosmos DB untuk mencapai pesan sesuai urutan yang andal di antara berbagai layanan dengan bantuan umpan perubahan dan Service Bus Azure Cosmos DB. Ini menunjukkan layanan sampel yang mengelola Contactobjek (informasi FirstName, LastName, Email, Company, dan sebagainya). Ini menggunakan pola Command and Query Responsibility Segregation (CQRS) dan mengikuti konsep desain yang didorong domain dasar. Anda dapat menemukan kode sampel untuk implementasi pada GitHub.

Objek Contact dalam layanan sampel memiliki struktur berikut:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Segera setelah Contact dibuat atau diperbarui, ini akan memancarkan peristiwa yang berisi informasi tentang perubahan saat ini. Antara lain, peristiwa domain dapat:

  • ContactCreated. Muncul ketika kontak ditambahkan.
  • ContactNameUpdated. Muncul ketika FirstName atau LastName diubah.
  • ContactEmailUpdated. Muncul ketika alamat email diperbarui.
  • ContactCompanyUpdated. Muncul ketika salah satu properti perusahaan diubah.

Batch transaksional

Untuk menerapkan pola ini, Anda perlu memastikan Contact objek bisnis dan peristiwa yang sesuai akan disimpan dalam transaksi database yang sama. Di Azure Cosmos DB, transaksi bekerja secara berbeda dari yang dilakukan dalam sistem database hubungan. Transaksi Azure Cosmos DB, yang disebut batch transaksional, beroperasi pada partisi logis tunggal, sehingga menjamin sifat Atomicity, Consistency, Isolation, and Durability (ACID). Anda tidak dapat menyimpan dua dokumen dalam operasi batch transaksional dalam container yang berbeda atau partisi logis. Untuk layanan sampel, ini berarti objek bisnis dan acara atau peristiwa akan dimasukkan ke dalam container dan partisi logis yang sama.

Konteks, repositori, dan UnitOfWork

Inti dari implementasi sampel adalah konteks container yang melacak objek yang disimpan dalam batch transaksional yang sama. Ini mempertahankan daftar objek yang dibuat dan dimodifikasi dan beroperasi pada satu container Azure Cosmos DB. Antarmuka untuk konteks tersebut terlihat seperti ini:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Daftar dalam jalur komponen konteks container objek Contact dan DomainEvent. Keduanya akan dimasukkan ke dalam container yang sama. Ini berarti beberapa jenis objek disimpan dalam container Azure Cosmos DB yang sama dan menggunakan properti Type untuk membedakan antara objek bisnis dan suatu peristiwa.

Untuk setiap jenis, ada repositori khusus yang mendefinisikan dan mengimplementasikan akses data. Antarmuka repositori Contact menyediakan metode ini:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Repositori Event terlihat serupa, kecuali hanya ada satu metode, yang membuat peristiwa baru di toko:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Implementasi kedua antarmuka repositori mendapatkan referensi melalui injeksi dependensi ke satu instans IContainerContext untuk memastikan keduanya beroperasi pada konteks Azure Cosmos DB yang sama.

Komponen terakhir adalah UnitOfWork, yang melakukan perubahan yang diadakan dalam instans IContainerContext ke Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Penanganan peristiwa: Pembuatan dan publikasi

Setiap kali objek Contact dibuat, dimodifikasi atau dihapus (sementara), layanan menimbulkan peristiwa yang sesuai. Inti dari solusi yang diberikan adalah kombinasi desain berbasis domain (DDD) dan pola mediator yang diusulkan oleh Jimmy Bogard. Dia menyarankan untuk mempertahankan daftar peristiwa yang terjadi karena modifikasi objek domain dan menerbitkan peristiwa ini sebelum Anda menyimpan objek yang sebenarnya ke database.

Daftar perubahan disimpan dalam objek domain itu sendiri sehingga tidak ada komponen lain yang dapat memodifikasi rantai peristiwa. Perilaku mempertahankan peristiwa (IEvent instans) dalam objek domain didefinisikan melalui antarmuka IEventEmitter<IEvent> dan diimplementasikan dalam kelas DomainEntity abstrak:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Objek Contact tersebut menimbulkan peristiwa domain. Entitas Contact mengikuti konsep DDD dasar, mengonfigurasi pengatur properti domain sebagai pribadi. Tidak ada pengatur publik di kelas. Sebaliknya, ia menawarkan metode untuk memanipulasi keadaan internal. Dalam metode ini, peristiwa yang sesuai untuk modifikasi tertentu (misalnya ContactNameUpdated atau ContactEmailUpdated) dapat dimunculkan.

Berikut contoh untuk pembaruan nama kontak. (Peristiwa ini dimunculkan pada akhir metode.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

ContactNameUpdatedEvent yang sesuai, yang melacak perubahan, terlihat seperti ini:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Sejauh ini, peristiwa hanya dicatat dalam objek domain dan tidak ada yang disimpan ke database atau bahkan diterbitkan ke broker pesan. Sesuai dengan rekomendasi, daftar peristiwa akan diproses tepat sebelum objek bisnis disimpan ke penyimpanan data. Dalam hal ini, itu terjadi dalam metode SaveChangesAsync instans IContainerContext, yang diimplementasikan dalam metode RaiseDomainEvents pribadi. (dObjs adalah daftar entitas yang dilacak dari konteks container.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Pada baris terakhir, paket MediatR, implementasi pola mediator di C #, digunakan untuk menerbitkan peristiwa dalam aplikasi. Melakukan hal itu mungkin dilakukan karena semua peristiwa seperti ContactNameUpdatedEvent menerapkan INotification antarmuka paket MediatR.

Peristiwa ini perlu diproses oleh penangan yang sesuai. Di sini, implementasi IEventsRepository turut berperan. Berikut adalah sampel penangan peristiwa NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Instans IEventRepository diinjeksi ke kelas penangan melalui konstruktor. Segera setelah ContactNameUpdatedEvent diterbitkan dalam layanan, metode Handle dipanggil dan menggunakan instance repositori kejadian untuk membuat objek pemberitahuan. Objek pemberitahuan tersebut nantinya dimasukkan ke dalam daftar objek yang dilacak dalam objek IContainerContext dan bergabung dengan objek yang disimpan dalam batch transaksional yang sama ke Azure Cosmos DB.

Sejauh ini, konteks container tahu objek mana yang harus diproses. Untuk akhirnya mempertahankan objek yang dilacak ke Azure Cosmos DB, implementasi IContainerContext membuat batch transaksional, menambahkan semua objek yang relevan, dan menjalankan operasi terhadap database. Proses yang dijelaskan ditangani dalam SaveInTransactionalBatchAsync metode, yang dipanggil oleh metode SaveChangesAsync.

Berikut adalah bagian penting dari implementasi yang perlu Anda buat dan jalankan batch transaksional:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Berikut adalah gambaran umum tentang cara kerja proses sejauh ini (untuk memperbarui nama pada objek kontak):

  1. Klien ingin memperbarui nama kontak. Metode SetName ini dipanggil pada objek kontak dan properti diperbarui.
  2. Peristiwa ContactNameUpdated ditambahkan ke daftar peristiwa di objek domain.
  3. Metode repositori kontak Update dipanggil, yang menambahkan objek domain ke konteks kontainer. Objek sekarang dilacak.
  4. CommitAsync dipanggil pada instans UnitOfWork, yang kemudian memanggil SaveChangesAsync pada konteks container.
  5. Dalam SaveChangesAsync, semua peristiwa di daftar objek domain diterbitkan oleh instance MediatR dan ditambahkan melalui repositori peristiwa ke konteks container yang sama.
  6. Pada SaveChangesAsync, TransactionalBatch dibuat. Ini akan menahan objek kontak dan peristiwa.
  7. TransactionalBatch berjalan dan data dijalankan di Azure Cosmos DB.
  8. SaveChangesAsync dan CommitAsync berhasil kembali.

Persistensi

Seperti yang dapat Anda lihat di cuplikan kode sebelumnya, semua objek yang disimpan ke Azure Cosmos DB dibungkus dalam instans DataObject. Objek ini menyediakan properti umum:

  • ID.
  • PartitionKey.
  • Type.
  • State. Seperti Created, Updated tidak akan bertahan di Azure Cosmos DB.
  • Etag. Untuk penguncian optimis.
  • TTL. Properti Time To Live untuk pembersihan otomatis dokumen lama.
  • Data. Objek data generik.

Properti ini didefinisikan dalam antarmuka generik yang disebut IDataObject dan digunakan oleh repositori dan konteks container:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Objek yang dibungkus dalam instans DataObject dan disimpan ke database kemudian akan terlihat seperti sampel ini (Contact dan ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Anda dapat melihat bahwa dokumen Contact dan ContactNameUpdatedEvent (tipe domainEvent) memiliki kunci partisi yang sama dan bahwa kedua dokumen akan dipertahankan dalam partisi logis yang sama.

Mengubah pemrosesan umpan

Untuk membaca aliran peristiwa dan mengirimkannya ke broker pesan, layanan akan menggunakan umpan perubahan Azure Cosmos DB.

Umpan perubahan adalah log perubahan yang dipertahankan dalam container Anda. Ini beroperasi di latar belakang dan melacak modifikasi. Dalam satu partisi logis, urutan perubahan dijamin. Cara paling nyaman untuk membaca umpan perubahan adalah dengan menggunakan fungsi Azure dengan pemicu Azure Cosmos DB. Pilihan lain adalah menggunakan pustaka prosesor umpan perubahan. Ini memungkinkan Anda mengintegrasikan pemrosesan umpan perubahan di API Web Anda sebagai layanan latar belakang (melalui antarmuka IHostedService). Contoh di sini menggunakan aplikasi konsol sederhana yang mengimplementasikan BackgroundService kelas abstrak untuk menghosting tugas latar belakang yang sudah berjalan lama di aplikasi .NET Core.

Untuk menerima perubahan dari umpan perubahan Azure Cosmos DB, Anda perlu membuat contoh objek ChangeFeedProcessor, mendaftarkan metode penangan untuk pemrosesan pesan, dan mulai mendengarkan perubahan:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Metode penangan (HandleChangesAsync di sini) kemudian memproses pesan. Dalam sampel ini, peristiwa dipublikasikan ke topik Service Bus yang dipartisi untuk skalabilitas dengan fitur de-duplikasi diaktifkan. Setiap layanan yang tertarik pada perubahan objek Contact kemudian dapat berlangganan topik Service Bus itu dan menerima dan memproses perubahan untuk konteksnya sendiri.

Pesan Service Bus yang dihasilkan memiliki properti SessionId. Saat Anda menggunakan sesi di Service Bus, Anda menjamin bahwa urutan pesan dipertahankan (FIFO). Mempertahankan pesanan diperlukan untuk kasus penggunaan ini.

Berikut cuplikan yang menangani pesan dari umpan perubahan:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Penanganan kesalahan

Jika ada kesalahan saat perubahan sedang diproses, pustaka umpan perubahan akan memulai ulang membaca pesan pada posisi di mana ia berhasil memproses batch terakhir. Misalnya, jika aplikasi berhasil memproses 10.000 pesan, sekarang bekerja pada batch 10.001 hingga 10.025, dan kesalahan terjadi, aplikasi dapat memulai ulang dan mengulangi pekerjaannya pada posisi 10.001. Pustaka secara otomatis melacak apa yang telah diproses melalui informasi yang disimpan dalam container Leases di Azure Cosmos DB.

Ada kemungkinan bahwa layanan akan telah mengirim beberapa pesan yang diproses ulang untuk Service Bus. Biasanya, skenario itu akan mengarah pada pemrosesan pesan duplikat. Seperti disebutkan sebelumnya, Service Bus memiliki fitur untuk deteksi pesan duplikat yang perlu Anda aktifkan untuk skenario ini. Layanan memeriksa apakah pesan telah ditambahkan ke topik Service Bus (atau antrean) berdasarkan properti MessageId pesan yang dikendalikan aplikasi. Properti itu diatur ke ID dokumen peristiwa. Jika pesan yang sama dikirim lagi ke Service Bus, layanan akan mengabaikan dan menjatuhkannya.

Pemeliharaan

Dalam implementasi Outbox Transaksional yang khas, layanan memperbarui peristiwa yang ditangani dan menetapkan properti Processed ke true, menunjukkan bahwa pesan telah berhasil dipublikasikan. Perilaku ini dapat diimplementasikan secara manual dalam metode penangan. Proses tersebut tidak diperlukan dalam skenario saat ini. Azure Cosmos DB melacak peristiwa yang diproses dengan menggunakan umpan perubahan (bersama dengan container Leases).

Sebagai langkah terakhir, Anda terkadang perlu menghapus peristiwa dari container sehingga Anda hanya menyimpan catatan/dokumen terbaru. Untuk melakukan pembersihan secara berkala, implementasi menerapkan fitur lain dari Azure Cosmos DB: Time To Live (TTL) pada dokumen. Azure Cosmos DB dapat secara otomatis menghapus dokumen berdasarkan properti TTL yang dapat ditambahkan ke dokumen: rentang waktu dalam hitungan detik. Layanan akan terus-menerus memeriksa container untuk dokumen yang memiliki properti TTL. Segera setelah dokumen kedaluwarsa, Azure Cosmos DB akan menghapusnya dari database.

Ketika semua komponen bekerja seperti yang diharapkan, peristiwa diproses dan dipublikasikan dengan cepat: dalam hitungan detik. Jika ada kesalahan di Azure Cosmos DB, peristiwa tidak akan dikirim ke bus pesan, karena objek bisnis dan peristiwa yang sesuai tidak dapat disimpan ke database. Satu-satunya hal yang perlu dipertimbangkan adalah menetapkan nilai TTL yang sesuai pada dokumen DomainEvent ketika pekerja latar belakang (mengubah prosesor umpan) atau service bus tidak tersedia. Dalam lingkungan produksi, yang terbaik adalah memilih rentang waktu beberapa hari. Misalnya, 10 hari. Semua komponen yang terlibat kemudian akan memiliki cukup waktu untuk memproses/menerbitkan perubahan dalam aplikasi.

Ringkasan

Pola Outbox Transaksional memecahkan masalah penerbitan peristiwa domain yang andal dalam sistem terdistribusi. Dengan menerapkan keadaan objek bisnis dan peristiwanya dalam batch transaksional yang sama dan menggunakan prosesor latar belakang sebagai relai pesan, Anda memastikan bahwa layanan lain, internal atau eksternal, pada akhirnya akan menerima informasi yang mereka andalkan. Sampel ini bukan implementasi tradisional dari pola Outbox Transaksional. Ini menggunakan fitur seperti umpan perubahan Azure Cosmos DB dan Time To Live yang menjaga semuanya tetap sederhana dan bersih.

Berikut adalah ringkasan komponen Azure yang digunakan dalam skenario ini:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Unduh file Visio arsitektur ini.

Keuntungan dari solusi ini adalah:

  • Pesan yang andal dan pengiriman peristiwa yang terjamin.
  • Urutan peristiwa dan de-duplikasi pesan yang dipertahankan melalui Service Bus.
  • Tidak perlu mempertahankan properti Processed tambahan yang menunjukkan keberhasilan pemrosesan dokumen peristiwa.
  • Penghapusan peristiwa dari Azure Cosmos DB melalui TTL. Proses ini tidak menggunakan unit permintaan yang diperlukan untuk menangani permintaan pengguna/aplikasi. Sebaliknya, ini menggunakan unit permintaan "sisa" dalam tugas latar belakang.
  • Pemrosesan pesan yang tahan kesalahan melalui ChangeFeedProcessor (atau fungsi Azure).
  • Opsional: Beberapa prosesor umpan perubahan, masing-masing mempertahankan penunjuknya sendiri di umpan perubahan.

Pertimbangan

Contoh aplikasi yang dibahas dalam artikel ini menunjukkan bagaimana Anda dapat menerapkan pola Outbox Transaksional di Azure dengan Azure Cosmos DB dan Service Bus. Ada juga pendekatan lain yang menggunakan database NoSQL. Untuk menjamin bahwa objek dan peristiwa bisnis akan disimpan dengan andal dalam database, Anda dapat menyematkan daftar peristiwa dalam dokumen objek bisnis. Kelemahan dari pendekatan ini adalah bahwa proses pembersihan perlu memperbarui setiap dokumen yang berisi peristiwa. Ini tidak ideal, terutama dalam hal biaya Unit Permintaan, dibandingkan dengan menggunakan TTL.

Perlu diingat bahwa Anda tidak boleh mempertimbangkan kode sampel yang disediakan dalam kode siap produksi. Ini memiliki beberapa keterbatasan mengenai multithreading, terutama cara acara ditangani di kelas DomainEntity dan bagaimana objek dilacak dalam implementasi CosmosContainerContext. Gunakan sebagai titik awal untuk implementasi Anda sendiri. Atau, pertimbangkan untuk menggunakan pustaka yang sudah ada yang sudah memiliki fungsionalitas ini yang terpasang di dalamnya seperti NServiceBus atau MassTransit.

Menyebarkan skenario ini

Anda dapat menemukan kode sumber, file penyebaran, dan instruksi untuk menguji skenario ini di GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Kontributor

Artikel ini dikelola oleh Microsoft. Ini awalnya ditulis oleh kontributor berikut.

Penulis utama:

  • Dennig Kristen | Insinyur Perangkat Lunak Senior

Untuk melihat profil LinkedIn non-publik, masuk ke LinkedIn.

Langkah berikutnya

Untuk mempelajari lebih lanjut, tinjau artikel ini: