Cara: Menerapkan pola aliran data produsen-konsumen

Dalam artikel ini, Anda akan mempelajari cara menggunakan pustaka aliran data TPL untuk menerapkan pola produsen-konsumen. Dalam pola ini, produsen mengirim pesan ke blok pesan, dan konsumen membaca pesan dari blok tersebut.

Nota

Pustaka aliran data TPL (System.Threading.Tasks.Dataflow namespace) disertakan dalam .NET 6 dan versi yang lebih baru. Untuk proyek .NET Framework dan .NET Standard, Anda perlu menginstal 📦 paket System.Threading.Tasks.Dataflow NuGet.

Example

Contoh berikut menunjukkan model konsumen produsen dasar yang menggunakan aliran data. Metode Produce menulis array yang berisi byte data acak ke objek System.Threading.Tasks.Dataflow.ITargetBlock<TInput> dan metode Consume membaca byte dari objek System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Dengan bertindak pada ISourceBlock<TOutput> antarmuka dan ITargetBlock<TInput> , alih-alih jenis turunannya, Anda dapat menulis kode yang dapat digunakan kembali yang dapat bertindak pada berbagai jenis blok aliran data. Contoh ini menggunakan BufferBlock<T> kelas . BufferBlock<T> Karena kelas bertindak sebagai blok sumber dan sebagai blok target, produsen dan konsumen dapat menggunakan objek bersama untuk mentransfer data.

Metode Produce memanggil metode Post dalam perulangan untuk menulis data secara sinkron ke blok tujuan. Produce Setelah metode menulis semua data ke blok target, metode memanggil Complete untuk menunjukkan bahwa blok tidak akan pernah memiliki data tambahan yang tersedia. Metode ini Consume menggunakan operator asinkron dan tunggu (Asinkron dan Tunggu di Visual Basic) untuk menghitung jumlah total byte yang diterima dari ISourceBlock<TOutput> objek secara asinkron. Untuk bertindak secara asinkron, Consume metode memanggil OutputAvailableAsync metode untuk menerima pemberitahuan ketika blok sumber memiliki data yang tersedia dan ketika blok sumber tidak akan pernah memiliki data tambahan yang tersedia.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Pemrograman yang kuat

Contoh sebelumnya hanya menggunakan satu konsumen untuk memproses data sumber. Jika Anda memiliki beberapa konsumen dalam aplikasi Anda, gunakan TryReceive metode untuk membaca data dari blok sumber, seperti yang ditunjukkan dalam contoh berikut.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Metode ini TryReceive mengembalikan False ketika tidak ada data yang tersedia. Ketika beberapa konsumen harus mengakses blok sumber secara bersamaan, mekanisme ini menjamin bahwa data masih tersedia setelah panggilan ke OutputAvailableAsync.

Lihat juga