Bagikan melalui


Detail implementasi aliran Orleans

Bagian ini memberikan gambaran tingkat tinggi tentang implementasi Orleans Stream. Ini menjelaskan konsep dan detail yang tidak terlihat pada tingkat aplikasi. Jika Anda hanya berencana untuk menggunakan stream, Anda tidak perlu membaca bagian ini.

Terminologi:

Kami merujuk dengan kata "antrian" ke teknologi penyimpanan tahan lama yang dapat menelan peristiwa streaming dan memungkinkan untuk menarik acara atau menyediakan mekanisme berbasis push untuk mengkonsumsi acara. Biasanya, untuk memberikan skalabilitas, teknologi tersebut menyediakan antrian yang dipartisi / dipartisi. Misalnya, Azure Queues memungkinkan Anda membuat beberapa antrean, dan Hub Acara memiliki beberapa hub.

Aliran persisten

Semua penyedia aliran persisten Orleans berbagi implementasi PersistentStreamProvideryang sama. Penyedia aliran generik ini perlu dikonfigurasi dengan teknologi khusus IQueueAdapterFactory.

Misalnya, untuk tujuan pengujian, kami memiliki adaptor antrean yang menghasilkan data pengujian mereka daripada membaca data dari antrean. Kode di bawah ini menunjukkan bagaimana kami mengonfigurasi penyedia aliran persisten untuk menggunakan adaptor antrean kustom (generator) kami. Ini dilakukan dengan mengonfigurasi penyedia aliran persisten dengan fungsi pabrik yang digunakan untuk membuat adaptor.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Ketika produsen streaming menghasilkan item streaming baru dan panggilan stream.OnNext(), runtime streaming Orleans memanggil metode yang sesuai pada IQueueAdapter penyedia aliran yang enqueues item langsung ke antrian yang sesuai.

Menarik agen

Di jantung Penyedia Aliran Persisten adalah agen penarik. Menarik agen menarik peristiwa dari satu set antrian tahan lama dan mengirimkannya ke kode aplikasi dalam biji-bijian yang mengkonsumsinya. Seseorang dapat menganggap agen penarik sebagai "layanan mikro" terdistribusi - komponen terdistribusi yang dipartisi, sangat tersedia, dan elastis. Agen penarik berjalan di dalam silo yang sama yang menghosting butiran aplikasi dan sepenuhnya dikelola oleh Orleans Streaming Runtime.

StreamQueueMapper dan StreamQueueBalancer

Menarik agen parameter dengan IStreamQueueMapper dan IStreamQueueBalancer. Ini IStreamQueueMapper menyediakan daftar semua antrian dan juga bertanggung jawab untuk memetakan aliran ke antrian. Dengan begitu, sisi produsen Penyedia Aliran Persisten tahu antrean mana yang akan menyiarkan pesan.

Ini IStreamQueueBalancer mengungkapkan cara antrian seimbang di seluruh silo dan agen Orleans. Tujuannya adalah untuk menetapkan antrian ke agen secara seimbang, untuk mencegah kemacetan dan mendukung elastisitas. Ketika silo baru ditambahkan ke kluster Orleans, antrean secara otomatis diseimbangkan kembali di silo lama dan baru. Ini StreamQueueBalancer memungkinkan penyesuaian proses itu. Orleans memiliki beberapa StreamQueueBalancers bawaan, untuk mendukung skenario penyeimbangan yang berbeda (sejumlah besar dan kecil antrian) dan lingkungan yang berbeda (Azure, on-prem, statis).

Dengan menggunakan contoh generator uji dari atas, kode di bawah ini menunjukkan bagaimana seseorang dapat mengonfigurasi peng mapper antrian dan penyeimbang antrean.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Kode di atas mengonfigurasi GeneratorAdapterFactory untuk menggunakan mapper antrian dengan delapan antrean, dan menyeimbangkan antrean di seluruh kluster menggunakan DynamicClusterConfigDeploymentBalancer.

Protokol menarik

Setiap silo menjalankan satu set agen penarik, setiap agen menarik dari satu antrian. Menarik agen itu sendiri diimplementasikan oleh komponen runtime internal, yang disebut SystemTarget. SystemTargets pada dasarnya adalah biji-bijian runtime, tunduk pada konkurensi berulir tunggal, dapat menggunakan pesan biji-bijian biasa, dan ringan seperti biji-bijian. Berbeda dengan biji-bijian, SystemTargets tidak virtual: mereka secara eksplisit dibuat (oleh runtime) dan tidak transparan lokasi. Dengan menerapkan menarik agen sebagai SystemTargets, Orleans Streaming Runtime dapat mengandalkan fitur Orleans built-in dan dapat menskalakan ke sejumlah besar antrian, karena membuat agen penarik baru semurah membuat biji-bijian baru.

