Dela via


Anvisningar: Implementera ett mönster för producent-konsument-dataflöde

I den här artikeln får du lära dig hur du använder TPL-dataflödesbiblioteket för att implementera ett mönster för producent-konsument. I det här mönstret skickar producenten meddelanden till ett meddelandeblock och konsumenten läser meddelanden från det blocket.

Kommentar

TPL-dataflödesbiblioteket ( System.Threading.Tasks.Dataflow namnområdet) distribueras inte med .NET. Om du vill installera System.Threading.Tasks.Dataflow namnområdet i Visual Studio öppnar du projektet, väljer Hantera NuGet-paketProject-menyn och söker online efter System.Threading.Tasks.Dataflow paketet. Du kan också installera den med .NET Core CLI genom att köra dotnet add package System.Threading.Tasks.Dataflow.

Exempel

I följande exempel visas en grundläggande producent-konsumentmodell som använder dataflöde. Metoden Produce skriver matriser som innehåller slumpmässiga byte med data till ett System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objekt och Consume metoden läser byte från ett System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> objekt. Genom att agera på gränssnitten ISourceBlock<TOutput> och ITargetBlock<TInput> , i stället för deras härledda typer, kan du skriva återanvändbar kod som kan fungera på en mängd olika typer av dataflödesblock. I det BufferBlock<T> här exemplet används klassen . BufferBlock<T> Eftersom klassen fungerar både som ett källblock och som ett målblock kan producenten och konsumenten använda ett delat objekt för att överföra data.

Metoden Produce anropar Post metoden i en loop för att synkront skriva data till målblocket. Produce När metoden har skrivit alla data till målblocket anropas Complete metoden för att indikera att blocket aldrig kommer att ha ytterligare data tillgängliga. Metoden Consume använder operatorerna async och await (Async och Await i Visual Basic) för att asynkront beräkna det totala antalet byte som tas emot från ISourceBlock<TOutput> objektet. För att agera asynkront Consume anropar OutputAvailableAsync metoden metoden för att ta emot ett meddelande när källblocket har tillgängliga data och när källblocket aldrig kommer att ha ytterligare data tillgängliga.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Robust programmering

I föregående exempel används bara en konsument för att bearbeta källdata. Om du har flera konsumenter i ditt program använder du TryReceive metoden för att läsa data från källblocket, som du ser i följande exempel.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Metoden TryReceive returnerar False när inga data är tillgängliga. När flera konsumenter måste komma åt källblocket samtidigt garanterar den här mekanismen att data fortfarande är tillgängliga efter anropet till OutputAvailableAsync.

Se även