Teilen über


Vorgehensweise: Schreiben und Lesen von Nachrichten in einem Datenflussblock

In diesem Artikel wird beschrieben, wie die Task Parallel Library-Datenflussbibliothek (TPL) verwendet wird, um Nachrichten in einen Datenflussblock zu schreiben und aus einem Datenflussblock zu lesen. Die TPL-Datenflussbibliothek bietet synchrone und asynchrone Methoden zum Schreiben von Nachrichten an einen Datenflussblock und zum Lesen von Nachrichten aus einem Datenflussblock. In diesem Artikel wird die Verwendung der System.Threading.Tasks.Dataflow.BufferBlock<T>-Klasse veranschaulicht. Die BufferBlock<T>-Klasse puffert Nachrichten und fungiert sowohl als Nachrichtenquelle als auch als Nachrichtenziel.

Hinweis

Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow ausführen.

Synchrones Schreiben und Lesen

Im folgenden Beispiel wird die Post-Methode zum Schreiben in einen BufferBlock<T>-Datenflussblock und die Receive-Methode zum Lesen aus demselben Objekt verwendet.

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

Sie können auch die TryReceive-Methode zum Lesen aus einem Datenflussblock einsetzen, wie im folgenden Beispiel gezeigt. Der aktuelle Thread wird von der TryReceive-Methode nicht blockiert, und sie ist nützlich, wenn Sie gelegentlich Daten abrufen.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Da die Post-Methode synchron arbeitet, erhält das BufferBlock<T>-Objekt in den vorherigen Beispielen alle Daten, bevor in der zweiten Schleife Daten gelesen werden. Im folgenden Beispiel wird das erste Beispiel erweitert, indem Task.WhenAll(Task[]) verwendet wird, um gleichzeitig aus dem Nachrichtenblock zu lesen und in ihn zu schreiben. Da WhenAll alle asynchronen Vorgänge erwartet, die gleichzeitig ausgeführt werden, werden die Werte nicht in einer bestimmten Reihenfolge in das BufferBlock<T>-Objekt geschrieben.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

Asynchrones Schreiben und Lesen

Im folgenden Beispiel wird die SendAsync-Methode zum asynchronen Schreiben in ein BufferBlock<T>-Objekt und die ReceiveAsync-Methode zum asynchronen Lesen aus demselben Objekt verwendet. Dieses Beispiel verwendet die Operatoren async und await (Async und Await in Visual Basic) für asynchrones Senden von Daten an und Lesen von Daten aus dem Zielblock. Die SendAsync-Methode ist nützlich, wenn Sie einem Datenflussblock ermöglichen müssen, Nachrichten zurückzustellen. Die ReceiveAsync-Methode ist nützlich, wenn Sie Daten verarbeiten möchten, sobald diese verfügbar sind. Weitere Informationen, wie Nachrichten zwischen Nachrichtenblöcken weitergegeben werden, finden Sie im Abschnitt über die Nachrichtenübergabe in Datenfluss.

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

Vollständiges Beispiel

Das folgende Beispiel zeigt den gesamten Code für diesen Artikel.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

Nächste Schritte

In diesem Beispiel wird veranschaulicht, wie direkt aus einem Nachrichtenblock gelesen und in ihn geschrieben wird. Sie können Datenflussblöcke auch so verbinden, dass sie Pipelines (lineare Sequenzen von Datenflussblöcken) oder Netzwerke (Diagramme von Datenflussblöcken) bilden. In einer Pipeline oder einem Netzwerk geben Quellen asynchron Daten an Ziele weiter, sobald diese Daten verfügbar werden. Ein Beispiel für die Erstellung einer einfachen Datenflusspipeline finden Sie unter Walkthrough: Creating a Dataflow Pipeline (Exemplarische Vorgehensweise: Erstellen einer Datenflusspipeline). Ein Beispiel für die Erstellung eines komplexeren Datenflussnetzwerks finden Sie unter Walkthrough: Using Dataflow in a Windows Forms Application (Exemplarische Vorgehensweise: Datenfluss in einer Windows Forms-Anwendung verwenden).

Weitere Informationen