Bagikan melalui


Berlangganan peristiwa

Tip

Konten ini adalah kutipan dari eBook, .NET Microservices Architecture for Containerized .NET Applications, tersedia di .NET Docs atau sebagai PDF yang dapat diunduh gratis dan dapat dibaca secara offline.

.NET Microservices Architecture for Containerized .NET Applications eBook cover thumbnail.

Langkah pertama untuk menggunakan bus peristiwa adalah berlangganan layanan mikro ke peristiwa yang ingin mereka terima. Fungsionalitas tersebut harus dilakukan di layanan mikro penerima.

Kode sederhana berikut menunjukkan apa yang perlu diterapkan oleh setiap layanan mikro penerima saat memulai layanan (yaitu, di Startup kelas ) sehingga berlangganan peristiwa yang dibutuhkan. Dalam hal ini, basket-api layanan mikro perlu berlangganan ProductPriceChangedIntegrationEvent dan OrderStartedIntegrationEvent pesan.

Misalnya, saat berlangganan peristiwa, yang membuat layanan mikro keranjang ProductPriceChangedIntegrationEvent mengetahui perubahan apa pun pada harga produk dan memungkinkannya memperingatkan pengguna tentang perubahan jika produk tersebut ada di keranjang pengguna.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Setelah kode ini berjalan, layanan mikro pelanggan akan mendengarkan melalui saluran RabbitMQ. Ketika pesan jenis ProductPriceChangedIntegrationEvent tiba, kode memanggil penanganan aktivitas yang diteruskan ke dalamnya dan memproses peristiwa.

Menerbitkan peristiwa melalui bus peristiwa

Terakhir, pengirim pesan (layanan mikro asal) menerbitkan peristiwa integrasi dengan kode yang mirip dengan contoh berikut. (Pendekatan ini adalah contoh sederhana yang tidak mempertimbangkan atomitas.) Anda akan menerapkan kode serupa setiap kali peristiwa harus disebarluaskan di beberapa layanan mikro, biasanya tepat setelah melakukan data atau transaksi dari layanan mikro asal.

Pertama, objek implementasi bus peristiwa (berdasarkan RabbitMQ atau berdasarkan bus layanan) akan disuntikkan ke konstruktor pengontrol, seperti dalam kode berikut:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Kemudian Anda menggunakannya dari metode pengontrol Anda, seperti dalam metode UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

Dalam hal ini, karena layanan mikro asal adalah layanan mikro CRUD sederhana, kode tersebut ditempatkan langsung ke dalam pengontrol API Web.

Dalam layanan mikro yang lebih canggih, seperti saat menggunakan pendekatan CQRS, dapat diimplementasikan di CommandHandler kelas, dalam metode Handle() tersebut.

Merancang atomitas dan ketahanan saat menerbitkan ke bus peristiwa

Saat Anda menerbitkan peristiwa integrasi melalui sistem olahpesan terdistribusi seperti bus peristiwa, Anda memiliki masalah memperbarui database asli secara atomik dan menerbitkan peristiwa (yaitu, baik operasi selesai atau tidak sama sekali). Misalnya, dalam contoh yang disederhanakan yang ditunjukkan sebelumnya, kode menerapkan data ke database ketika harga produk diubah lalu menerbitkan pesan ProductPriceChangedIntegrationEvent. Awalnya, mungkin terlihat penting bahwa kedua operasi ini dilakukan secara atomik. Namun, jika Anda menggunakan transaksi terdistribusi yang melibatkan database dan broker pesan, seperti yang Anda lakukan dalam sistem yang lebih lama seperti Microsoft Message Queuing (MSMQ), pendekatan ini tidak direkomendasikan karena alasan yang dijelaskan oleh teorema CAP.

Pada dasarnya, Anda menggunakan layanan mikro untuk membangun sistem yang dapat diskalakan dan sangat tersedia. Agak menyederhanakan, teorema CAP mengatakan bahwa Anda tidak dapat membangun database (terdistribusi) (atau layanan mikro yang memiliki modelnya) yang terus tersedia, sangat konsisten, dan toleran terhadap partisi apa pun. Anda harus memilih dua dari ketiga properti ini.

Dalam arsitektur berbasis layanan mikro, Anda harus memilih ketersediaan dan toleransi, dan Anda harus menetapkan konsistensi yang kuat. Oleh karena itu, di sebagian besar aplikasi berbasis layanan mikro modern, Anda biasanya tidak ingin menggunakan transaksi terdistribusi dalam olahpesan, seperti yang Anda lakukan ketika Anda menerapkan transaksi terdistribusi berdasarkan Windows Koordinator Transaksi Terdistribusi (DTC) dengan MSMQ.

