Megosztás a következőn keresztül:


Útmutató: Termelői-fogyasztói adatfolyam-minta implementálása

Ebből a cikkből megtudhatja, hogyan használhatja a TPL adatfolyam-kódtárat egy gyártó-fogyasztó minta implementálásához. Ebben a mintában a gyártó üzeneteket küld egy üzenetblokkba, és a fogyasztó ebből a blokkból olvas üzeneteket.

Feljegyzés

A TPL-adatfolyamtár (a System.Threading.Tasks.Dataflow névtér) nincs elosztva a .NET-tel. Ha telepíteni szeretné a névteret a System.Threading.Tasks.Dataflow Visual Studióban, nyissa meg a projektet, válassza a Project menü NuGet-csomagok kezelése parancsát, és keressen rá online a System.Threading.Tasks.Dataflow csomagra. Másik lehetőségként futtassa dotnet add package System.Threading.Tasks.Dataflowa .NET Core parancssori felülettel való telepítéshez.

Példa

Az alábbi példa egy adatfolyamot használó alapvető gyártó-fogyasztói modellt mutat be. A Produce metódus véletlenszerű bájtnyi adatot tartalmazó tömböket ír egy System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objektumba, a Consume metódus pedig bájtokat olvas be egy System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> objektumból. A származtatott típusok helyett a ISourceBlock<TOutput> ITargetBlock<TInput> felületeken való működéssel újrahasználható kódot írhat, amely számos adatfolyamblokk-típusra képes. Ez a példa az osztályt BufferBlock<T> használja. Mivel az BufferBlock<T> osztály forrásblokkként és célblokkként is működik, a gyártó és a fogyasztó megosztott objektumot használhat az adatok átviteléhez.

A Produce metódus meghívja a Post ciklusban lévő metódust, hogy szinkron módon írjon adatokat a célblokkba. Miután a Produce metódus az összes adatot a célblokkba írja, meghívja a Complete metódust, hogy jelezze, hogy a blokk soha nem lesz elérhető további adatokkal. A Consume metódus az aszinkron és várakozási operátorokat (Async és Await in Visual Basic) használja az objektumtól ISourceBlock<TOutput> kapott bájtok teljes számának aszinkron kiszámításához. Az aszinkron működéshez a metódus meghívja a Consume OutputAvailableAsync metódust, hogy értesítést kapjon arról, ha a forrásblokk rendelkezik elérhető adatokkal, és ha a forrásblokk soha nem lesz elérhető további adatokkal.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Robusztus programozás

Az előző példa csak egy fogyasztót használ a forrásadatok feldolgozásához. Ha több felhasználója van az alkalmazásban, a TryReceive metódus használatával olvassa be az adatokat a forrásblokkból az alábbi példában látható módon.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

A TryReceive metódus akkor ad False vissza, ha nincs elérhető adat. Ha egyszerre több felhasználónak is hozzá kell férnie a forrásblokkhoz, ez a mechanizmus garantálja, hogy az adatok a hívás után is elérhetők maradnak OutputAvailableAsync.

Lásd még