Bagikan melalui


Orleans API streaming

Aplikasi berinteraksi dengan aliran melalui API yang sangat mirip dengan Reactive Extensions (Rx) terkenal di .NET. Perbedaan utamanya adalah bahwa Orleans ekstensi aliran asinkron untuk membuat pemrosesan lebih efisien dalam Orleans lapisan komputasi yang 'terdistribusi dan dapat diskalakan'.

Aliran asinkron

Anda mulai dengan menggunakan penyedia streaming untuk mendapatkan handle ke aliran. Anda dapat menganggap penyedia aliran sebagai pabrik aliran yang memungkinkan pelaksana untuk menyesuaikan perilaku streaming dan semantik:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Anda dapat mendapatkan referensi ke penyedia aliran baik dengan memanggil metode Grain.GetStreamProvider ketika berada di dalam grain atau dengan memanggil metode GetStreamProvider pada instans klien.

Orleans.Streams.IAsyncStream<T> adalah pegangan logis yang memiliki tipe data yang kuat ke aliran virtual, mirip dengan referensi Orleans Grain. Panggilan ke GetStreamProvider dan GetStream murni lokal. Argumen untuk GetStream adalah GUID dan string tambahan yang disebut namespace streaming (yang bisa null). Bersama, GUID dan string namespace membentuk identitas stream (mirip dengan argumen untuk IGrainFactory.GetGrain). Kombinasi ini memberikan fleksibilitas ekstra dalam menentukan identitas aliran. Sama seperti Grain 7 mungkin ada dalam kategori PlayerGrain dan Grain 7 yang berbeda mungkin ada dalam kategori ChatRoomGrain, Stream 123 dapat ada di dalam ruang nama PlayerEventsStream, dan aliran 123 yang berbeda dapat ada dalam ruang nama ChatRoomMessagesStream.

Memproduksi dan mengonsumsi

IAsyncStream<T> IAsyncObserver<T> mengimplementasikan antarmuka dan IAsyncObservable<T> . Ini memungkinkan aplikasi Anda untuk menggunakan aliran baik untuk menghasilkan peristiwa baru menggunakan IAsyncObserver<T> atau untuk berlangganan dan menggunakan peristiwa menggunakan IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Untuk menghasilkan peristiwa ke dalam aliran, aplikasi Anda memanggil:

await stream.OnNextAsync<T>(event)

Untuk berlangganan streaming, aplikasi Anda memanggil:

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

Argumen untuk SubscribeAsync dapat berupa objek yang mengimplementasikan IAsyncObserver<T> antarmuka atau kombinasi fungsi lambda untuk memproses peristiwa masuk. Opsi lainnya untuk SubscribeAsync tersedia melalui AsyncObservableExtensions kelas . SubscribeAsync mengembalikan sebuah StreamSubscriptionHandle<T>, suatu handle buram yang digunakan untuk berhenti berlangganan dari stream (mirip dengan versi asinkron dari IDisposable).

await subscriptionHandle.UnsubscribeAsync()

Penting untuk dicatat bahwa langganan adalah untuk butiran, bukan untuk aktivasi. Setelah kode grain berlangganan ke aliran, langganan ini berlangsung melampaui durasi aktivasi ini dan tetap permanen hingga kode grain tersebut (berpotensi dalam aktivasi yang berbeda) secara tegas berhenti berlangganan. Ini adalah inti dari abstraksi aliran virtual: tidak hanya semua aliran selalu ada secara logis, tetapi langganan aliran juga berkelanjutan dan tetap ada melampaui aktivasi fisik tertentu yang membuatnya.

Beberapa

Aliran Orleans dapat memiliki beberapa produsen dan beberapa konsumen. Pesan yang diterbitkan oleh produsen dikirimkan ke semua konsumen yang berlangganan aliran sebelum pesan diterbitkan.

