Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Bagian ini memberikan gambaran umum secara keseluruhan tentang Orleans implementasi Stream. Ini menjelaskan konsep dan detail yang tidak terlihat pada tingkat aplikasi. Jika Anda hanya berencana menggunakan stream, Anda tidak perlu membaca bagian ini.
Terminologi:
Kami merujuk dengan kata "antrean" ke teknologi penyimpanan tahan lama yang dapat menyerap peristiwa streaming dan memungkinkan untuk menarik peristiwa atau menyediakan mekanisme berbasis push untuk mengonsumsi peristiwa. Biasanya, untuk memberikan skalabilitas, teknologi tersebut menyediakan antrean yang dipecah/dipartisi. Misalnya, Azure Queues memungkinkan Anda membuat beberapa antrean, dan Event Hubs memiliki beberapa hub.
Aliran berkelanjutan
Semua Orleans penyedia aliran persisten berbagi implementasi PersistentStreamProviderumum . Penyedia aliran generik ini perlu dikonfigurasi dengan teknologi khusus IQueueAdapterFactory.
Misalnya, untuk tujuan pengujian, kami memiliki adaptor antrean yang menghasilkan data pengujiannya sendiri daripada membaca data dari antrean. Kode di bawah ini menunjukkan cara kami mengonfigurasi penyedia aliran persisten untuk menggunakan adaptor antrean kustom (generator) kami. Ini dilakukan dengan mengonfigurasi penyedia aliran persisten dengan fungsi pabrik yang digunakan untuk membuat adaptor.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Ketika penghasil aliran menghasilkan item streaming baru dan memanggil stream.OnNext(), sistem runtime streaming Orleans memanggil metode yang sesuai pada IQueueAdapter penyedia aliran tersebut yang memasukkan item langsung ke antrean yang tepat.
Agen Penarik
Inti dari Penyedia Aliran Data Persisten adalah agen penarik. Menarik agen menarik peristiwa dari serangkaian antrean tahan lama dan mengirimkannya ke kode aplikasi dalam biji-bijian yang mengonsumsinya. Seseorang dapat menganggap agen penarik sebagai "layanan mikro" yang terdistribusi - ini adalah komponen yang dipartisi, sangat tersedia, dan elastis. Agen penarikan berjalan di dalam silo yang sama yang menghosting butir aplikasi dan dikelola sepenuhnya oleh Orleans Runtime Streaming.
StreamQueueMapper dan StreamQueueBalancer
Agen penarikan diparameterkan dengan IStreamQueueMapper dan IStreamQueueBalancer.
IStreamQueueMapper menyediakan daftar semua antrean dan juga bertanggung jawab untuk memetakan aliran ke antrean. Dengan cara itu, pihak produsen dari Penyedia Aliran Persisten tahu ke dalam antrean mana untuk mengantrekan pesan.
Ekspresi IStreamQueueBalancer menunjukkan bagaimana antrean diimbangi di berbagai Orleans silo dan agen. Tujuannya adalah untuk mendistribusikan antrean kepada petugas dengan cara yang seimbang, guna mencegah kemacetan dan mendukung elastisitas. Ketika silo baru ditambahkan ke Orleans kluster, antrean secara otomatis diseimbangkan kembali di seluruh silo lama dan baru.
StreamQueueBalancer memungkinkan penyesuaian proses tersebut.
Orleans memiliki beberapa StreamQueueBalancers bawaan, untuk mendukung skenario penyeimbangan yang berbeda (sejumlah besar dan kecil antrean) dan lingkungan yang berbeda (Azure, lokal, statis).
Menggunakan contoh generator pengujian dari atas, kode di bawah ini menunjukkan bagaimana seseorang dapat mengonfigurasi pemeta antrean dan penyeimbang antrean.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Kode di atas mengonfigurasi GeneratorAdapterFactory untuk menggunakan pemeta antrean dengan delapan antrean, dan menyeimbangkan antrean di seluruh kluster menggunakan DynamicClusterConfigDeploymentBalancer.
Menarik protokol
Setiap silo menjalankan satu set agen penarikan, setiap agen menarik dari satu antrean. Agen penarik itu sendiri diimplementasikan oleh komponen runtime internal, yang dikenal sebagai SystemTarget. SystemTargets pada dasarnya adalah butiran runtime, tunduk pada konkurensi utas tunggal, dapat menggunakan olahpesan biji-bijian reguler, dan ringan seperti biji-bijian. Berbeda dengan butiran, SystemTargets tidak virtual: mereka secara eksplisit dibuat (oleh runtime) dan tidak transparan lokasi. Dengan menerapkan agen penarikan sebagai SystemTargets, Orleans Runtime Streaming dapat mengandalkan fitur bawaan dan dapat menskalakan ke sejumlah besar antrean, karena membuat agen penarikan baru sama murahnya dengan membuat biji-bijian Orleans baru.
Setiap agen penarik menjalankan timer berkala yang menarik dari antrean dengan memanggil metode IQueueAdapterReceiver.GetQueueMessagesAsync. Pesan yang dikembalikan dimasukkan ke dalam struktur data internal per agen yang disebut IQueueCache. Setiap pesan diperiksa untuk mengetahui aliran tujuannya. Agen menggunakan Pub-Sub untuk mengetahui daftar konsumen streaming yang berlangganan aliran ini. Setelah daftar konsumen diperoleh, agen menyimpannya secara lokal (di dalam cache pub-sub-nya) sehingga tidak perlu berkonsultasi dengan Pub-Sub pada setiap pesan. Agen juga berlangganan pub-sub untuk menerima pemberitahuan tentang konsumen baru yang berlangganan aliran tersebut. Jabat tangan antara agen dan pub-sub ini menjamin semantik langganan streaming yang kuat: setelah konsumen telah berlangganan stream tersebut, ia akan melihat semua peristiwa yang dihasilkan setelah langganan dilakukan. Selain itu, menggunakan StreamSequenceToken memungkinkannya untuk berlangganan di masa lalu.
Cache antrean
IQueueCache adalah struktur data internal per agen yang memungkinkan untuk memisahkan peristiwa baru dari antrean dan mengirimkannya kepada konsumen. Ini juga memungkinkan untuk memisahkan pengiriman ke aliran yang berbeda dan konsumen yang berbeda.
Bayangkan situasi di mana satu aliran memiliki 3 konsumen streaming dan salah satunya lambat. Jika tidak waspada, konsumen lambat ini dapat berdampak pada kemajuan agen, memperlambat konsumsi oleh konsumen lain dari aliran tersebut, dan bahkan memperlambat pengantrian serta pengiriman kejadian untuk aliran lainnya. Untuk mencegahnya dan memungkinkan paralelisme maksimum dalam agen, kami menggunakan IQueueCache.
IQueueCache buffer mengalirkan peristiwa dan menyediakan cara bagi agen untuk mengirimkan peristiwa kepada setiap konsumen dengan kecepatannya sendiri. Pengiriman per konsumen diimplementasikan oleh komponen internal yang disebut IQueueCacheCursor, yang melacak kemajuan per konsumen. Dengan begitu, setiap konsumen menerima kejadian dengan kecepatannya sendiri: konsumen cepat menerima kejadian secepat mereka dikeluarkan dari antrean, sementara konsumen lambat menerimanya lebih lambat. Setelah pesan dikirimkan ke semua konsumen, pesan dapat dihapus dari cache.
Tekanan Balik (Backpressure)
Backpressure di Orleans Runtime Streaming berlaku di dua tempat: membawa peristiwa streaming dari antrean ke agen dan mengirimkan peristiwa dari agen kepada konsumen stream.
Yang terakhir disediakan oleh mekanisme pengiriman pesan bawaan Orleans . Setiap acara stream dikirim dari agen ke konsumen melalui pesan "grain" standar Orleans, satu per satu. Artinya, agen mengirim satu peristiwa (atau batch peristiwa ukuran terbatas) ke setiap konsumen stream dan menunggu panggilan ini. Acara berikutnya tidak akan mulai dikirimkan hingga Tugas untuk acara sebelumnya telah diselesaikan atau gagal. Dengan begitu kami secara alami membatasi tingkat pengiriman per konsumen ke satu pesan pada satu waktu.
Saat membawa peristiwa streaming dari antrean ke agen, Orleans Streaming menyediakan mekanisme Pembatasan Arus khusus yang baru. Karena agen memisahkan pengeluaran peristiwa dari antrean dan mengirimkannya ke konsumen, konsumen yang lambat mungkin akan tertinggal begitu jauh sehingga IQueueCache akan penuh. Untuk mencegah IQueueCache pertumbuhan tanpa batas waktu, kami membatasi ukurannya (batas ukuran dapat dikonfigurasi). Namun, agen tidak pernah membuang pesan yang tidak terkirimkan.
Sebaliknya, ketika cache mulai terisi, agen memperlambat laju pengeluaran peristiwa dari antrean. Dengan begitu, kita dapat melewati periode pengiriman yang lambat dengan menyesuaikan laju yang kita konsumsi dari antrean ("backpressure") dan kembali ke tingkat konsumsi yang cepat kemudian. Untuk mendeteksi lembah "pengiriman lambat", IQueueCache menggunakan struktur data internal berbentuk wadah cache yang melacak kemajuan pengiriman peristiwa ke masing-masing konsumen aliran. Ini menghasilkan sistem yang sangat responsif dan menyesuaikan diri.