Setiap agen penarik menjalankan timer periodik yang menarik dari antrian dengan IQueueAdapterReceiver.GetQueueMessagesAsync menerapkan metode. Pesan yang dikembalikan dimasukkan ke dalam struktur data per agen internal yang disebut IQueueCache. Setiap pesan diperiksa untuk mengetahui aliran tujuannya. Agen menggunakan Pub-Sub untuk mengetahui daftar konsumen streaming yang berlangganan aliran ini. Setelah daftar konsumen diambil, agen menyimpannya secara lokal (di pub-sub cache) sehingga tidak perlu berkonsultasi dengan Pub-Sub pada setiap pesan. Agen juga berlangganan pub-sub untuk menerima pemberitahuan dari setiap konsumen baru yang berlangganan aliran itu. Jabat tangan antara agen dan pub-sub ini menjamin semantik langganan streaming yang kuat: setelah konsumen berlangganan streaming, ia akan melihat semua peristiwa yang dihasilkan setelah berlangganan. Selain itu, menggunakannya StreamSequenceToken memungkinkan untuk berlangganan di masa lalu.

Cache antrean

IQueueCache Adalah struktur data per agen internal yang memungkinkan untuk memisahkan dequeuing peristiwa baru dari antrian dan mengirimkannya ke konsumen. Hal ini juga memungkinkan untuk memisahkan pengiriman ke aliran yang berbeda dan konsumen yang berbeda.

Bayangkan situasi di mana satu aliran memiliki 3 konsumen aliran dan salah satunya lambat. Jika perawatan tidak diambil, konsumen yang lambat ini dapat mempengaruhi kemajuan agen, memperlambat konsumsi konsumen lain dari aliran itu, dan bahkan memperlambat dequeuing dan pengiriman acara untuk aliran lain. Untuk mencegah hal itu dan memungkinkan paralelisme maksimum dalam agen, kami menggunakan IQueueCache.

IQueueCache buffer mengalirkan peristiwa dan menyediakan cara bagi agen untuk memberikan acara kepada setiap konsumen dengan kecepatannya sendiri. Pengiriman per konsumen diimplementasikan oleh komponen internal yang disebut IQueueCacheCursor, yang melacak kemajuan per konsumen. Dengan begitu, setiap konsumen menerima acara dengan kecepatannya sendiri: konsumen cepat menerima acara secepat mereka dicairkan dari antrian, sementara konsumen yang lambat menerimanya di kemudian hari. Setelah pesan dikirim ke semua konsumen, pesan tersebut dapat dihapus dari cache.

Backpressure

Backpressure di Orleans Streaming Runtime berlaku di dua tempat: membawa acara streaming dari antrian ke agen dan mengirimkan acara dari agen untuk mengalirkan konsumen.

Yang terakhir ini disediakan oleh mekanisme pengiriman pesan Orleans built-in. Setiap acara streaming dikirim dari agen ke konsumen melalui pesan biji-bijian Orleans standar, satu per satu. Artinya, agen mengirim satu acara (atau kumpulan acara ukuran terbatas) ke setiap konsumen streaming dan menunggu panggilan ini. Acara berikutnya tidak akan mulai disampaikan sampai Tugas untuk acara sebelumnya diselesaikan atau rusak. Dengan begitu kita secara alami membatasi tingkat pengiriman per konsumen menjadi satu pesan pada satu waktu.

Saat membawa acara streaming dari antrian ke agen, Orleans Streaming menyediakan mekanisme Backpressure khusus baru. Karena agen memisahkan dequeuing peristiwa dari antrian dan mengirimkannya ke konsumen, satu konsumen lambat mungkin tertinggal begitu banyak sehingga IQueueCache akan terisi. Untuk mencegah IQueueCache pertumbuhan tanpa batas waktu, kami membatasi ukurannya (batas ukuran dapat dikonfigurasi). Namun, agen tidak pernah membuang peristiwa yang tidak terkirim.

Sebaliknya, ketika cache mulai terisi, agen memperlambat laju dequeuing peristiwa dari antrian. Dengan begitu, kita dapat "keluar" periode pengiriman yang lambat dengan menyesuaikan tingkat di mana kita mengkonsumsi dari antrian ("backpressure") dan kembali ke tingkat konsumsi yang cepat di kemudian hari. Untuk mendeteksi lembah IQueueCache "pengiriman lambat" menggunakan struktur data internal bucket cache yang melacak kemajuan pengiriman peristiwa ke konsumen aliran individu. Ini menghasilkan sistem yang sangat responsif dan menyesuaikan diri.