Bagikan melalui


Cara: Menerapkan pola aliran data produsen-konsumen

Pada artikel ini, Anda akan mempelajari cara menggunakan pustaka aliran data TPL untuk menerapkan pola produsen-konsumen. Dalam pola ini, produsen mengirim pesan ke blok pesan, dan konsumen membaca pesan dari blok itu.

Catatan

Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow.

Contoh

Contoh berikut menunjukkan model produsen-konsumen dasar yang menggunakan aliran data. Metode Produce menulis array yang berisi byte acak data ke objek System.Threading.Tasks.Dataflow.ITargetBlock<TInput> dan metode Consume membaca byte dari objek System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Dengan bertindak pada antarmuka ISourceBlock<TOutput> dan ITargetBlock<TInput>, alih-alih jenis turunannya, Anda dapat menulis kode yang dapat digunakan kembali, yang dapat bertindak pada berbagai jenis blok aliran data. Contoh ini menggunakan kelas BufferBlock<T>. Karena kelas BufferBlock<T> bertindak sebagai blok sumber dan sebagai blok target, produsen dan konsumen dapat menggunakan objek bersama untuk mentransfer data.

Metode Produce memanggil metode Post dalam perulangan untuk menulis data secara sinkron ke blok target. Setelah metode Produce menulis semua data ke blok target, metode ini memanggil metode Complete untuk menunjukkan bahwa blok tidak akan pernah memiliki data tambahan yang tersedia. Metode Consume menggunakan operator asinkron dan menunggu (Asinkron dan Menunggu dalam Visual Basic) untuk menghitung jumlah total byte yang diterima dari objek ISourceBlock<TOutput> secara asinkron. Untuk bertindak secara asinkron, metode Consume memanggil metode OutputAvailableAsync untuk menerima pemberitahuan saat blok sumber memiliki data yang tersedia dan saat blok sumber tidak akan pernah memiliki data tambahan yang tersedia.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Pemrograman yang kuat

Contoh sebelumnya hanya menggunakan satu konsumen untuk memproses data sumber. Jika Anda memiliki beberapa konsumen dalam aplikasi, gunakan metode TryReceive untuk membaca data dari blok sumber, seperti yang ditunjukkan dalam contoh berikut.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Metode TryReceive mengembalikan False saat tidak ada data yang tersedia. Saat beberapa konsumen harus mengakses blok sumber secara bersamaan, mekanisme ini menjamin bahwa data masih tersedia setelah panggilan ke OutputAvailableAsync.

Lihat juga