Megosztás a következőn keresztül:


Útmutató: Üzenetek írása és olvasása adatfolyam-blokkból

Ez a cikk azt ismerteti, hogyan használható a Feladat párhuzamos kódtár (TPL) adatfolyamtára üzenetek írására és üzenetek olvasására egy adatfolyamblokkból. A TPL adatfolyamtár szinkron és aszinkron metódusokat is biztosít az üzenetek adatfolyamblokkba való írásához és olvasásához. Ez a cikk bemutatja, hogyan használhatja az osztályt System.Threading.Tasks.Dataflow.BufferBlock<T> . Az BufferBlock<T> osztály puffereli az üzeneteket, és üzenetforrásként és üzenetcélként is viselkedik.

Feljegyzés

A TPL-adatfolyamtár (a System.Threading.Tasks.Dataflow névtér) nincs elosztva a .NET-tel. Ha telepíteni szeretné a névteret a System.Threading.Tasks.Dataflow Visual Studióban, nyissa meg a projektet, válassza a Project menü NuGet-csomagok kezelése parancsát, és keressen rá online a System.Threading.Tasks.Dataflow csomagra. Másik lehetőségként futtassa dotnet add package System.Threading.Tasks.Dataflowa .NET Core parancssori felülettel való telepítéshez.

Szinkron írás és olvasás

Az alábbi példa a metódust használja egy Post adatfolyam-blokkba való íráshoz BufferBlock<T> , a metódus pedig ugyanabból az Receive objektumból való olvasáshoz.

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

A metódussal TryReceive adatfolyamblokkokból is olvashat, ahogy az alábbi példában is látható. A TryReceive metódus nem blokkolja az aktuális szálat, és akkor hasznos, ha időnként adatokat kérdez le.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Mivel a Post metódus szinkron módon működik, az BufferBlock<T> előző példákban szereplő objektum az összes adatot megkapja, mielőtt a második ciklus beolvassa az adatokat. Az alábbi példa kiterjeszti az első példát úgy Task.WhenAll(Task[]) , hogy egyszerre olvas és ír az üzenetblokkba. Mivel WhenAll az egyidejűleg végrehajtó összes aszinkron műveletre vár, a rendszer nem írja az értékeket az BufferBlock<T> objektumba egyetlen adott sorrendben sem.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

Írás és olvasás aszinkron módon

Az alábbi példa a SendAsync metódus használatával aszinkron módon ír egy BufferBlock<T> objektumba, a ReceiveAsync metódus pedig aszinkron módon olvas ugyanabból az objektumból. Ez a példa az aszinkron és várakozási operátorokat (Async and Await in Visual Basic) használja az adatok aszinkron küldésére és a célblokkból való olvasására. Ez SendAsync a módszer akkor hasznos, ha engedélyeznie kell egy adatfolyam-blokkot az üzenetek elhalasztásához. Ez ReceiveAsync a módszer akkor hasznos, ha az adatok elérhetővé válásakor az adatokon szeretne eljárni. Az üzenetek üzenetblokkok közötti propagálásával kapcsolatos további információkért lásd az Üzenetátadás az adatfolyamban című szakaszt.

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

Teljes példa

Az alábbi példa a cikk összes kódját mutatja be.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

Következő lépések

Ez a példa bemutatja, hogyan lehet közvetlenül olvasni és írni egy üzenetblokkba. Adatfolyamblokkokat olyan folyamatokhoz is csatlakoztathat, amelyek adatfolyamblokkok vagy hálózatok lineáris sorozatai, amelyek adatfolyamblokkok gráfjai. Folyamatokban vagy hálózatokban a források aszinkron módon propagálják az adatokat a célokra, amint az adatok elérhetővé válnak. Egy egyszerű adatfolyam-folyamatot létrehozó példa: Útmutató: Adatfolyam-folyamat létrehozása. Egy összetettebb adatfolyam-hálózatot létrehozó példáért lásd : Útmutató: Adatfolyam használata Windows Forms-alkalmazásokban.

Lásd még