Selain itu, konsumen dapat berlangganan aliran yang sama beberapa kali. Setiap kali berlangganan, itu mendapatkan sebuah StreamSubscriptionHandle<T> yang unik. Jika grain (atau klien) berlangganan X kali ke aliran yang sama, ia menerima peristiwa yang sama X kali, sekali untuk setiap langganan. Konsumen juga dapat berhenti berlangganan dari langganan individual. Anda dapat menemukan semua langganan saat ini dengan memanggil:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Memulihkan dari kegagalan

Jika produsen aliran mati (atau biji-bijiannya dinonaktifkan), tidak perlu melakukan apa pun. Lain kali unit ini ingin menghasilkan lebih banyak kejadian, ia dapat memperoleh pengendali aliran lagi dan menghasilkan kejadian baru seperti biasanya.

Logika konsumen sedikit lebih terlibat. Seperti disebutkan sebelumnya, setelah sebuah objek konsumen berlangganan pada sebuah aliran, langganan ini berlaku sampai objek tersebut secara eksplisit berhenti berlangganan. Jika konsumen stream mati (atau grain-nya dinonaktifkan) dan acara baru dihasilkan pada stream, grain konsumen secara otomatis diaktifkan kembali (sama seperti grain reguler Orleans yang secara otomatis diaktifkan ketika sebuah pesan dikirimkan kepadanya). Satu-satunya hal yang perlu dilakukan kode inti sekarang adalah menyediakan IAsyncObserver<T> untuk memproses data. Konsumen perlu melampirkan kembali logika pemrosesan sebagai bagian OnActivateAsync() dari metode . Untuk melakukan itu, ia dapat memanggil:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

Konsumen menggunakan pegangan sebelumnya yang diperoleh selama langganan awal untuk "melanjutkan pemrosesan". Perhatikan bahwa ResumeAsync hanya memperbarui langganan yang ada dengan instans IAsyncObserver logika baru dan tidak mengubah fakta bahwa konsumen ini sudah berlangganan aliran ini.

Bagaimana konsumen mendapatkan produk lama subscriptionHandle? Terdapat dua pilihan. Konsumen mungkin telah mempertahankan handle yang dikembalikan dari operasi asli SubscribeAsync dan sekarang dapat menggunakannya. Atau, jika konsumen tidak memiliki tangkai, konsumen dapat meminta IAsyncStream<T> semua tangkai langganan aktif dengan melakukan panggilan:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Konsumen kemudian dapat melanjutkan semuanya atau berhenti berlangganan dari beberapa jika diinginkan.

Petunjuk / Saran

Jika butir konsumen secara langsung mengimplementasikan antarmuka IAsyncObserver<T> (public class MyGrain<T> : Grain, IAsyncObserver<T>), secara teoritis tidak perlu melampirkan kembali IAsyncObserver dan dengan demikian tidak perlu memanggil ResumeAsync. Runtime streaming harus secara otomatis mengetahui bahwa komponen sudah menerapkan IAsyncObserver dan memanggil metode-metode IAsyncObserver tersebut. Namun, runtime streaming saat ini tidak mendukung dukungan tersebut, dan kode grain masih perlu secara eksplisit memanggil ResumeAsync, bahkan jika grain mengimplementasikan IAsyncObserver secara langsung.

Langganan eksplisit dan implisit

Secara bawaan, pengguna aliran harus secara eksplisit berlangganan aliran. Langganan ini biasanya dipicu oleh pesan eksternal yang diterima grain (atau klien) yang menginstruksikannya untuk berlangganan. Misalnya, dalam layanan obrolan, ketika pengguna bergabung dengan ruang obrolan, grain mereka menerima JoinChatGroup pesan dengan nama obrolan, menyebabkan grain pengguna berlangganan aliran obrolan ini.

