Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Panduan ini menunjukkan kepada Anda cara cepat untuk menyiapkan dan menggunakan Orleans Stream. Untuk mempelajari selengkapnya tentang detail fitur streaming, baca bagian lain dari dokumentasi ini.
Konfigurasi yang diperlukan
Dalam panduan ini, Anda menggunakan aliran berbasis memori yang menggunakan pesan berbasis grain untuk mengirim data aliran ke pelanggan. Anda menggunakan penyedia penyimpanan dalam memori untuk menyimpan daftar langganan. Menggunakan mekanisme berbasis memori untuk streaming dan penyimpanan hanya ditujukan untuk pengembangan dan pengujian lokal, bukan untuk lingkungan produksi.
Pada silo, di mana silo adalah ISiloBuilder, panggil AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Pada klien kluster, di mana client adalah IClientBuilder, panggil AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
Dalam panduan ini, gunakan streaming berbasis pesan sederhana yang menggunakan grain messaging untuk mengirim data ke pengguna. Gunakan penyedia penyimpanan dalam memori untuk menyimpan daftar langganan; ini bukan pilihan bijaksana untuk aplikasi produksi nyata.
Pada silo, di mana hostBuilder adalah ISiloHostBuilder, panggil AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Pada klien kluster, di mana clientBuilder adalah IClientBuilder, panggil AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Nota
Secara default, pesan yang diteruskan melalui Aliran Pesan Sederhana dianggap tidak dapat diubah dan mungkin diteruskan secara referensi ke butir lain. Untuk menonaktifkan perilaku ini, konfigurasikan penyedia SMS untuk menonaktifkan SimpleMessageStreamProviderOptions.OptimizeForImmutableData.
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Anda dapat membuat aliran, mengirim data dengan menggunakannya sebagai produsen, dan menerima data sebagai langganan.
Menyelenggarakan acara
Cukup mudah untuk menghasilkan event untuk stream. Pertama, dapatkan akses ke penyedia streaming yang ditentukan dalam konfigurasi sebelumnya ("StreamProvider"), lalu pilih aliran dan dorong data ke dalamnya.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Cukup mudah untuk menghasilkan event untuk stream. Pertama, dapatkan akses ke penyedia streaming yang ditentukan dalam konfigurasi sebelumnya ("SMSProvider"), lalu pilih aliran dan dorong data ke dalamnya.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Seperti yang Anda lihat, aliran memiliki GUID dan namespace. Ini memudahkan untuk mengidentifikasi aliran unik. Misalnya, namespace untuk ruang obrolan adalah "Rooms", dan GUID bisa menjadi GUID dari pemilik RoomGrain.
Di sini, gunakan GUID ruang obrolan yang diketahui. Menggunakan metode OnNextAsync dari aliran, dorong data ke dalamnya. Mari kita lakukan ini di dalam timer menggunakan angka acak. Anda juga dapat menggunakan jenis data lain untuk aliran.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Berlangganan dan menerima data streaming
Untuk menerima data, Anda dapat menggunakan langganan implisit dan eksplisit, yang dijelaskan secara lebih rinci di Langganan eksplisit dan implisit. Contoh ini menggunakan langganan implisit, yang lebih mudah. Ketika tipe grain ingin berlangganan ke stream secara implisit, ia menggunakan atribut [ImplicitStreamSubscription(namespace)].
Untuk kasus Anda, tentukan ReceiverGrain seperti ini:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Setiap kali data dikirim ke aliran di RANDOMDATA namespace (seperti dalam contoh timer), grain bertipe ReceiverGrain dengan Guid yang sama dengan aliran menerima pesan. Bahkan jika tidak ada aktivasi biji-bijian yang saat ini ada, runtime secara otomatis membuat yang baru dan mengirim pesan ke dalamnya.
Agar ini berfungsi, selesaikan proses langganan dengan mengatur OnNextAsync metode untuk menerima data. Untuk melakukannya, ReceiverGrain harus memanggil sesuatu seperti ini dalam :OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Anda sudah siap! Sekarang, satu-satunya persyaratan adalah bahwa sesuatu memicu terbentuknya unit produksi. Kemudian, ia mendaftarkan timer dan mulai mengirim bilangan bulat acak ke semua pihak yang tertarik.
Sekali lagi, panduan ini melewati banyak detail dan hanya memberikan gambaran umum tingkat tinggi. Baca bagian lain dari manual ini dan sumber daya lain di Rx untuk mendapatkan pemahaman yang baik tentang apa yang tersedia dan cara kerjanya.
Pemrograman reaktif dapat menjadi pendekatan yang kuat untuk memecahkan banyak masalah. Misalnya, Anda dapat menggunakan LINQ di pelanggan untuk memfilter angka dan melakukan berbagai operasi yang menarik.