Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Pada artikel ini, Anda akan mempelajari cara menggunakan pustaka aliran data TPL untuk menerapkan pola produsen-konsumen. Dalam pola ini, produsen mengirim pesan ke blok pesan, dan konsumen membaca pesan dari blok itu.
Catatan
Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow.
Contoh
Contoh berikut menunjukkan model produsen-konsumen dasar yang menggunakan aliran data. Metode Produce menulis array yang berisi byte acak data ke objek System.Threading.Tasks.Dataflow.ITargetBlock<TInput> dan metode Consume membaca byte dari objek System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Dengan bertindak pada antarmuka ISourceBlock<TOutput> dan ITargetBlock<TInput>, alih-alih jenis turunannya, Anda dapat menulis kode yang dapat digunakan kembali, yang dapat bertindak pada berbagai jenis blok aliran data. Contoh ini menggunakan kelas BufferBlock<T>. Karena kelas BufferBlock<T> bertindak sebagai blok sumber dan sebagai blok target, produsen dan konsumen dapat menggunakan objek bersama untuk mentransfer data.
Metode Produce memanggil metode Post dalam perulangan untuk menulis data secara sinkron ke blok target. Setelah metode Produce menulis semua data ke blok target, metode ini memanggil metode Complete untuk menunjukkan bahwa blok tidak akan pernah memiliki data tambahan yang tersedia. Metode Consume menggunakan operator asinkron dan menunggu (Asinkron dan Menunggu dalam Visual Basic) untuk menghitung jumlah total byte yang diterima dari objek ISourceBlock<TOutput> secara asinkron. Untuk bertindak secara asinkron, metode Consume memanggil metode OutputAvailableAsync untuk menerima pemberitahuan saat blok sumber memiliki data yang tersedia dan saat blok sumber tidak akan pernah memiliki data tambahan yang tersedia.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Pemrograman yang kuat
Contoh sebelumnya hanya menggunakan satu konsumen untuk memproses data sumber. Jika Anda memiliki beberapa konsumen dalam aplikasi, gunakan metode TryReceive untuk membaca data dari blok sumber, seperti yang ditunjukkan dalam contoh berikut.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Metode TryReceive mengembalikan False saat tidak ada data yang tersedia. Saat beberapa konsumen harus mengakses blok sumber secara bersamaan, mekanisme ini menjamin bahwa data masih tersedia setelah panggilan ke OutputAvailableAsync.