Selain itu, Orleans stream mendukung langganan implisit. Dalam model ini, biji-bijian tidak secara eksplisit berlangganan. Ini berlangganan secara otomatis dan implisit berdasarkan identitas butir dan ImplicitStreamSubscriptionAttribute. Nilai utama langganan implisit adalah memungkinkan aktivitas aliran untuk memicu aktivasi grain (dan juga langganan) secara otomatis. Misalnya, menggunakan rangkaian SMS, jika satu butir yang ingin menghasilkan rangkaian dan biji-bijian lain memprosesnya, produsen membutuhkan identitas butir konsumen dan melakukan panggilan ke butir yang memintanya untuk berlangganan. Barulah ia bisa mulai mengirimkan acara. Sebaliknya, dengan langganan implisit, produsen hanya dapat mulai mengirimkan peristiwa ke aliran, dan grain konsumen secara otomatis mengaktifkan dan berlangganan. Dalam hal ini, produser tidak perlu tahu siapa yang membaca peristiwa.

Implementasi MyGrainType biji-bijian dapat mendeklarasikan atribut [ImplicitStreamSubscription("MyStreamNamespace")]. Ini memberi tahu runtime streaming bahwa ketika peristiwa dihasilkan pada aliran dengan GUID identitas XXX dan namespace "MyStreamNamespace", itu harus dikirimkan ke grain dengan identitas XXX tipe MyGrainType. Artinya, peta runtime mengalir <XXX, MyStreamNamespace> ke butir <XXX, MyGrainType>konsumen .

Kehadiran ImplicitStreamSubscription menyebabkan runtime streaming secara otomatis berlangganan "grain" ini ke aliran dan mengirimkan acara streaming ke dalamnya. Namun, kode biji-bijian masih perlu memberi tahu runtime bagaimana peristiwa ingin diproses. Pada dasarnya, perlu melampirkan IAsyncObserver. Oleh karena itu, ketika grain diaktifkan, kode grain di dalam OnActivateAsync perlu memanggil:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Menulis logika langganan

Di bawah ini adalah panduan untuk menulis logika langganan untuk berbagai kasus: langganan eksplisit dan implisit, aliran yang dapat digulung balik dan tidak dapat digulung balik. Perbedaan utama antara langganan eksplisit dan implisit adalah bahwa untuk langganan implisit, grain selalu memiliki tepat satu langganan implisit per namespace aliran. Tidak ada cara untuk membuat beberapa langganan (tidak ada perkalian langganan), tidak ada cara untuk berhenti berlangganan, dan logika grain hanya perlu melampirkan logika pemrosesan. Ini juga berarti tidak pernah ada kebutuhan untuk melanjutkan langganan implisit. Di sisi lain, untuk langganan eksplisit, Anda perlu menyambung kembali langganan; jika tidak, berlangganan lagi mengakibatkan entitas berlangganan secara berulang.

Langganan implisit:

Untuk langganan implisit, biji-bijian masih perlu berlangganan untuk melampirkan logika pemrosesan. Anda dapat melakukan ini pada grain konsumen dengan mengimplementasikan antarmuka IStreamSubscriptionObserver dan IAsyncObserver<T>, memungkinkan grain untuk diaktifkan secara terpisah dari berlangganan. Untuk berlangganan aliran, biji-bijian membuat handel dan panggilan await handle.ResumeAsync(this) dalam metodenya OnSubscribed(...) .

Untuk memproses pesan, terapkan IAsyncObserver<T>.OnNextAsync(...) metode untuk menerima data aliran dan token urutan. Atau, ResumeAsync metode ini dapat mengambil sekumpulan delegasi yang mewakili metode IAsyncObserver<T> antarmuka: onNextAsync, , onErrorAsyncdan onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Langganan eksplisit:

Untuk langganan eksplisit, grain harus memanggil SubscribeAsync untuk berlangganan aliran. Ini membuat langganan dan melampirkan logika pemrosesan. Langganan eksplisit ada sampai grain berhenti berlangganan. Jika grain dinonaktifkan dan diaktifkan kembali, grain tetap secara eksplisit berlangganan, namun tidak ada logika pemrosesan yang terhubung. Dalam hal ini, biji-bijian perlu melampirkan kembali logika pemrosesan. Untuk melakukan ini, dalam OnActivateAsync, biji-bijian pertama-tama perlu mencari tahu langganannya dengan memanggil IAsyncStream<T>.GetAllSubscriptionHandles(). Grain harus mengeksekusi ResumeAsync pada setiap handle yang ingin dilanjutkan pemrosesannya atau UnsubscribeAsync pada handle mana pun yang sudah selesai diproses. Grain juga dapat secara opsional menentukan StreamSequenceToken sebagai argumen untuk panggilan ke ResumeAsync, menyebabkan langganan eksplisit ini mulai mengonsumsi dari token tersebut.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    }
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Streaming pesanan dan token urutan