Mari kita kembali ke masalah awal dan contohnya. Jika layanan crash setelah database diperbarui (dalam hal ini, tepat setelah baris kode dengan _context.SaveChangesAsync()), tetapi sebelum peristiwa integrasi diterbitkan, sistem keseluruhan bisa menjadi tidak konsisten. Pendekatan ini mungkin penting bagi bisnis, tergantung pada operasi bisnis tertentu yang Anda hadapi.

Seperti disebutkan sebelumnya di bagian arsitektur, Anda dapat memiliki beberapa pendekatan untuk menangani masalah ini:

  • Menggunakan pola Sumber Peristiwa lengkap.

  • Menggunakan penggalian log transaksi.

  • Menggunakan pola Outbox. Ini adalah tabel transaksional untuk menyimpan peristiwa integrasi (memperluas transaksi lokal).

Untuk skenario ini, menggunakan pola Event Sourcing (ES) lengkap adalah salah satu pendekatan terbaik, jika bukan yang terbaik. Namun, dalam banyak skenario aplikasi, Anda mungkin tidak dapat menerapkan sistem ES penuh. ES berarti hanya menyimpan peristiwa domain di database transaksional Anda, alih-alih menyimpan data status saat ini. Menyimpan hanya peristiwa domain yang dapat memiliki manfaat besar, seperti memiliki riwayat sistem Anda yang tersedia dan dapat menentukan status sistem Anda kapan saja di masa lalu. Namun, menerapkan sistem ES penuh mengharuskan Anda untuk merancang ulang sebagian besar sistem Anda dan memperkenalkan banyak kompleksitas dan persyaratan lainnya. Misalnya, Anda ingin menggunakan database yang dibuat khusus untuk sumber peristiwa, seperti Event Store, atau database berorientasi dokumen seperti Azure Cosmos DB, MongoDB, Cassandra, CouchDB, atau RavenDB. ES adalah pendekatan yang bagus untuk masalah ini, tetapi bukan solusi paling mudah kecuali Anda sudah terbiasa dengan sumber peristiwa.

Opsi untuk menggunakan penambangan log transaksi awalnya terlihat transparan. Namun, untuk menggunakan pendekatan ini, layanan mikro harus digabungkan ke log transaksi RDBMS Anda, seperti log transaksi SQL Server. Pendekatan ini mungkin tidak diinginkan. Kelemahan lainnya adalah bahwa pembaruan tingkat rendah yang dicatat dalam log transaksi mungkin tidak berada pada tingkat yang sama dengan peristiwa integrasi tingkat tinggi Anda. Jika demikian, proses rekayasa balik operasi log transaksi tersebut bisa sulit.

Pendekatan seimbang adalah campuran tabel database transaksional dan pola ES yang disederhanakan. Anda dapat menggunakan status seperti "siap untuk menerbitkan peristiwa," yang Anda tetapkan dalam peristiwa awal saat Anda menerapkannya ke tabel peristiwa integrasi. Anda kemudian mencoba menerbitkan peristiwa ke bus peristiwa. Jika tindakan terbitkan peristiwa berhasil, Anda memulai transaksi lain di layanan asal dan memindahkan status dari "siap menerbitkan peristiwa" ke "peristiwa yang sudah diterbitkan."

Jika tindakan penerbitan peristiwa di bus peristiwa gagal, data masih tidak akan konsisten dalam layanan mikro asal—itu masih ditandai sebagai "siap untuk menerbitkan peristiwa," dan sehubungan dengan layanan lainnya, pada akhirnya akan konsisten. Anda selalu dapat meminta pekerjaan latar belakang yang memeriksa status transaksi atau peristiwa integrasi. Jika pekerjaan menemukan peristiwa dalam status "siap untuk menerbitkan peristiwa", pekerjaan dapat mencoba menerbitkan ulang peristiwa tersebut ke bus peristiwa.

Perhatikan bahwa dengan pendekatan ini, Anda hanya mempertahankan peristiwa integrasi untuk setiap layanan mikro asal, dan hanya peristiwa yang ingin Anda komunikasikan ke layanan mikro atau sistem eksternal lainnya. Sebaliknya, dalam sistem ES penuh, Anda juga menyimpan semua peristiwa domain.

Oleh karena itu, pendekatan seimbang ini adalah sistem ES yang disederhanakan. Anda memerlukan daftar peristiwa integrasi dengan statusnya saat ini ("siap diterbitkan" versus "diterbitkan"). Tetapi Anda hanya perlu menerapkan status ini untuk peristiwa integrasi. Dan dalam pendekatan ini, Anda tidak perlu menyimpan semua data domain Anda sebagai peristiwa dalam database transaksional, seperti yang Anda lakukan dalam sistem ES penuh.

