Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A feladat párhuzamos kódtára (TPL) adatfolyam-összetevőket biztosít az egyidejűség-kompatibilis alkalmazások robusztusságának növeléséhez. Ezeket az adatfolyam-összetevőket együttesen TPL-adatfolyam-kódtárnak nevezzük. Ez az adatfolyam-modell előlépteti az aktoralapú programozást azáltal, hogy folyamat közbeni üzenetátadást biztosít a durva szemcsés adatfolyamokhoz és a pipelining feladatokhoz. Az adatfolyam-összetevők a TPL típusaira és ütemezési infrastruktúrájára épülnek, és integrálhatók az aszinkron programozás C#, Visual Basic és F# nyelvi támogatásával. Ezek az adatfolyam-összetevők akkor hasznosak, ha több olyan művelettel rendelkezik, amelyeknek aszinkron módon kell kommunikálniuk egymással, vagy ha az adatok feldolgozását az elérhetővé válásukkor szeretné elvégezni. Vegyük például egy webkamera képadatait feldolgozó alkalmazást. Az adatfolyam-modell használatával az alkalmazás képes a képkeretek feldolgozására, amint elérhetővé válnak. Ha az alkalmazás javítja a képkockákat, például fénykorrekcióval vagy vörösszem-csökkentéssel, adatfolyam-összetevőkből álló csővezetéket hozhat létre. A folyamat minden szakasza durvább párhuzamosságú funkciókat használhat, például a TPL által biztosított funkciókat a kép átalakításához.
Ez a dokumentum áttekintést nyújt a TPL-adatfolyamtárról. Ismerteti a programozási modellt, az előre definiált adatfolyamblokk-típusokat, valamint azt, hogy hogyan konfigurálhatja az adatfolyamblokkokat az alkalmazások adott követelményeinek megfelelően.
Megjegyzés:
A TPL-adatfolyamtár (a System.Threading.Tasks.Dataflow névtér) nincs elosztva a .NET-tel. Ha telepíteni szeretné a System.Threading.Tasks.Dataflow névteret a Visual Studióban, nyissa meg a projektet, válassza NuGet-csomagok kezelése a Project menüjében, és keressen online a System.Threading.Tasks.Dataflow csomagra. Másik lehetőségként a .NET Core CLI használatával telepítheti, futtassa a dotnet add package System.Threading.Tasks.Dataflow.
Programozási modell
A TPL adatfolyam-kódtár az üzenetek továbbításának és párhuzamosításának alapja a nagy átviteli sebességgel és alacsony késéssel rendelkező, processzorigényes és I/O-igényes alkalmazások számára. Emellett explicit módon szabályozhatja az adatok pufferelt és a rendszeren való mozgását. Az adatfolyam-programozási modell jobb megértéséhez fontolja meg azt az alkalmazást, amely aszinkron módon betölti a lemezről származó képeket, és létrehozza a képek összetett részét. A hagyományos programozási modellek általában megkövetelik, hogy visszahívásokat és szinkronizálási objektumokat, például zárolásokat használjon a feladatok koordinálásához és a megosztott adatokhoz való hozzáféréshez. Az adatfolyam-programozási modell használatával adatfolyam-objektumokat hozhat létre, amelyek lemezről olvasás közben dolgozzák fel a képeket. Az adatfolyam-modellben deklarálja az adatok kezelésének módját, amikor az elérhetővé válik, valamint az adatok közötti függőségeket is. Mivel a futtatókörnyezet kezeli az adatok közötti függőségeket, gyakran elkerülheti a megosztott adatokhoz való hozzáférés szinkronizálásának követelményét. Emellett mivel a futtatókörnyezeti ütemezések az adatok aszinkron érkezése alapján működnek, az adatfolyam a mögöttes szálak hatékony kezelésével javíthatja a válaszképességet és az átviteli sebességet. Ha például az adatfolyam-programozási modellt használja képfeldolgozás implementálásához Egy Windows Forms-alkalmazásban, tekintse meg az útmutatót: Adatfolyam használata Windows Forms-alkalmazásban.
Források és célok
A TPL adatfolyam-kódtár adatfolyamblokkokból áll, amelyek olyan adatstruktúrák, amelyek pufferelik és dolgozzák fel az adatokat. A TPL háromféle adatfolyamblokkot határoz meg: forrásblokkokat, célblokkokat és propagátorblokkokat. A forrásblokkok adatforrásként szolgálnak, és onnan is olvashatók. A célblokk az adatok fogadójaként működik, és megírható. A propagátorblokkok forrásblokkként és célblokkként is használhatók, és beolvashatók és beírhatók. A TPL meghatározza az interfészt a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> források megjelenítéséhez, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> a célok megjelenítéséhez és System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> a propagátorok megjelenítéséhez. IPropagatorBlock<TInput,TOutput> örökli mindkettőt ISourceBlock<TOutput>, és ITargetBlock<TInput>.
A TPL adatfolyamtár számos előre definiált adatfolyamblokktípust biztosít, amelyek implementálják a ISourceBlock<TOutput>, ITargetBlock<TInput>és IPropagatorBlock<TInput,TOutput> interfészeket. Ezeket az adatfolyamblokktípusokat ebben a dokumentumban az Előre definiált adatfolyamblokktípusok című szakaszban találja.
Blokkok csatlakoztatása
Adatfolyamblokkokat összekapcsolhat folyamatokkal, amelyek adatfolyamblokkok vagy hálózatok lineáris sorozatai, amelyek adatfolyamblokkok gráfjai. A csővezeték a hálózat egyik formája. Folyamatokban vagy hálózatokban a források aszinkron módon propagálják az adatokat a célokra, amint az adatok elérhetővé válnak. A ISourceBlock<TOutput>.LinkTo metódus egy forrás adatfolyamblokkot egy célblokkhoz kapcsol. A forrás nulla vagy több célhoz csatolható; a célok nulla vagy több forrásból kapcsolhatók össze. Adatfolyamblokkokat egyszerre adhat hozzá vagy távolíthat el egy folyamathoz vagy hálózatról. Az előre definiált adatfolyamblokk-típusok a csatolás és a leválasztás szálbiztonsági szempontjait kezelik.
Az adatfolyamblokkokat egyszerű folyamat létrehozásához csatlakoztató példa : Útmutató: Adatfolyam-folyamat létrehozása. Ha például adatfolyamblokkokat csatlakoztat egy összetettebb hálózat létrehozásához, tekintse meg az útmutatót: Adatfolyam használata Windows Forms-alkalmazásban. ** Ha egy olyan példáról van szó, amely leválaszt egy célt egy forrásról, miután a forrás üzenetet küld a célnak, olvassa el a Hogyan: Adatfolyam-blokkok leválasztása című témakört.
Szűrés
Amikor meghívja a ISourceBlock<TOutput>.LinkTo forrás célhoz csatolásának metódusát, megadhat egy meghatalmazottat, aki meghatározza, hogy a célblokk elfogadja vagy elutasítja-e az üzenetet az üzenet értéke alapján. Ez a szűrési mechanizmus hasznos módja annak, hogy az adatfolyamblokkok csak bizonyos értékeket kapnak. Az előre definiált adatfolyamblokk-típusok többségénél, ha egy forrásblokk több célblokkhoz csatlakozik, amikor egy célblokk elutasít egy üzenetet, a forrás felajánlja az üzenetet a következő célnak. A forrás határozza meg, hogy a forrás milyen sorrendben kínál üzeneteket a céloknak, és a forrás típusától függően változhat. A legtöbb forrásblokktípus leállítja az üzenet felajánlását, miután egy cél elfogadta az üzenetet. A szabály alól kivételt képez az BroadcastBlock<T> osztály, amely minden egyes üzenetet kínál az összes célnak, még akkor is, ha egyes célok elutasítják az üzenetet. Ha például szűréssel csak bizonyos üzeneteket dolgoz fel, tekintse meg az útmutatót: Adatfolyam használata Windows Forms-alkalmazásban.
Fontos
Mivel minden előre definiált forrás adatfolyamblokktípus garantálja, hogy az üzenetek a beérkezés sorrendjében propagálásra kerülnek, minden üzenetet be kell olvasni a forrásblokkból, mielőtt a forrásblokk feldolgozhatja a következő üzenetet. Ezért ha szűréssel több célt csatlakoztat egy forráshoz, győződjön meg arról, hogy legalább egy célblokk megkapja az egyes üzeneteket. Ellenkező esetben előfordulhat, hogy az alkalmazás holtpontra kerülhet.
Üzenetátadás
Az adatfolyam-programozási modell az üzenetátadás fogalmához kapcsolódik, ahol a program független összetevői üzenetek küldésével kommunikálnak egymással. Az üzenetek alkalmazásösszetevők közötti propagálásának egyik módja, ha meghívja a Post (szinkron) és SendAsync az (aszinkron) metódust, hogy üzeneteket küldjön a cél adatfolyamblokkoknak, valamint a Receiveforrásblokkokból érkező üzenetek fogadására szolgáló módszereketReceiveAsyncTryReceive. Ezeket a módszereket adatfolyam-folyamatokkal vagy hálózatokkal kombinálhatja úgy, hogy bemeneti adatokat küld a fő csomópontnak (egy célblokknak), és a kimeneti adatokat a folyamat terminálcsomópontjáról vagy a hálózat terminálcsomópontjairól (egy vagy több forrásblokk) kapja meg. A metódus használatával Choose az első olyan forrásból is olvashat, amely rendelkezik elérhető adatokkal, és műveleteket hajthat végre az adatokon.
A forrásblokkok a metódus meghívásával kínálnak adatokat a ITargetBlock<TInput>.OfferMessage célblokkok számára. A célblokk háromféleképpen válaszol egy felajánlott üzenetre: elfogadhatja az üzenetet, elutasíthatja vagy elhalaszthatja az üzenetet. Amikor a cél elfogadja az üzenetet, a OfferMessage metódus visszatér Accepted. Amikor a cél elutasítja az üzenetet, a OfferMessage metódus visszatér Declined. Ha a cél azt követeli meg, hogy a továbbiakban ne kapjon üzeneteket a forrástól, OfferMessage akkor a rendszer a következőt adja DecliningPermanentlyvissza: Az előre definiált forrásblokktípusok nem kínálnak üzeneteket a csatolt céloknak egy ilyen visszatérési érték fogadása után, és automatikusan leválasztják a kapcsolatot az ilyen célokról.
Ha egy célblokk elhalasztja az üzenetet későbbi használatra, a OfferMessage metódus visszatér Postponed. Az üzenetet elhalasztó célblokk később meghívhatja a metódust, ISourceBlock<TOutput>.ReserveMessage hogy megpróbálja lefoglalni a felajánlott üzenetet. Ezen a ponton az üzenet továbbra is elérhető, és használhatja a célblokk, vagy az üzenetet egy másik cél vette át. Ha a célblokkhoz később szükség van az üzenetre, vagy már nincs szüksége az üzenetre, meghívja a metódust vagy ISourceBlock<TOutput>.ConsumeMessage a ReleaseReservation metódust. Az üzenetfoglalást általában a nem mohó módban működő adatfolyamblokk-típusok használják. A nem mohó módot a dokumentum későbbi részében ismertetik. Halasztott üzenet lefoglalása helyett a célblokkok a ISourceBlock<TOutput>.ConsumeMessage módszer használatával is megpróbálhatják közvetlenül felhasználni az elhalasztott üzenetet.
Adatfolyam-blokk befejezése
Az adatfolyamblokkok a befejezés fogalmát is támogatják. A befejezett állapotban lévő adatfolyamblokkok nem végeznek további munkát. Minden adatfolyamblokkhoz tartozik egy kapcsolódó System.Threading.Tasks.Task objektum, más néven befejezési tevékenység, amely a blokk befejezési állapotát jelöli. Mivel megvárhatja, amíg egy Task objektum befejeződik, a befejezési feladatokkal megvárhatja, amíg egy adatfolyam-hálózat egy vagy több terminálcsomópontja befejeződik. Az IDataflowBlock interfész határozza meg a Complete metódust, amely tájékoztatja az adatfolyam-blokkot a befejezési kérelemről, valamint a Completion tulajdonságot, amely az adatfolyam-blokk befejezési feladatát adja vissza. Mindkettőt ISourceBlock<TOutput> , és ITargetBlock<TInput> örökli a IDataflowBlock felületet.
Kétféleképpen állapítható meg, hogy egy adatfolyam-blokk hiba nélkül fejeződött-e be, egy vagy több hibát észlelt vagy megszakított. Az első módszer az, hogy meghívja a Task.Wait metódust a befejezési feladatban egytry-catch blokkban (Try-Catcha Visual Basicben). Az alábbi példa létrehoz egy ActionBlock<TInput> objektumot, amely akkor dob, ArgumentOutOfRangeException ha a bemeneti értéke kisebb, mint nulla.
AggregateException akkor jelenik meg, amikor ez a példa meghívja a Wait metódust a befejezési feladaton. A ArgumentOutOfRangeException hozzáférés az InnerExceptions objektum tulajdonságán AggregateException keresztül történik.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Ez a példa azt az esetet mutatja be, amikor egy végrehajtási adatfolyam-blokk delegáltja nem kezel kivételt. Javasoljuk, hogy kezelje az ilyen blokkokban található kivételeket. Ha azonban nem tudja ezt megtenni, a blokk úgy viselkedik, mintha megszakították volna, és nem dolgozza fel a bejövő üzeneteket.
Ha egy adatfolyam-blokkot explicit módon törölnek, az AggregateException objektum OperationCanceledException-et tartalmaz a InnerExceptions tulajdonságban. Az adatfolyam-lemondással kapcsolatos további információkért lásd a Lemondás engedélyezése szakaszt .
Az adatfolyamblokkok befejezési állapotának meghatározásának második módja a befejezési tevékenység folytatása, vagy a C# és a Visual Basic aszinkron nyelvi funkcióinak használata aszinkron módon a befejezési tevékenységre való várakozáshoz. A metódushoz Task.ContinueWith megadott delegált egy Task objektumot vesz fel, amely az előzményfeladatot jelöli. A Completion tulajdonság esetében a folytatásért felelős delegált maga foglalkozik a befejezési feladattal. Az alábbi példa az előzőhöz hasonlít, azzal a kivételrel, hogy a ContinueWith metódussal létrehoz egy folytatási feladatot, amely a teljes adatfolyam-művelet állapotát nyomtatja.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine($"n = {n}");
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Az adatfolyamblokkok befejezési állapotával kapcsolatos további információk meghatározásához olyan tulajdonságokat is használhat, mint IsCanceled például a folytatási feladat törzsében. A folytatási feladatokról, valamint a lemondással és a hibakezeléssel kapcsolatos további információkért tekintse meg a Tevékenységek láncolása a folytatási feladatok, a tevékenységtörlés és a kivételkezelés használatával című témakört.
Előre definiált adatfolyamblokktípusok
A TPL adatfolyamtár számos előre definiált adatfolyamblokktípust biztosít. Ezek a típusok három kategóriába sorolhatók: pufferelési blokkok, végrehajtási blokkok és csoportosítási blokkok. A következő szakaszok azokat a blokktípusokat ismertetik, amelyek ezeket a kategóriákat alkotják.
Pufferelési blokkok
A pufferelési blokkok adatokat tárolnak az adatfelhasználók számára felhasználásra. A TPL adatfolyamtár három pufferelési blokktípust biztosít: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>és System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
Az BufferBlock<T> osztály egy általános célú aszinkron üzenetkezelési struktúrát jelöl. Ez az osztály egy elsőként be, elsőként ki (FIFO) üzenetsort tárol azokból az üzenetekből, amelyek több forrásból írhatók, vagy több cél által olvashatók. Amikor egy cél üzenetet kap egy BufferBlock<T> objektumtól, az üzenet el lesz távolítva az üzenetsorból. Ezért bár egy BufferBlock<T> objektum több célhoz is tartozhat, minden egyes üzenetet csak egy cél fog megkapni. Az BufferBlock<T> osztály akkor hasznos, ha több üzenetet szeretne átadni egy másik összetevőnek, és az összetevőnek minden egyes üzenetet fogadnia kell.
Az alábbi egyszerű példa több Int32 értéket is feljegyz egy BufferBlock<T> objektumra, majd visszaolvassa azokat az objektumból.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Egy teljes példát, amely bemutatja, hogyan írhat üzeneteket egy objektumba, és hogyan olvashat üzeneteket egy BufferBlock<T> objektumból, olvassa el az Üzenetek írása és olvasása adatfolyamblokkból című témakört.
BroadcastBlock<T>
Az BroadcastBlock<T> osztály akkor hasznos, ha több üzenetet kell átadnia egy másik összetevőnek, de az összetevőnek csak a legújabb értékre van szüksége. Ez az osztály akkor is hasznos, ha több összetevőnek szeretne üzenetet küldeni.
Az alábbi alapszintű példa egy Double értéket ad meg egy BroadcastBlock<T> objektumnak, majd többször visszaolvassa az értéket az adott objektumból. Mivel az értékek olvasás után nem törlődnek az objektumokból BroadcastBlock<T> , minden alkalommal ugyanaz az érték érhető el.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Egy teljes példa, amely bemutatja, hogyan továbbíthat BroadcastBlock<T> egy üzenetet több célblokkba: Útmutató: Feladatütemező megadása adatfolyamblokkban.
WriteOnceBlock<T>
A WriteOnceBlock<T> osztály hasonlít a BroadcastBlock<T> osztályhoz, kivéve, hogy egy WriteOnceBlock<T> objektum csak egyszer írható be. Képzelje el WriteOnceBlock<T>, hogy a C# readonly (ReadOnly Visual Basicban) kulcsszóhoz hasonló, azzal a kivétellel, hogy egy WriteOnceBlock<T> objektum nem módosíthatóvá válik, miután egy értéket kap, nem pedig a létrehozáskor. Az osztályhoz BroadcastBlock<T> hasonlóan, amikor egy cél egy objektumtól WriteOnceBlock<T> kap üzenetet, az üzenet nem lesz eltávolítva az adott objektumból. Ezért több cél kap egy másolatot az üzenetről. Az WriteOnceBlock<T> osztály akkor hasznos, ha több üzenet közül csak az elsőt szeretné propagálni.
Az alábbi egyszerű példa több String értéket is feljegyz egy WriteOnceBlock<T> objektumra, majd visszaolvassa az értéket az objektumból. Mivel egy WriteOnceBlock<T> objektum csak egyszer írható, miután egy WriteOnceBlock<T> objektum üzenetet kap, elveti a következő üzeneteket.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Egy teljes példa, amely bemutatja, hogyan használható WriteOnceBlock<T> az első befejezett művelet értékének fogadására: Útmutató: Adatfolyamblokkok leválasztása.
Végrehajtási blokkok
A végrehajtási blokkok minden fogadott adathoz meghívnak egy felhasználó által megadott meghatalmazottat. A TPL adatfolyamtár három végrehajtási blokktípust biztosít: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>és System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
ActionBlock<T>
Az ActionBlock<TInput> osztály egy célblokk, amely meghív egy meghatalmazottat, amikor adatokat fogad. Gondoljon egy ActionBlock<TInput> objektumra delegáltként, amely aszinkron módon fut, amikor az adatok elérhetővé válnak. A(z) ActionBlock<TInput> objektumnak megadott delegált Action<T> típusú vagy System.Func<TInput, Task> típusú lehet. Amikor egy ActionBlock<TInput> objektumot használ a Action<T>-val, a rendszer befejezettnek tekinti az egyes bemeneti elemek feldolgozását, amikor a delegált visszatér. Ha ActionBlock<TInput> objektumot System.Func<TInput, Task> használ, az egyes bemeneti elemek feldolgozása csak akkor tekinthető befejezettnek, ha a visszaadott Task objektum befejeződött. E két mechanizmus használatával az egyes bemeneti elemek szinkron és aszinkron feldolgozását is használhatja ActionBlock<TInput> .
Az alábbi egyszerű példa több Int32 értéket ad hozzá egy ActionBlock<TInput> objektumhoz. Az ActionBlock<TInput> objektum ezeket az értékeket a konzolon nyomtatja ki. Ez a példa ezután befejezett állapotra állítja a blokkot, és megvárja, amíg az összes adatfolyam-tevékenység befejeződik.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
A ActionBlock<TInput> osztállyal való delegáltak használatát bemutató teljes példákért lásd Művelet végrehajtása, amikor egy adatfolyamblokk adatokat fogad.
TransformBlock<TInput, TOutput>
Az TransformBlock<TInput,TOutput> osztály hasonlít a ActionBlock<TInput> osztályhoz, azzal a különbséggel, hogy forrásként és célként is működik. A TransformBlock<TInput,TOutput> objektumnak átadott delegált értéket ad vissza, amely TOutput típusú. A(z) TransformBlock<TInput,TOutput> objektumhoz megadott delegált lehet System.Func<TInput, TOutput> vagy System.Func<TInput, Task<TOutput>> típusú. Amikor a TransformBlock<TInput,TOutput> objektumot a System.Func<TInput, TOutput> taggal együtt használ, a feldolgozás minden bemeneti elem esetén befejezettnek tekintendő, amikor a delegált visszatér. Amikor egy TransformBlock<TInput,TOutput> objektumot System.Func<TInput, Task<TOutput>>használ, az egyes bemeneti elemek feldolgozása csak akkor tekinthető befejezettnek, ha a visszaadott Task<TResult> objektum befejeződött.
ActionBlock<TInput>A két mechanizmushoz hasonlóan az egyes bemeneti elemek szinkron és aszinkron feldolgozására is használhatóTransformBlock<TInput,TOutput>.
Az alábbi egyszerű példa létrehoz egy TransformBlock<TInput,TOutput> objektumot, amely kiszámítja a bemenet négyzetgyökét. Az TransformBlock<TInput,TOutput> objektum bemenetként veszi fel Int32 az értékeket, és kimenetként állítja elő Double az értékeket.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
A Windows Forms-alkalmazásokban képfeldolgozást végző adatfolyamblokkok hálózatában használt TransformBlock<TInput,TOutput> teljes példákért lásd: Útmutató: Adatfolyam használata Windows Forms-alkalmazásokban.
TransformManyBlock<TInput, TOutput>
Az TransformManyBlock<TInput,TOutput> osztály az osztályhoz hasonlít, azzal a TransformBlock<TInput,TOutput> kivétellel, hogy TransformManyBlock<TInput,TOutput> minden bemeneti értékhez nulla vagy több kimeneti értéket állít elő, nem pedig egyetlen kimeneti értéket minden bemeneti értékhez. A(z) TransformManyBlock<TInput,TOutput> objektumhoz megadott delegált lehet System.Func<TInput, IEnumerable<TOutput>> vagy System.Func<TInput, Task<IEnumerable<TOutput>>> típusú. Amikor a TransformManyBlock<TInput,TOutput> objektumot a System.Func<TInput, IEnumerable<TOutput>> taggal együtt használ, a feldolgozás minden bemeneti elem esetén befejezettnek tekintendő, amikor a delegált visszatér. Ha használja a TransformManyBlock<TInput,TOutput> objektumot a System.Func<TInput, Task<IEnumerable<TOutput>>>-val, az egyes bemeneti elemek feldolgozása csak akkor tekinthető befejezettnek, ha a visszaadott System.Threading.Tasks.Task<IEnumerable<TOutput>> objektum befejeződött.
Az alábbi egyszerű példa egy TransformManyBlock<TInput,TOutput> objektumot hoz létre, amely sztringeket oszt fel az egyes karaktersorozatokra. Az TransformManyBlock<TInput,TOutput> objektum bemenetként veszi fel String az értékeket, és kimenetként állítja elő Char az értékeket.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Az adatfolyam-folyamat minden bemenetéhez több független kimenet előállítására használható TransformManyBlock<TInput,TOutput> teljes példákért lásd : Útmutató: Adatfolyam-folyamat létrehozása.
A párhuzamosság foka
Minden ActionBlock<TInput>, TransformBlock<TInput,TOutput>és TransformManyBlock<TInput,TOutput> objektum puffereli a bemeneti üzeneteket, amíg a blokk készen nem áll a feldolgozásukra. Alapértelmezés szerint ezek az osztályok az üzeneteket a beérkezés sorrendjében dolgozzák fel, egyszerre egy üzenetet. Megadhatja azt is, hogy a párhuzamosság milyen mértékben legyen beállítva, így az ActionBlock<TInput>, TransformBlock<TInput,TOutput> és TransformManyBlock<TInput,TOutput> objektumok egyszerre több üzenetet tudnak feldolgozni. Az egyidejű végrehajtással kapcsolatos további információkért tekintse meg a dokumentum későbbi, párhuzamossági fokát meghatározó szakaszt. Egy példa, amely beállítja a párhuzamosság mértékét, hogy egy adatfolyam-blokk egyszerre több üzenetet dolgozhasson fel, lásd: Hogyan: A párhuzamosság fokának megadása egy adatfolyam-blokkban.
Delegálttípusok összegzése
Az alábbi táblázat összefoglalja azokat a delegált típusokat, amelyeket megadhat a ActionBlock<TInput>, TransformBlock<TInput,TOutput>, és TransformManyBlock<TInput,TOutput> objektumoknak. Ez a tábla azt is meghatározza, hogy a delegált típusa szinkronban vagy aszinkron módon működik-e.
| Típus | Szinkron meghatalmazott típusa | Aszinkron delegálás típusa |
|---|---|---|
| ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
| TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
| TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Lambda-kifejezéseket akkor is használhat, ha végrehajtási blokktípusokkal dolgozik. Egy példa, amely bemutatja, hogyan használható egy lambda kifejezés végrehajtási blokkokkal, lásd : Művelet végrehajtása, amikor egy adatfolyam-blokk adatokat fogad.
Csoportosítási blokkok
A csoportosítási blokkok egy vagy több forrásból származó adatokat kombinálnak különböző korlátozások mellett. A TPL adatfolyamtár három illesztési blokktípust biztosít: BatchBlock<T>, JoinBlock<T1,T2>és BatchedJoinBlock<T1,T2>.
BatchBlock<T>
Az BatchBlock<T> osztály kimeneti adatok tömbjeibe egyesíti a bemeneti adatok készleteit, amelyeket kötegeknek nevezünk. Objektum létrehozásakor BatchBlock<T> meg kell adnia az egyes kötegek méretét. Amikor az BatchBlock<T> objektum megkapja a bemeneti elemek megadott számát, aszinkron módon propagálja az elemeket tartalmazó tömböt. Ha egy BatchBlock<T> objektum befejezett állapotra van állítva, de nem tartalmaz elegendő elemet a köteg létrehozásához, propagálja a fennmaradó bemeneti elemeket tartalmazó végső tömböt.
Az BatchBlock<T> osztály kapzsi vagy nem kapzsi módban működik. Kapzsi módban, amely az alapértelmezett, egy BatchBlock<T> objektum minden olyan üzenetet elfogad, amelyet felajánl, és propagálja a tömböt, miután megkapja a megadott számú elemet. Nem kapzsi módban egy BatchBlock<T> objektum elhalasztja az összes bejövő üzenetet, amíg elegendő forrás nem ajánlott fel üzeneteket a blokknak köteg létrehozásához. A mohó mód általában jobb teljesítményt nyújt, mint a nem kapzsi mód, mivel kevesebb feldolgozási többletterhelést igényel. Azonban használhatja a nem kapzsi üzemmódot, ha atomi módon kell koordinálnia a több forrásból származó fogyasztást. Adja meg a nem mohó módot azzal, hogy a Greedy konstruktor False paraméterén belül a dataflowBlockOptions értéket BatchBlock<T>-re állítja.
Az alábbi egyszerű példa több Int32 értéket is közzéten egy BatchBlock<T> olyan objektumra, amely egy köteg tíz elemét tartalmazza. Annak biztosítása érdekében, hogy az összes érték ki legyenek propagálva a BatchBlock<T>-ból, ez a példa meghívja a(z) Complete metódust. A Complete metódus befejezett állapotra állítja az BatchBlock<T> objektumot, ezért az objektum végső kötegként propagálja a BatchBlock<T> fennmaradó elemeket.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");
Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Az adatbázis-beszúrási műveletek hatékonyságának javítására szolgáló BatchBlock<T> teljes példáért lásd : Útmutató: A BatchBlock és a BatchedJoinBlock használata a hatékonyság javítása érdekében.
JoinBlock<T1, T2, ...>
Az JoinBlock<T1,T2> és JoinBlock<T1,T2,T3> osztályok bemeneti elemeket gyűjtenek, és propagálják a System.Tuple<T1,T2> vagy System.Tuple<T1,T2,T3> objektumokat, amelyek azokat az elemeket tartalmazzák. Az
BatchBlock<T>, JoinBlock<T1,T2> és JoinBlock<T1,T2,T3> kapzsi vagy nem kapzsi módban működnek. Kapzsi módban, amely az alapértelmezett, egy JoinBlock<T1,T2> vagy JoinBlock<T1,T2,T3> objektum minden felkínált üzenetet elfogad, és egy tömböt propagál, miután minden egyes célja kapott legalább egy üzenetet. Nem kapzsi módban egy JoinBlock<T1,T2> vagy JoinBlock<T1,T2,T3> objektum elhalasztja az összes bejövő üzenetet, amíg az összes célnak meg nem adták a többszörös létrehozásához szükséges adatokat. Ezen a ponton a blokk egy kétfázisú véglegesítési protokollt alkalmaz, amely atomilag lekéri az összes szükséges elemet a forrásokból. Ez a halasztás lehetővé teszi, hogy egy másik entitás addig is felhasználja az adatokat, hogy a teljes rendszer előrehaladjon.
Az alábbi egyszerű példa egy olyan esetet mutat be, amelyben egy JoinBlock<T1,T2,T3> objektum több adatot igényel egy érték kiszámításához. Ez a példa létrehoz egy JoinBlock<T1,T2,T3> objektumot, amely két Int32 értéket és egy Char értéket igényel egy számtani művelet végrehajtásához.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
break;
case '-':
Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
break;
default:
Console.WriteLine($"Unknown operator '{data.Item3}'.");
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Egy olyan teljes példáért, amely nem mohó módban használ JoinBlock<T1,T2> objektumokat egy erőforrás együttműködési megosztásához, olvassa el a How to: Use JoinBlock to Read Data From Multiple Sources (A JoinBlock használata több forrásból származó adatok olvasásához) című témakört.
BatchedJoinBlock<T1, T2, ...>
Az BatchedJoinBlock<T1,T2> és BatchedJoinBlock<T1,T2,T3> osztályok bemeneti elemek kötegeit gyűjtik össze, és propagálják az azokat az elemeket tartalmazó System.Tuple(IList(T1), IList(T2)) vagy System.Tuple(IList(T1), IList(T2), IList(T3)) objektumokat. Gondolj BatchedJoinBlock<T1,T2>-ra, mint BatchBlock<T> és JoinBlock<T1,T2> kombinációjára. Adja meg az egyes kötegek méretét egy BatchedJoinBlock<T1,T2> objektum létrehozásakor.
BatchedJoinBlock<T1,T2> tulajdonságokat biztosít, mint például Target1 és Target2, amelyek ITargetBlock<TInput>-t valósítanak meg. Ha a bemeneti elemek megadott száma az összes célból érkezik, az BatchedJoinBlock<T1,T2> objektum aszinkron módon propagálja System.Tuple(IList(T1), IList(T2)) az elemeket tartalmazó objektumot.
Az alábbi egyszerű példa egy BatchedJoinBlock<T1,T2> objektumot hoz létre, amely eredményeket, Int32 értékeket és hibákat tárol, amelyek mind Exception objektumok. Ez a példa több műveletet hajt végre, és az eredményeket az Target1 tulajdonságra, a hibákat pedig a Target2 tulajdonságra írja, a BatchedJoinBlock<T1,T2> objektum esetében. Mivel a sikeres és sikertelen műveletek száma előre ismeretlen, az objektumok lehetővé teszik, hogy minden IList<T> cél nulla vagy több értéket fogadjon.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Egy teljes példáért, amely az eredmények és a program adatbázisból való olvasása során előforduló kivételek rögzítésére is használható BatchedJoinBlock<T1,T2> , tekintse meg az útmutatót: A BatchBlock és a BatchedJoinBlock használata a hatékonyság javítása érdekében.
Adatfolyam-blokk viselkedésének konfigurálása
További beállítások engedélyezéséhez adjon meg egy objektumot System.Threading.Tasks.Dataflow.DataflowBlockOptions az adatfolyamblokk-típusok konstruktorának. Ezek a beállítások szabályozzák az olyan viselkedést, mint a mögöttes feladatot kezelő ütemező és a párhuzamosság foka. A DataflowBlockOptions származtatott típusok olyan viselkedést határoznak meg, amely bizonyos adatfolyamblokk-típusokra jellemző. Az alábbi táblázat összefoglalja, hogy mely beállítástípusok vannak társítva az egyes adatfolyam-blokktípusokhoz.
Az alábbi szakaszok további információt nyújtanak az adatfolyam-blokkok azon fontos beállításairól, amelyek az System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions és System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions osztályokon keresztül érhetők el.
A Feladatütemező megadása
Minden előre definiált adatfolyam-blokk a TPL feladatütemezési mechanizmust használja olyan tevékenységek végrehajtására, mint az adatok célba való propagálása, a forrásból származó adatok fogadása és a felhasználó által definiált meghatalmazottak futtatása, amikor az adatok elérhetővé válnak. TaskScheduler egy absztrakt osztály, amely egy tevékenységütemezőt jelöl, amely a tevékenységeket szálakra várólistára állítja. Az alapértelmezett feladatütemező, a Default, a ThreadPool osztályt használja a munka várólistára állítására és végrehajtására. Az alapértelmezett feladatütemező felülbírálható úgy, hogy egy adatfolyamblokk-objektum létrehozásakor beállítja a TaskScheduler tulajdonságot.
Ha ugyanaz a feladatütemező több adatfolyamblokkot kezel, a szabályzatok kényszeríthetők rajtuk. Ha például több adatfolyam-blokk van konfigurálva ugyanannak ConcurrentExclusiveSchedulerPair az objektumnak a kizárólagos ütemezőjének megcélzására, az ezeken a blokkokon futó összes munka szerializálva lesz. Hasonlóképpen, ha ezek a blokkok ugyanazon ConcurrentExclusiveSchedulerPair objektum egyidejű ütemezőjének megcélzására vannak konfigurálva, és az ütemező úgy van konfigurálva, hogy maximális egyidejűségi szinttel rendelkezzen, az ezekből a blokkokból származó összes munka csak ennyi egyidejű műveletre korlátozódik. Például, ha az ConcurrentExclusiveSchedulerPair osztályt használja az olvasási műveletek párhuzamos végrehajtására, de az írási műveletek végrehajtását kizárólagosan, az összes többi művelettől elkülönítve kívánja végrehajtani, lásd a How to: Specify a Task Scheduler in a Dataflow Block (Feladatütemező megadása adatfolyam-blokkokban) című témakört. A TPL feladatütemezőiről további információt az osztály témakörében TaskScheduler talál.
A párhuzamosság fokának megadása
Alapértelmezés szerint a TPL adatfolyamtár által biztosított ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>három végrehajtási blokktípus egyszerre egy üzenetet dolgoz fel. Ezek az adatfolyam-blokktípusok az üzenetek fogadási sorrendjében is feldolgozhatók. Ha engedélyezni szeretné ezeknek az adatfolyamblokkoknak az üzenetek egyidejű feldolgozását, állítsa be a ExecutionDataflowBlockOptions.MaxDegreeOfParallelism tulajdonságot az adatfolyamblokk-objektum létrehozásakor.
Az alapértelmezett érték MaxDegreeOfParallelism 1, amely garantálja, hogy az adatfolyam-blokk egyszerre egy üzenetet dolgoz fel. Ha ezt a tulajdonságot 1-nél nagyobb értékre állítja, az adatfolyam-blokk egyszerre több üzenetet is feldolgozhat. E tulajdonság DataflowBlockOptions.Unbounded beállításával lehetővé teszi, hogy az alapul szolgáló feladatütemező kezelje az egyidejűség maximális fokát.
Fontos
Ha 1-nél nagyobb párhuzamossági fokot ad meg, a rendszer egyszerre több üzenetet dolgoz fel, ezért előfordulhat, hogy az üzenetek nem a beérkezés sorrendjében lesznek feldolgozva. Az üzenetek blokkból való kimenetének sorrendje azonban ugyanaz, amelyben az üzenetek érkeznek.
Mivel a MaxDegreeOfParallelism tulajdonság a párhuzamosság maximális fokát jelöli, az adatfolyamblokk a megadottnál kisebb mértékű párhuzamossággal is végrehajtható. Előfordulhat, hogy az adatfolyam-blokk kisebb fokú párhuzamosságot használ a funkcionális követelmények teljesítéséhez, vagy mert hiányzik a rendelkezésre álló rendszererőforrások száma. Az adatfolyamblokkok soha nem választanak a megadottnál több párhuzamosságot.
A tulajdonság értéke MaxDegreeOfParallelism minden adatfolyamblokk-objektumra kizárólagos. Ha például négy adatfolyam-blokkobjektum határoz meg 1-et a párhuzamosság maximális fokához, akkor mind a négy adatfolyamblokk-objektum párhuzamosan is futtatható.
Az alábbi cikk példát mutat be arra, hogyan állíthatja be a párhuzamosság maximális fokát annak érdekében, hogy hosszadalmas műveletek párhuzamosan történjenek: Adatfolyam-blokkok párhuzamossági fokának megadása.
Az üzenetek számának megadása tevékenységenként
Az előre definiált adatfolyamblokk-típusok feladatokkal dolgoznak fel több bemeneti elemet. Ez segít minimalizálni az adatok feldolgozásához szükséges feladatobjektumok számát, ami lehetővé teszi az alkalmazások hatékonyabb futtatását. Ha azonban az adatfolyam-blokkok egy csoportjából származó tevékenységek dolgoznak fel adatokat, előfordulhat, hogy a többi adatfolyamblokk tevékenységeinek várniuk kell a feldolgozási időt az üzenetek sorba állításával. Az adatfolyam-feladatok jobb méltányosságának érdekében állítsa be a tulajdonságot MaxMessagesPerTask . Ha MaxMessagesPerTask az adatfolyam-blokk által használt feladat az alapértelmezett értékre DataflowBlockOptions.Unboundedvan állítva, annyi üzenetet dolgoz fel, amennyit csak lehet. Amikor a MaxMessagesPerTask más értékre van beállítva, mint a Unbounded, az adatfolyam-blokk legfeljebb ennyi üzenetet dolgoz fel minden Task objektum esetében. Bár a tulajdonság beállítása növelheti a MaxMessagesPerTask feladatok közötti méltányosságot, a rendszer a szükségesnél több feladatot hozhat létre, ami csökkentheti a teljesítményt.
Lemondás engedélyezése
A TPL olyan mechanizmust biztosít, amely lehetővé teszi a feladatok számára a lemondás együttműködésen alapuló összehangolását. Ha engedélyezni szeretné, hogy az adatfolyamblokkok részt vehessenek ebben a lemondási mechanizmusban, állítsa be a tulajdonságot CancellationToken . Ha ez a CancellationToken objektum törölt állapotba van állítva, az összes adatfolyamblokk, amely ezt a token figyeli, befejezi a jelenlegi elem feldolgozását, de nem kezdi el a további elemek feldolgozását. Ezek az adatfolyam-blokkok emellett törlik a pufferelt üzeneteket, feloldják a kapcsolatot a forrás- és célblokkokkal, és áttérnek a megszakított állapotra. A törölt állapotra való áttéréssel a Completion tulajdonsága Status értékre van állítva, kivéve, ha a feldolgozás során kivétel történt. Ebben az esetben Status a következőre van állítva: Faulted.
Például arról, hogyan lehet lemondást alkalmazni egy Windows Forms-alkalmazásban, lásd a Hogyan lehet: Adatfolyam-blokk törlése című témakört. A TPL-ben történő lemondással kapcsolatos további információkért lásd: Feladatlemondás.
A kapzsiság és a nem mohó viselkedés megadása
Több csoportosítási adatfolyam-blokktípus is működhet kapzsi vagy nem kapzsi módban. Alapértelmezés szerint az előre definiált adatfolyamblokk-típusok mohó módban működnek.
Az illesztési blokktípusok esetében, például JoinBlock<T1,T2>a kapzsi mód azt jelenti, hogy a blokk azonnal elfogadja az adatokat, még akkor is, ha az összekapcsolandó adatok még nem érhetők el. Nem mohó üzemmód azt jelenti, hogy a blokk késlelteti az összes bejövő üzenetet mindaddig, amíg minden célponton rendelkezésre nem áll egy a csatlakozást lezáró üzenet. Ha az elhalasztott üzenetek közül bármelyik már nem érhető el, az összekapcsolási blokk minden elhalasztott üzenetet felszabadít, és újraindítja a folyamatot. Az BatchBlock<T> osztály esetében a kapzsi és nem kapzsi viselkedés hasonló, azzal a kivétellel, hogy nem kapzsi módban egy BatchBlock<T> objektum elhalasztja a beérkező üzeneteket, amíg elegendő nem áll rendelkezésre a különböző forrásokból egy köteg befejezéséhez.
Az adatfolyam-blokk nem mohó módjának megadásához állítsa be a Greedy értéket False módjára. Egy példa, amely bemutatja, hogyan használható a nem mohó mód arra, hogy több illesztési blokk hatékonyabban ossza meg az adatforrást, lásd: A JoinBlock használata több forrásból származó adatok olvasásához.
Egyéni adatfolyamblokkok
Bár a TPL adatfolyamtár számos előre definiált blokktípust biztosít, létrehozhat további, egyéni viselkedést végrehajtó blokktípusokat is. Közvetlenül implementálhatja az ISourceBlock<TOutput> interfészeket vagy ITargetBlock<TInput> a Encapsulate metódust egy összetett blokk létrehozásához, amely magában foglalja a meglévő blokktípusok viselkedését. Az egyéni adatfolyamblokk-funkciók implementálását bemutató példákért lásd: Útmutató: Egyéni adatfolyam-blokktípus létrehozása.
Kapcsolódó témakörök
| Cím | Leírás |
|---|---|
| Útmutató: Üzenetek írása és üzenetek olvasása adatfolyamblokkból | Bemutatja, hogyan írhat üzeneteket egy objektumba, és hogyan olvashat üzeneteket BufferBlock<T> . |
| Útmutató: Producer-Consumer adatfolyam-minta implementálása | Ismerteti, hogyan használható az adatfolyam-modell egy termelő-fogyasztó minta implementálásához, ahol a gyártó üzeneteket küld egy adatfolyam-blokkba, és a fogyasztó ebből a blokkból olvas üzeneteket. |
| Útmutató: Művelet végrehajtása, amikor egy adatfolyam-blokk adatokat fogad | Ismerteti, hogyan biztosíthat meghatalmazottakat a végrehajtási adatfolyam blokktípusainak, ActionBlock<TInput>TransformBlock<TInput,TOutput>és TransformManyBlock<TInput,TOutput>. |
| Útmutató: Adatfolyam-folyamat létrehozása | Ismerteti, hogyan hozhat létre olyan adatfolyam-folyamatot, amely szöveget tölt le a weből, és műveleteket hajt végre ezen a szövegen. |
| Útmutató: Adatfolyam-blokkok leválasztás | Bemutatja, hogyan lehet a LinkTo metódussal leválasztani egy célblokkot a forrástól, miután a forrás üzenetet küld a célnak. |
| Útmutató: Adatfolyam használata Windows Forms-alkalmazásban | Bemutatja, hogyan hozhat létre adatfolyamblokkokból álló hálózatot, amely képfeldolgozást végez egy Windows Forms-alkalmazásban. |
| Útmutató: Adatfolyam-blokk megszakítása | Bemutatja, hogyan használhatja a lemondást a Windows Forms-alkalmazásokban. |
| Útmutató: A JoinBlock használata több forrásból származó adatok beolvasásához | Megmagyarázza, hogyan hajtson végre műveletet az JoinBlock<T1,T2> osztály használatával, amikor az adatok több forrásból érhetők el, és hogyan használja a nem mohó módot annak érdekében, hogy több összekapcsolási blokk hatékonyabban osztozzon egy adaforráson. |
| Útmutató: A párhuzamosság fokának megadása adatfolyamblokkokban | Azt ismerteti, hogyan állíthatja be a MaxDegreeOfParallelism tulajdonságot úgy, hogy egy végrehajtási adatfolyam-blokk egyszerre több üzenetet is feldolgozhasson. |
| Útmutató: Feladatütemező megadása adatfolyamblokkban | Bemutatja, hogyan társíthat egy adott feladatütemezőt, amikor adatfolyamot használ az alkalmazásban. |
| Útmutató: A BatchBlock és a BatchedJoinBlock használata a hatékonyság javítása érdekében | Ismerteti, hogyan használható az osztály az BatchBlock<T> adatbázis-beszúrási műveletek hatékonyságának javítására, és hogyan használható az BatchedJoinBlock<T1,T2> osztály az eredmények és az esetleges kivételek rögzítésére, amikor a program egy adatbázisból olvas. |
| Útmutató: Egyéni adatfolyam-blokktípus létrehozása | Bemutatja, hogyan hozhat létre adatfolyam-blokktípust, amely egyéni viselkedést valósít meg. |
| Feladat párhuzamos könyvtár (TPL) | Bevezeti a TPL-t, egy könyvtárat, amely leegyszerűsíti a párhuzamos és egyidejű programozást a .NET-keretrendszer-alkalmazásokban. |