Urutan pengiriman peristiwa antara produsen individu dan konsumen tergantung pada penyedia streaming.

Dengan SMS, produsen secara eksplisit mengontrol urutan peristiwa yang dilihat oleh konsumen dengan mengontrol cara mereka menerbitkannya. Secara bawaan (jika opsi SimpleMessageStreamProviderOptions.FireAndForgetDelivery untuk penyedia SMS adalah false) dan jika produsen menunggu setiap panggilan OnNextAsync, peristiwa akan tiba dalam urutan FIFO. Dalam SMS, produsen memutuskan cara menangani kegagalan pengiriman yang ditunjukkan oleh Task yang rusak, yang dikembalikan oleh panggilan OnNextAsync.

Aliran Antrean Azure tidak menjamin urutan FIFO karena Antrean Azure yang mendasar tidak menjamin urutan dalam situasi kegagalan (meskipun Antrean Azure menjamin urutan FIFO dalam kondisi tanpa kegagalan). Ketika produsen mengirimkan peristiwa ke dalam Antrean Azure, jika operasi antrean gagal, produsen harus mencoba antrean lain serta menangani kemungkinan pesan duplikat. Di sisi pengiriman, Orleans runtime Streaming menghapus antrean peristiwa dan mencoba mengirimkannya untuk diproses kepada konsumen. Runtime menghapus peristiwa dari antrean hanya setelah pemrosesan selesai dengan sukses. Jika pengiriman atau pemrosesan gagal, peristiwa tidak dihapus dari antrean dan secara otomatis muncul kembali nanti. Runtime Streaming mencoba mengirimkannya lagi, berpotensi melanggar pesanan FIFO. Perilaku ini cocok dengan semantik normal Azure Queues.

Urutan yang ditentukan aplikasi: Untuk menangani masalah pemesanan di atas, aplikasi Anda dapat secara opsional menentukan urutannya. Lakukan ini dengan StreamSequenceToken, sebuah objek buram IComparable yang digunakan untuk mengurutkan peristiwa. Produser dapat meneruskan opsional StreamSequenceToken ke OnNextAsync panggilan. Ini StreamSequenceToken diteruskan kepada pelanggan dan disampaikan bersama dengan acara. Dengan cara ini, aplikasi Anda dapat mempertimbangkan dan membangun ulang urutannya secara mandiri dari runtime streaming.

Aliran yang dapat digulung balik

Beberapa aliran hanya memungkinkan berlangganan mulai dari titik waktu terbaru, sementara yang lain memungkinkan "kembali ke waktu." Kemampuan ini tergantung pada teknologi antrean yang mendasar dan penyedia aliran tertentu. Misalnya, Azure Queues hanya mengizinkan konsumsi peristiwa yang baru masuk, sementara Event Hubs memungkinkan pemutaran ulang peristiwa dari titik waktu yang ditentukan (hingga batas waktu kedaluwarsa tertentu). Aliran yang mendukung kembali ke waktu yang disebut aliran yang dapat digulung balik.

Konsumen aliran yang dapat digulung balik dapat meneruskan StreamSequenceToken ke SubscribeAsync panggilan. Runtime mulai mengirimkan peristiwa dari titik tersebut StreamSequenceToken. Token null berarti konsumen ingin menerima peristiwa mulai dari yang terbaru.

Kemampuan untuk memutar balik aliran sangat berguna dalam skenario pemulihan. Misalnya, pertimbangkan butir yang berlangganan aliran dan secara berkala memeriksa statusnya bersama dengan token urutan terbaru. Saat pulih dari kegagalan, biji-bijian dapat berlangganan kembali ke aliran yang sama dari token urutan titik pemeriksaan terbaru, pulih tanpa kehilangan peristiwa apa pun yang dihasilkan sejak titik pemeriksaan terakhir.