Jika Anda sudah menggunakan database relasional, Anda bisa menggunakan tabel transaksional untuk menyimpan peristiwa integrasi. Untuk mencapai atomitas dalam aplikasi, Anda menggunakan proses dua langkah berdasarkan transaksi lokal. Pada dasarnya, Anda memiliki tabel IntegrationEvent dalam database yang sama tempat Anda memiliki entitas domain Anda. Tabel tersebut berfungsi sebagai asuransi untuk mencapai atomitas sehingga Anda menyertakan peristiwa integrasi yang bertahan ke dalam transaksi yang sama yang menerapkan data domain Anda.

Langkah demi langkah, prosesnya berjalan seperti ini:

  1. Aplikasi memulai transaksi database lokal.

  2. Kemudian memperbarui status entitas domain Anda dan menyisipkan peristiwa ke dalam tabel peristiwa integrasi.

  3. Akhirnya, ia melakukan transaksi, sehingga Anda mendapatkan atomitas yang diinginkan dan kemudian

  4. Anda mempublikasikan peristiwa entah apa (berikutnya).

Saat menerapkan langkah-langkah penerbitan peristiwa, Anda memiliki pilihan berikut:

  • Terbitkan peristiwa integrasi tepat setelah melakukan transaksi dan gunakan transaksi lokal lain untuk menandai peristiwa dalam tabel sebagai diterbitkan. Kemudian, gunakan tabel hanya sebagai artefak untuk melacak peristiwa integrasi jika terjadi masalah di layanan mikro jarak jauh, dan melakukan tindakan kompensasi berdasarkan peristiwa integrasi yang disimpan.

  • Gunakan tabel sebagai jenis antrean. Utas aplikasi atau proses terpisah mengkueri tabel peristiwa integrasi, menerbitkan peristiwa ke bus peristiwa, lalu menggunakan transaksi lokal untuk menandai peristiwa sebagai diterbitkan.

Gambar 6-22 menunjukkan arsitektur untuk pendekatan pertama dari pendekatan ini.

Diagram of atomicity when publishing without a worker microservice.

Gambar 6-22. Atomitas saat menerbitkan peristiwa ke bus peristiwa

Pendekatan yang diilustrasikan pada Gambar 6-22 tidak memiliki layanan mikro pekerja tambahan yang bertugas memeriksa dan mengonfirmasi keberhasilan peristiwa integrasi yang diterbitkan. Jika terjadi kegagalan, layanan mikro pekerja pemeriksa tambahan tersebut dapat membaca peristiwa dari tabel dan menerbitkan ulang, yaitu, ulangi langkah nomor 2.

Tentang pendekatan kedua: Anda menggunakan tabel EventLog sebagai antrean dan selalu menggunakan layanan mikro pekerja untuk menerbitkan pesan. Dalam hal ini, prosesnya seperti yang ditunjukkan pada Gambar 6-23. Ini menunjukkan layanan mikro tambahan, dan tabel adalah sumber tunggal saat menerbitkan peristiwa.

Diagram of atomicity when publishing with a worker microservice.

Gambar 6-23. Atomitas saat menerbitkan peristiwa ke bus peristiwa dengan layanan mikro pekerja

Untuk kemudahan, sampel eShopOnContainers menggunakan pendekatan pertama (tanpa proses tambahan atau layanan mikro pemeriksa) ditambah bus peristiwa. Namun, sampel eShopOnContainers tidak menangani semua kemungkinan kasus kegagalan. Dalam aplikasi nyata yang disebarkan ke cloud, Anda harus menerima fakta bahwa masalah akan muncul pada akhirnya, dan Anda harus menerapkan logika pemeriksaan pengiriman ulang itu. Menggunakan tabel sebagai antrean bisa lebih efektif daripada pendekatan pertama jika Anda memiliki tabel tersebut sebagai sumber peristiwa tunggal saat menerbitkannya (dengan pekerja) melalui bus peristiwa.

Menerapkan atomitas saat menerbitkan peristiwa integrasi melalui bus peristiwa

Kode berikut menunjukkan bagaimana Anda dapat membuat satu transaksi yang melibatkan beberapa objek DbContext—satu konteks yang terkait dengan data asli yang diperbarui, dan konteks kedua yang terkait dengan tabel IntegrationEventLog.

