Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ebből a cikkből megtudhatja, hogyan használhatja a TPL adatfolyam-kódtárat egy gyártó-fogyasztó minta implementálásához. Ebben a mintában a gyártó üzeneteket küld egy üzenetblokkba, és a fogyasztó ebből a blokkból olvas üzeneteket.
Feljegyzés
A TPL-adatfolyamtár (a System.Threading.Tasks.Dataflow névtér) nincs elosztva a .NET-tel. Ha telepíteni szeretné a névteret a System.Threading.Tasks.Dataflow Visual Studióban, nyissa meg a projektet, válassza a Project menü NuGet-csomagok kezelése parancsát, és keressen rá online a System.Threading.Tasks.Dataflow csomagra. Másik lehetőségként futtassa dotnet add package System.Threading.Tasks.Dataflowa .NET Core parancssori felülettel való telepítéshez.
Példa
Az alábbi példa egy adatfolyamot használó alapvető gyártó-fogyasztói modellt mutat be. A Produce metódus véletlenszerű bájtnyi adatot tartalmazó tömböket ír egy System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objektumba, a Consume metódus pedig bájtokat olvas be egy System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> objektumból. A származtatott típusok helyett a ISourceBlock<TOutput> ITargetBlock<TInput> felületeken való működéssel újrahasználható kódot írhat, amely számos adatfolyamblokk-típusra képes. Ez a példa az osztályt BufferBlock<T> használja. Mivel az BufferBlock<T> osztály forrásblokkként és célblokkként is működik, a gyártó és a fogyasztó megosztott objektumot használhat az adatok átviteléhez.
A Produce metódus meghívja a Post ciklusban lévő metódust, hogy szinkron módon írjon adatokat a célblokkba. Miután a Produce metódus az összes adatot a célblokkba írja, meghívja a Complete metódust, hogy jelezze, hogy a blokk soha nem lesz elérhető további adatokkal. A Consume metódus az aszinkron és várakozási operátorokat (Async és Await in Visual Basic) használja az objektumtól ISourceBlock<TOutput> kapott bájtok teljes számának aszinkron kiszámításához. Az aszinkron működéshez a metódus meghívja a Consume OutputAvailableAsync metódust, hogy értesítést kapjon arról, ha a forrásblokk rendelkezik elérhető adatokkal, és ha a forrásblokk soha nem lesz elérhető további adatokkal.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Robusztus programozás
Az előző példa csak egy fogyasztót használ a forrásadatok feldolgozásához. Ha több felhasználója van az alkalmazásban, a TryReceive metódus használatával olvassa be az adatokat a forrásblokkból az alábbi példában látható módon.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
A TryReceive metódus akkor ad False vissza, ha nincs elérhető adat. Ha egyszerre több felhasználónak is hozzá kell férnie a forrásblokkhoz, ez a mechanizmus garantálja, hogy az adatok a hívás után is elérhetők maradnak OutputAvailableAsync.