Penyedia Azure Event Hubs dapat digulung balik. Anda dapat menemukan kodenya di GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. SMS (sekarang Saluran Siaran) dan penyedia Azure Queuetidak dapat digulung balik.

Pemrosesan stateless yang diskalakan secara otomatis

Secara default, Orleans Target streaming mendukung sejumlah besar aliran yang relatif kecil, masing-masing diproses oleh satu atau beberapa butir stateful. Secara kolektif, pemrosesan semua aliran dipecah di antara banyak biji-bijian reguler (stateful). Kode aplikasi Anda mengontrol sharding ini dengan menetapkan ID stream dan ID grain serta berlangganan secara eksplisit. Tujuannya adalah pemrosesan stateful pecahan.

Namun, terdapat skenario menarik mengenai pemrosesan stateless yang bisa diskalakan secara otomatis. Dalam skenario ini, aplikasi memiliki sejumlah kecil aliran (atau bahkan satu aliran besar), dan tujuannya adalah pemrosesan tanpa status. Contohnya adalah aliran global peristiwa di mana pemrosesan melibatkan penguraian setiap peristiwa serta kemungkinan meneruskannya ke aliran lainnya untuk pemrosesan berbasis keadaan lebih lanjut. Pemrosesan aliran yang diskalakan tanpa status dapat didukung melalui OrleansStatelessWorkerAttribute biji-bijian.

Status pemrosesan stateless yang diskalakan secara otomatis saat ini: Ini belum diimplementasikan. Mencoba berlangganan aliran dari StatelessWorker butir menghasilkan perilaku yang tidak ditentukan. Kami sedang mempertimbangkan untuk mendukung opsi ini.

Biji-bijian dan Orleans klien

Orleans aliran bekerja secara seragam di seluruh biji-bijian dan Orleans klien. Itu berarti Anda dapat menggunakan API yang sama di dalam grain dan di klien Orleans untuk menghasilkan dan mengonsumsi event. Ini sangat menyederhanakan logika aplikasi, membuat API khusus sisi klien seperti Grain Observers tidak lagi diperlukan.

Pub-sub streaming yang dikelola sepenuhnya dan andal

Untuk melacak langganan streaming, Orleans menggunakan komponen runtime yang disebut Streaming Pub-Sub, yang berfungsi sebagai titik pertemuan bagi konsumen dan produsen stream. Pub-sub melacak semua langganan streaming, mempertahankannya, dan mencocokkan konsumen streaming dengan produsen streaming.

Aplikasi dapat memilih di mana dan bagaimana data Pub-Sub disimpan. Komponen Pub-Sub sendiri diimplementasikan sebagai biji-bijian (disebut PubSubRendezvousGrain), yang menggunakan Orleans persistensi deklaratif. PubSubRendezvousGrain menggunakan penyedia penyimpanan bernama PubSubStore. Seperti halnya biji-bijian apa pun, Anda dapat menunjuk implementasi untuk penyedia penyimpanan. Untuk Streaming Pub-Sub, Anda dapat mengubah implementasi PubSubStore pada waktu pembangunan silo menggunakan pembangun host silo.

Berikut ini mengonfigurasi Pub-Sub untuk menyimpan statusnya dalam tabel Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

Dengan cara ini, data Pub-Sub disimpan dalam Tabel Azure. Untuk pengembangan awal, Anda juga dapat menggunakan penyimpanan memori. Selain Pub-Sub, Orleans Streaming Runtime mengirimkan peristiwa dari produsen kepada konsumen, mengelola semua sumber daya runtime yang dialokasikan untuk aliran yang digunakan secara aktif, dan secara transparan mengumpulkan sumber daya runtime dari aliran yang tidak digunakan.

Konfigurasi

Untuk menggunakan aliran, Anda perlu mengaktifkan penyedia streaming melalui host silo atau penyusun klien kluster. Contoh penyiapan penyedia aliran:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Lihat juga

Orleans Penyedia Aliran