Transaksi dalam contoh kode di bawah ini tidak akan tangguh jika koneksi ke database memiliki masalah pada saat kode berjalan. Ini dapat terjadi dalam sistem berbasis cloud seperti Azure SQL DB, yang mungkin memindahkan database di seluruh server. Untuk menerapkan transaksi tangguh di beberapa konteks, lihat bagian Menerapkan koneksi SQL Inti Kerangka Kerja Entitas yang tangguh nanti dalam panduan ini.

Untuk kejelasan, contoh berikut menunjukkan seluruh proses dalam satu bagian kode. Namun, implementasi eShopOnContainers direfaktor dan membagi logika ini menjadi beberapa kelas sehingga lebih mudah untuk dipertahankan.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Setelah peristiwa integrasi ProductPriceChangedIntegrationEvent dibuat, transaksi yang menyimpan operasi domain asli (perbarui item katalog) juga menyertakan persistensi peristiwa dalam tabel EventLog. Ini menjadikannya satu transaksi, dan Anda akan selalu dapat memeriksa apakah pesan peristiwa dikirim.

Tabel log peristiwa diperbarui secara atomik dengan operasi database asli, menggunakan transaksi lokal terhadap database yang sama. Jika salah satu operasi gagal, pengecualian dilemparkan dan transaksi mengembalikan operasi yang telah selesai, sehingga mempertahankan konsistensi antara operasi domain dan pesan peristiwa yang disimpan ke tabel.

Menerima pesan dari langganan: penanganan aktivitas di layanan mikro penerima

Selain logika langganan peristiwa, Anda perlu menerapkan kode internal untuk penanganan aktivitas integrasi (seperti metode panggilan balik). Penanganan aktivitas adalah tempat Anda menentukan di mana pesan peristiwa dari jenis tertentu akan diterima dan diproses.

Penanganan aktivitas pertama kali menerima instans peristiwa dari bus peristiwa. Kemudian menemukan komponen yang akan diproses terkait dengan peristiwa integrasi tersebut, menyebarkan dan mempertahankan peristiwa sebagai perubahan status dalam layanan mikro penerima. Misalnya, jika peristiwa ProductPriceChanged berasal dari layanan mikro katalog, itu ditangani di layanan mikro keranjang dan mengubah status dalam layanan mikro keranjang penerima ini juga, seperti yang ditunjukkan dalam kode berikut.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Penanganan aktivitas perlu memverifikasi apakah produk ada di salah satu instans keranjang. Penanganan aktivitas juga memperbarui harga item untuk setiap item baris kerajang terkait. Terakhir, penanganan aktivitas membuat pemberitahuan yang akan ditampilkan kepada pengguna tentang perubahan harga, seperti yang ditunjukkan pada Gambar 6-24.

Screenshot of a browser showing the price change notification on the user cart.

Gambar 6-24. Menampilkan perubahan harga item dalam ke keranjang, seperti yang dikomunikasikan oleh peristiwa integrasi

Idempotensi dalam memperbarui peristiwa pesan

Aspek penting dari peristiwa pesan pembaruan adalah bahwa kegagalan pada titik mana pun dalam komunikasi harus menyebabkan pesan dicoba kembali. Jika tidak, tugas latar belakang mungkin mencoba menerbitkan peristiwa yang telah diterbitkan, membuat kondisi balapan. Pastikan bahwa pembaruan bersifat idempoten atau memberikan informasi yang cukup untuk memastikan bahwa Anda dapat mendeteksi duplikat, membuangnya, dan hanya mengirim kembali satu respons.

Seperti disebutkan sebelumnya, idempotensi berarti bahwa operasi dapat dilakukan beberapa kali tanpa mengubah hasilnya. Dalam lingkungan olahpesan, seperti saat mengkomunikasikan peristiwa, peristiwa idempoten jika dapat dikirimkan beberapa kali tanpa mengubah hasil untuk layanan mikro penerima. Ini mungkin diperlukan karena sifat peristiwa itu sendiri, atau karena cara sistem menangani peristiwa. Idempotensi pesan penting dalam aplikasi apa pun yang menggunakan olahpesan, bukan hanya dalam aplikasi yang menerapkan pola bus peristiwa.

Contoh operasi idempoten adalah pernyataan SQL yang menyisipkan data ke dalam tabel hanya jika data tersebut belum ada dalam tabel. Tidak masalah berapa kali Anda menjalankan pernyataan sisipkan SQL; hasilnya akan sama—tabel akan berisi data tersebut. Idempotensi seperti ini juga dapat diperlukan saat berhadapan dengan pesan jika pesan berpotensi dikirim dan oleh karena itu diproses lebih dari sekali. Misalnya, jika logika coba lagi menyebabkan pengirim mengirim pesan yang sama persis lebih dari sekali, Anda perlu memastikan bahwa hal itu bersifat idempoten.

Dimungkinkan untuk merancang pesan yang idempoten. Misalnya, Anda dapat membuat peristiwa yang mengatakan "atur harga produk ke $25" alih-alih "tambahkan $5 ke harga produk." Anda dapat memproses pesan pertama dengan aman berapa kali dan hasilnya akan sama. Itu tidak benar untuk pesan kedua. Tetapi bahkan dalam kasus pertama, Anda mungkin tidak ingin memproses peristiwa pertama, karena sistem juga dapat mengirim peristiwa perubahan harga yang lebih baru dan Anda akan menimpa harga baru.

Contoh lain mungkin merupakan peristiwa yang diselesaikan pesanan yang disebarluaskan ke beberapa pelanggan. Aplikasi harus memastikan bahwa informasi pesanan diperbarui di sistem lain hanya sekali, bahkan jika ada peristiwa pesan duplikat untuk peristiwa yang diselesaikan pesanan yang sama.

Lebih mudah untuk memiliki semacam identitas per peristiwa sehingga Anda dapat membuat logika yang memberlakukan bahwa setiap peristiwa diproses hanya sekali per penerima.

Beberapa pemrosesan pesan secara inheren yang bersifat idempoten. Misalnya, jika sistem menghasilkan gambar mini gambar, mungkin tidak masalah berapa kali pesan tentang gambar mini yang dihasilkan diproses; hasilnya adalah thumbnail dihasilkan dan sama setiap saat. Di sisi lain, operasi seperti memanggil gateway pembayaran untuk membebankan kartu kredit mungkin tidak idempoten sama sekali. Dalam kasus ini, Anda perlu memastikan bahwa memproses pesan beberapa kali memiliki efek yang Anda harapkan.

Sumber daya tambahan

Deduplikasi pesan peristiwa integrasi

Anda dapat memastikan bahwa peristiwa pesan dikirim dan diproses hanya sekali per pelanggan pada tingkat yang berbeda. Salah satu caranya adalah dengan menggunakan fitur deduplikasi yang ditawarkan oleh infrastruktur olahpesan yang Anda gunakan. Lainnya adalah menerapkan logika kustom di layanan mikro tujuan Anda. Memiliki validasi di tingkat transportasi dan tingkat aplikasi adalah taruhan terbaik Anda.

Mendeduplikasi peristiwa pesan di tingkat EventHandler

Salah satu cara untuk memastikan bahwa peristiwa hanya diproses sekali oleh penerima mana pun adalah dengan menerapkan logika tertentu saat memproses peristiwa pesan di penanganan aktivitas. Misalnya, itu adalah pendekatan yang digunakan dalam aplikasi eShopOnContainers, seperti yang Dapat Anda lihat di kode sumber kelas UserCheckoutAcceptedIntegrationEventHandler saat menerima UserCheckoutAcceptedIntegrationEvent peristiwa integrasi. (Dalam hal ini, dikemas CreateOrderCommand dengan IdentifiedCommand, menggunakan eventMsg.RequestId sebagai pengidentifikasi, sebelum mengirimkannya ke penanganan perintah).

Deduplikasi pesan saat menggunakan RabbitMQ

Ketika kegagalan jaringan terputus-terputus terjadi, pesan dapat diduplikasi, dan penerima pesan harus siap untuk menangani pesan duplikat ini. Jika memungkinkan, penerima harus menangani pesan dengan cara yang idempoten, yang lebih baik daripada secara eksplisit menanganinya dengan deduplikasi.

Menurut dokumentasi RabbitMQ, "Jika pesan dikirimkan kepada konsumen dan kemudian diantrekan kembali (karena tidak diakui sebelum koneksi konsumen terputus, misalnya) maka RabbitMQ akan mengatur bendera yang dikirimkan kembali di atasnya ketika dikirimkan lagi (apakah ke konsumen yang sama atau yang berbeda).

Jika bendera "disetel ulang", penerima harus mempertimbangkannya, karena pesan mungkin sudah diproses. Tetapi itu tidak dijamin; pesan mungkin tidak pernah mencapai penerima setelah meninggalkan broker pesan, mungkin karena masalah jaringan. Di sisi lain, jika bendera "dibuat ulang" tidak diatur, dijamin bahwa pesan belum dikirim lebih dari sekali. Oleh karena itu, penerima perlu mendeduplikasi pesan atau memproses pesan dengan cara yang idempoten hanya jika bendera "dibuat ulang" diatur dalam pesan.

Sumber daya tambahan