Sdílet prostřednictvím


Tok dat (paralelní knihovna úloh)

Knihovna TPL (Task Parallel Library) poskytuje komponenty toku dat, které pomáhají zvýšit odolnost aplikací s podporou souběžnosti. Tyto komponenty toku dat jsou souhrnně označovány jako TPL Dataflow Library. Tento model toku dat podporuje programování založené na aktorech tím, že poskytuje předávání zpráv v procesu pro hrubozrnné úlohy toku dat a zřetězené úlohy. Komponenty toku dat vycházejí z typů a plánování infrastruktury TPL a integrují se s podporou jazyka C#, Visual Basic a F# pro asynchronní programování. Tyto komponenty toku dat jsou užitečné, pokud máte více operací, které musí vzájemně komunikovat asynchronně nebo když chcete zpracovávat data, jakmile budou k dispozici. Představte si například aplikaci, která zpracovává data obrázků z webové kamery. Pomocí modelu toku dat může aplikace zpracovávat snímky obrázků, jakmile budou k dispozici. Pokud aplikace vylepšuje snímky, například provedením opravy světla nebo odstranění červených očí, můžete vytvořit potrubí komponent toku dat. Každá fáze zpracovatelského řetězce může k transformaci obrazu použít hrubší paralelismus, například funkcionalitu poskytovanou TPL.

Tento dokument obsahuje přehled knihovny toků dat TPL. Popisuje programovací model, předdefinované typy bloků toku dat a způsob konfigurace bloků toku dat tak, aby splňovaly konkrétní požadavky vašich aplikací.

Poznámka:

Knihovna toku dat TPL (jmenný prostor System.Threading.Tasks.Dataflow) není distribuována s .NET. Pokud chcete nainstalovat obor názvů System.Threading.Tasks.Dataflow v sadě Visual Studio, otevřete projekt, zvolte v nabídce Projekt možnost Spravovat balíčky NuGet a balíček System.Threading.Tasks.Dataflow vyhledejte online. Případně ji můžete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow.

Programovací model

Knihovna toků dat TPL poskytuje základ pro předávání zpráv a paralelizaci aplikací náročných na procesor a vstupně-výstupní operace, které mají vysokou propustnost a nízkou latenci. Poskytuje také explicitní kontrolu nad tím, jak se data uloží do vyrovnávací paměti a přesunují se kolem systému. Pokud chcete lépe porozumět programovacímu modelu toku dat, představte si aplikaci, která asynchronně načítá obrázky z disku a vytvoří z nich kompozit. Tradiční programovací modely obvykle vyžadují použití zpětných volání a synchronizačních objektů, jako jsou zámky, ke koordinaci úkolů a přístupu ke sdíleným datům. Pomocí programovacího modelu toku dat můžete vytvořit objekty toku dat, které zpracovávají obrázky při čtení z disku. V rámci modelu toku dat deklarujete, jak se data zpracovávají, když jsou k dispozici, a také všechny závislosti mezi daty. Vzhledem k tomu, že modul runtime spravuje závislosti mezi daty, můžete se často vyhnout požadavku na synchronizaci přístupu ke sdíleným datům. Vzhledem k tomu, že plány modulu runtime fungují na základě asynchronního doručení dat, může tok dat zlepšit rychlost odezvy a propustnost efektivní správou podkladových vláken. Příklad, který používá programovací model toku dat k implementaci zpracování obrázků v aplikaci Windows Forms, viz Návod: Použití toku dat v aplikaci Windows Forms.

Zdroje a cíle

Knihovna toků dat TPL se skládá z bloků toku dat, které jsou datovými strukturami pro ukládání a zpracování dat. TPL definuje tři druhy bloků toku dat: zdrojové bloky, cílové blokya šíření bloků. Zdrojový blok funguje jako zdroj dat a lze z něj číst. Cílový blok funguje jako příjemce dat a může být zapsán do. Blok šíření funguje jako zdrojový i cílový blok a lze ho číst a zapisovat do. TPL definuje System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> rozhraní pro reprezentaci zdrojů, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> představující cíle a System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> k reprezentaci šíření. IPropagatorBlock<TInput,TOutput> dědí z ISourceBlock<TOutput>a ITargetBlock<TInput>.

Knihovna toku dat TPL poskytuje několik předdefinovaných typů bloků toku dat, které implementují rozhraní ISourceBlock<TOutput>, ITargetBlock<TInput>a IPropagatorBlock<TInput,TOutput>. Tyto typy bloků toku dat jsou popsány v tomto dokumentu v části předdefinované typy bloků toku dat.

Spojovací bloky

Bloky toku dat můžete připojit k vytvoření kanálů, což jsou lineární sekvence bloků toku dat nebo sítě, což jsou grafy bloků toku dat. Kanál je jednou z forem sítě. V kanálu nebo síti zdroje asynchronně šíří data do cílů, jakmile jsou tato data dostupná. Metoda ISourceBlock<TOutput>.LinkTo propojuje zdrojový blok toku dat s cílovým blokem. Zdroj lze propojit s nulovými nebo více cíli; cíle mohou být propojeny z nuly nebo více zdrojů. Bloky toku dat můžete přidávat nebo odebírat do kanálu nebo sítě současně. Předdefinované typy bloků toku dat se starají o všechny aspekty bezpečnosti vláken při propojení a rozpojení.

Příklad, který spojuje bloky toku dat k vytvoření základního kanálu, najdete v tématu Návod: Vytvoření kanálu toku dat. Příklad, který spojuje bloky toku dat k vytvoření složitější sítě, najdete v tématu Návod: Použití toku dat vaplikace Windows Forms . Příklad, který zruší propojení cíle ze zdroje poté, co zdroj nabídne cílovou zprávu, najdete v tématu Postupy: Zrušení propojení bloků toku dat.

Filtrování

Když zavoláte metodu ISourceBlock<TOutput>.LinkTo pro propojení zdroje s cílem, můžete zadat delegáta, který určuje, jestli cílový blok přijme nebo odmítne zprávu na základě hodnoty této zprávy. Tento mechanismus filtrování je užitečný způsob, jak zaručit, že blok toku dat přijímá pouze určité hodnoty. U většiny předdefinovaných typů bloků toku dat platí, že pokud je zdrojový blok připojený k více cílovým blokům, když cílový blok odmítne zprávu, zdroj tuto zprávu nabídne dalšímu cíli. Pořadí, ve kterém zdroj nabízí zprávy cílům, je definován zdrojem a může se lišit podle typu zdroje. Většina typů zdrojových bloků přestane nabízet zprávu poté, co ji přijme jeden cíl. Jednou z výjimek tohoto pravidla je třída BroadcastBlock<T>, která nabízí každou zprávu všem cílům, i když některé cíle zprávu zamítnou. Příklad, který používá filtrování ke zpracování pouze určitých zpráv, naleznete v tématu Návod: Použití toku dat v aplikaci Windows Forms.

Důležité

Vzhledem k tomu, že každý předdefinovaný typ bloku toku dat zaručuje, že se zprávy rozšíří v pořadí, ve kterém jsou přijaty, musí být každá zpráva načtena ze zdrojového bloku, aby zdrojový blok mohl zpracovat další zprávu. Proto když použijete filtrování pro připojení více cílů ke zdroji, ujistěte se, že každá zpráva obdrží alespoň jeden cílový blok. Jinak se vaše aplikace může zablokovat.

Předávání zpráv

Programovací model toku dat souvisí s konceptem předávání zpráv, kde nezávislé komponenty programu vzájemně komunikují odesláním zpráv. Jedním ze způsobů, jak rozšířit zprávy mezi komponentami aplikace, je volat Post (synchronní) a SendAsync (asynchronní) metody odesílání zpráv do bloků cílového toku dat a Receive, ReceiveAsynca TryReceive metody pro příjem zpráv ze zdrojových bloků. Tyto metody můžete kombinovat s kanály toku dat nebo sítěmi odesláním vstupních dat do hlavního uzlu (cílový blok) a příjmem výstupních dat z koncového uzlu kanálu nebo terminálových uzlů sítě (jeden nebo více zdrojových bloků). Můžete také použít metodu Choose ke čtení z prvního ze zadaných zdrojů, které mají k dispozici data, a provádět akce s daty.

Zdrojové bloky nabízejí data pro cílové bloky voláním metody ITargetBlock<TInput>.OfferMessage. Cílový blok reaguje na nabízenou zprávu jedním ze tří způsobů: může zprávu přijmout, odmítnout nebo odložit zprávu. Když cíl přijme zprávu, vrátí metoda OfferMessageAccepted. Když cíl zprávu odmítne, vrátí metoda OfferMessageDeclined. Pokud cíl vyžaduje, aby již neobdržel žádné zprávy ze zdroje, OfferMessage vrátí DecliningPermanently. Předdefinované typy zdrojového bloku nenabízejí zprávy propojeným cílům po přijetí takové návratové hodnoty a automaticky zruší propojení s těmito cíli.

Když cílový blok odloží zprávu pro pozdější použití, vrátí metoda OfferMessagePostponed. Cílový blok, který odloží zprávu, může později volat metodu ISourceBlock<TOutput>.ReserveMessage, aby se pokusil rezervovat nabízenou zprávu. V tomto okamžiku je zpráva buď stále dostupná a může ji použít cílový blok, nebo zprávu převzal jiný cíl. Pokud cílový blok později vyžaduje zprávu nebo ji už nepotřebuje, volá ISourceBlock<TOutput>.ConsumeMessage nebo ReleaseReservation metodu. Rezervace zpráv se obvykle používá u typů datových bloků, které pracují v nežravém režimu. Nechamtivý režim je vysvětlen dále v tomto dokumentu. Místo rezervace odložené zprávy může cílový blok použít také metodu ISourceBlock<TOutput>.ConsumeMessage k přímému využití odložené zprávy.

Dokončení bloku toku dat

Bloky toku dat také podporují koncept ukončení . Blok toku dat, který je v dokončeném stavu, neprovádí žádnou další práci. Každý blok toku dat má přidružený System.Threading.Tasks.Task objekt, označovaný jako úkol dokončení, který představuje stav dokončení bloku. Protože můžete počkat na dokončení objektu Task pomocí úloh určených pro dokončení, můžete stejným způsobem čekat na dokončení jednoho nebo více koncových uzlů datové podpůrné sítě. Rozhraní IDataflowBlock definuje metodu Complete, která informuje blok toku dat o dokončení požadavku a vlastnost Completion, která vrací úlohu dokončení bloku toku dat. ISourceBlock<TOutput> i ITargetBlock<TInput> dědí rozhraní IDataflowBlock.

Existují dva způsoby, jak zjistit, jestli se blok toku dat dokončil bez chyby, zjistil jednu nebo více chyb nebo byl zrušen. Prvním způsobem je zavolat metodu Task.Wait na úkolu dokončení v bloku try-catch (Try-Catch v jazyce Visual Basic). Následující příklad vytvoří ActionBlock<TInput> objekt, který vyvolá ArgumentOutOfRangeException pokud je jeho vstupní hodnota menší než nula. AggregateException je vyhozena, když tento příklad zavolá Wait na dokončovací úkol. K ArgumentOutOfRangeException se přistupuje prostřednictvím vlastnosti InnerExceptions objektu AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Tento příklad demonstruje případ, kdy se neošetřená výjimka v delegátu datového toku provádění. Doporučujeme zpracovávat výjimky v tělech těchto bloků. Pokud to ale nemůžete udělat, blok se chová, jako by byl zrušený a nezpracovává příchozí zprávy.

Pokud je blok toku dat explicitně zrušen, objekt AggregateException obsahuje OperationCanceledException ve vlastnosti InnerExceptions. Další informace o zrušení datových toků naleznete v části Povolení zrušení.

Druhým způsobem, jak určit stav dokončení bloku toku dat, je použití pokračování úkolu dokončení nebo použití asynchronních jazykových funkcí jazyka C# a Visual Basic k asynchronnímu čekání na dokončení úkolu. Delegát, který poskytnete metodě Task.ContinueWith, přijímá objekt Task, který představuje předchozí úlohu. V případě vlastnosti Completion převezme delegát pokračování úkol dokončení. Následující příklad se podobá předchozímu, s tím rozdílem, že používá také metodu ContinueWith k vytvoření úlohy pokračování, která vytiskne stav celkové operace toku dat.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

K určení dalších informací o stavu dokončení bloku toku dat můžete také použít vlastnosti, jako je IsCanceled v textu úkolu pokračování. Další informace o úlohách pokračování a jejich vztahu k rušení a zpracování chyb naleznete v tématech Řetězení úloh pomocí pokračování úlohy, Zrušení úlohya Zpracování výjimek.

Předdefinované typy bloků toku dat

Knihovna toků dat TPL poskytuje několik předdefinovaných typů bloků toku dat. Tyto typy jsou rozděleny do tří kategorií: bloky vyrovnávací paměti, spouštěcí blokya seskupování bloků. Následující části popisují typy bloků, které tvoří tyto kategorie.

Bloky pro vyrovnávací paměť

Bloky ve vyrovnávací paměti uchovávají data pro použití spotřebiteli dat. Knihovna toků dat TPL poskytuje tři typy bloků vyrovnávací paměti: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>a System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Třída BufferBlock<T> představuje strukturu asynchronního zasílání zpráv pro obecné účely. Tato třída ukládá frontu zpráv FIFO (first in, first out), do které může zapisovat více zdrojů nebo z ní číst více cílů. Když cíl obdrží zprávu z objektu BufferBlock<T>, tato zpráva se odebere z fronty zpráv. I když objekt BufferBlock<T> může mít více cílů, obdrží každá zpráva pouze jeden cíl. Třída BufferBlock<T> je užitečná, když chcete předat více zpráv jiné komponentě a tato komponenta musí přijímat každou zprávu.

Následující základní příklad publikuje několik Int32 hodnot do objektu BufferBlock<T> a pak tyto hodnoty načte zpět z tohoto objektu.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Úplný příklad, který ukazuje, jak zapisovat zprávy do a číst zprávy z objektu BufferBlock<T>, naleznete v tématu Postupy: Zápis zpráv do bloku toku dat a čtení zpráv z bloku toku dat.

BroadcastBlock<T>

Třída BroadcastBlock<T> je užitečná, když musíte předat více zpráv jiné komponentě, ale tato komponenta potřebuje pouze nejnovější hodnotu. Tato třída je užitečná také v případě, že chcete zprávu vysílat více komponentám.

Následující základní příklad publikuje Double hodnotu do objektu BroadcastBlock<T> a pak ji několikrát přečte zpět z tohoto objektu. Vzhledem k tomu, že hodnoty se po jejich přečtení neodeberou z BroadcastBlock<T> objektů, je stejná hodnota k dispozici pokaždé.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Úplný příklad, který ukazuje použití BroadcastBlock<T> k vysílání zprávy do více cílových bloků, naleznete v tématu Postupy: Určení plánovače úloh v bloku toku dat.

WriteOnceBlock<T>

Třída WriteOnceBlock<T> připomíná třídu BroadcastBlock<T> s tím rozdílem, že objekt WriteOnceBlock<T> lze zapsat pouze jednou. Můžete si představit, že WriteOnceBlock<T> se podobá klíčovému slovu C# readonly (ReadOnly ve Visual Basicu), s tím rozdílem, že objekt WriteOnceBlock<T> se stane neměnným, jakmile získá hodnotu, namísto při vytváření. Podobně jako třída BroadcastBlock<T> , když cíl obdrží zprávu z objektu WriteOnceBlock<T> , tato zpráva se z objektu neodebere. Proto více cílů obdrží kopii zprávy. Třída WriteOnceBlock<T> je užitečná, když chcete rozšířit pouze první z více zpráv.

Následující základní příklad publikuje více String hodnot do objektu WriteOnceBlock<T> a pak přečte hodnotu zpět z tohoto objektu. Vzhledem k tomu, že objekt WriteOnceBlock<T> lze zapsat pouze jednou, jakmile objekt WriteOnceBlock<T> obdrží zprávu, zahodí další zprávy.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Úplný příklad, který ukazuje, jak použít WriteOnceBlock<T> k získání hodnoty první operace, která se dokončí, najdete v tématu Postupy: Zrušení propojení bloků toku dat.

Bloky provádění

Bloky provádění volají delegáta poskytnutého uživatelem pro každou část přijatých dat. Knihovna toků dat TPL poskytuje tři typy bloků provádění: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>a System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Třída ActionBlock<TInput> je cílový blok, který vyvolá delegáta, když obdrží data. Objekt ActionBlock<TInput> si můžete představit jako delegáta, který se spouští asynchronně, jakmile budou data k dispozici. Delegát, který zadáte objektu ActionBlock<TInput>, může být typu Action<T> nebo typ System.Func<TInput, Task>. Pokud použijete objekt ActionBlock<TInput> s Action<T>, zpracování každého vstupního prvku se považuje za dokončené při vrácení delegáta. Při použití ActionBlock<TInput> objektu s System.Func<TInput, Task>, zpracování každého vstupního prvku je považováno za dokončeno pouze v případě, že vrácený Task objekt je dokončen. Pomocí těchto dvou mechanismů můžete použít ActionBlock<TInput> pro synchronní i asynchronní zpracování každého vstupního prvku.

Následující základní příklad publikuje více Int32 hodnot do objektu ActionBlock<TInput>. Objekt ActionBlock<TInput> tyto hodnoty vytiskne do konzoly. Tento příklad pak nastaví blok na dokončený stav a čeká na dokončení všech úloh toku dat.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Kompletní příklady, které ukazují, jak používat delegáty s třídou ActionBlock<TInput>, najdete v části Jak: Provedení akce, když datový blok přijímá data.

TransformBlock<TInput, TOutput>

Třída TransformBlock<TInput,TOutput> se podobá třídě ActionBlock<TInput> s tím rozdílem, že funguje jako zdroj i jako cíl. Delegát, který předáte objektu TransformBlock<TInput,TOutput>, vrátí hodnotu typu TOutput. Delegát, který poskytnete pro objekt TransformBlock<TInput,TOutput>, může být typu System.Func<TInput, TOutput> nebo System.Func<TInput, Task<TOutput>>. Pokud použijete objekt TransformBlock<TInput,TOutput> s System.Func<TInput, TOutput>, zpracování každého vstupního prvku se považuje za dokončené při vrácení delegáta. Pokud použijete objekt TransformBlock<TInput,TOutput> použitý s System.Func<TInput, Task<TOutput>>, zpracování každého vstupního prvku je považováno za dokončené pouze v případě, že vrácený Task<TResult> objekt je dokončen. Stejně jako u ActionBlock<TInput>můžete pomocí těchto dvou mechanismů použít TransformBlock<TInput,TOutput> pro synchronní i asynchronní zpracování každého vstupního prvku.

Následující základní příklad vytvoří objekt TransformBlock<TInput,TOutput>, který vypočítá druhou odmocninu vstupu. Objekt TransformBlock<TInput,TOutput> přebírá Int32 hodnoty jako vstup a vytváří Double hodnoty jako výstup.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Kompletní příklady, které používají TransformBlock<TInput,TOutput> v síti bloků toku dat, které provádějí zpracování obrázků v aplikaci Windows Forms, najdete v tématu Návod: Použití toku dat v aplikaci Windows Forms.

TransformManyBlock<TInput, TOutput>

Třída TransformManyBlock<TInput,TOutput> se podobá třídě TransformBlock<TInput,TOutput> s tím rozdílem, že TransformManyBlock<TInput,TOutput> pro každou vstupní hodnotu vytvoří nulové nebo více výstupních hodnot, nikoli pouze jednu výstupní hodnotu pro každou vstupní hodnotu. Delegát, který poskytnete pro objekt TransformManyBlock<TInput,TOutput>, může být typu System.Func<TInput, IEnumerable<TOutput>> nebo System.Func<TInput, Task<IEnumerable<TOutput>>>. Pokud použijete objekt TransformManyBlock<TInput,TOutput> s System.Func<TInput, IEnumerable<TOutput>>, zpracování každého vstupního prvku se považuje za dokončené při vrácení delegáta. Pokud použijete TransformManyBlock<TInput,TOutput> objekt s System.Func<TInput, Task<IEnumerable<TOutput>>>, zpracování každého vstupního prvku je považováno za dokončené pouze v případě, že vrácený System.Threading.Tasks.Task<IEnumerable<TOutput>> objekt je dokončen.

Následující základní příklad vytvoří TransformManyBlock<TInput,TOutput> objekt, který rozdělí řetězce na jejich jednotlivé sekvence znaků. Objekt TransformManyBlock<TInput,TOutput> přebírá String hodnoty jako vstup a vytváří Char hodnoty jako výstup.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Kompletní příklady, které používají TransformManyBlock<TInput,TOutput> k vytvoření více nezávislých výstupů pro každý vstup v kanálu toku dat, najdete v tématu Návod: Vytvoření kanálu toku dat.

Stupeň paralelismu

Každý objekt ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput> ukládá vstupní zprávy do vyrovnávací paměti, dokud není blok připraven je zpracovat. Ve výchozím nastavení tyto třídy zpracovávají zprávy v pořadí, v jakém jsou přijímány, a to po jedné zprávě. Můžete také určit stupeň paralelismu, který povolí ActionBlock<TInput>, TransformBlock<TInput,TOutput> a TransformManyBlock<TInput,TOutput> objekty ke souběžnému zpracování více zpráv. Další informace o souběžné provádění naleznete v části Určení stupně paralelismu dále v tomto dokumentu. Příklad, který nastaví stupeň paralelismu tak, aby blok toku dat spouštění zpracovával více než jednu zprávu najednou, najdete v tématu Postupy: Určení stupně paralelismu v bloku toku dat.

Souhrn typů delegátů

Následující tabulka shrnuje typy delegátů, které můžete zadat pro ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput> objekty. Tato tabulka také určuje, jestli typ delegáta funguje synchronně nebo asynchronně.

Typ Synchronní typ delegáta Asynchronní typ delegáta
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Výrazy lambda můžete použít také při práci s typy bloků provádění. Příklad, jak použít výraz lambda s blokem provádění, viz Postupy: Provedení akce, když blok toku dat obdrží data.

Seskupování bloků

Seskupení bloků kombinuje data z jednoho nebo více zdrojů a pod různými omezeními. Knihovna toků dat TPL poskytuje tři typy bloků spojení: BatchBlock<T>, JoinBlock<T1,T2>a BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Třída BatchBlock<T> kombinuje sady vstupních dat, které se označují jako dávky, do polí výstupních dat. Při vytváření BatchBlock<T> objektu zadáte velikost každé dávky. Když BatchBlock<T> objekt obdrží zadaný počet vstupních prvků, asynchronně rozšíří pole obsahující tyto prvky. BatchBlock<T> Pokud je objekt nastavený na dokončený stav, ale neobsahuje dostatek prvků pro vytvoření dávky, rozšíří se konečné pole obsahující zbývající vstupní prvky.

Třída BatchBlock<T> funguje buď v režimu hrabivého, nebo v režimu nehrabivého. V režimu greedy, což je výchozí, objekt BatchBlock<T> přijímá každou zprávu, která je nabízena a šíří pole po přijetí zadaného počtu prvků. V nechamtivém režimu odloží objekt BatchBlock<T> všechny příchozí zprávy, dokud dostatečný počet zdrojů nenabídne zprávy k bloku ke vytvoření dávky. Režim Greedy obvykle funguje lépe než režim bez greedy, protože vyžaduje menší režii na zpracování. Nicméně, můžete použít nechamtivý režim, když musíte koordinovat spotřebu z více zdrojů atomickým způsobem. Nastavením Greedy na False v parametru dataflowBlockOptions v konstruktoru BatchBlock<T> zadejte nechtivý režim.

Následující základní příklad odešle několik hodnot Int32 do objektu typu BatchBlock<T>, který obsahuje deset prvků jako dávku. Chcete-li zaručit, že se všechny hodnoty rozšíří z BatchBlock<T>, tento příklad volá metodu Complete. Metoda Complete nastaví BatchBlock<T> objekt do dokončeného stavu, a proto BatchBlock<T> objekt rozšíří všechny zbývající prvky jako konečnou dávku.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Úplný příklad, který používá BatchBlock<T> ke zlepšení efektivity operací vkládání databáze, najdete v tématu Návod: Použití batchBlock a BatchedJoinBlock ke zlepšení efektivity.

JoinBlock<T1, T2, ...>

Třídy JoinBlock<T1,T2> a JoinBlock<T1,T2,T3> shromažďují vstupní prvky a šíří System.Tuple<T1,T2> nebo System.Tuple<T1,T2,T3> objekty, které tyto prvky obsahují. Třídy JoinBlock<T1,T2> a JoinBlock<T1,T2,T3> nezdědí z ITargetBlock<TInput>. Místo toho poskytují vlastnosti, Target1, Target2a Target3, které implementují ITargetBlock<TInput>.

Podobně jako BatchBlock<T>fungují JoinBlock<T1,T2> a JoinBlock<T1,T2,T3> v režimu chamtivém nebo nechamtivém. V chamtivém režimu, což je výchozí, objekt JoinBlock<T1,T2> nebo JoinBlock<T1,T2,T3> přijímá každou zprávu, která je nabízena, a propaguje ven n-tici poté, co každý z jeho cílů obdrží alespoň jednu zprávu. V non-greedy režimu odloží objekt JoinBlock<T1,T2> nebo JoinBlock<T1,T2,T3> všechny příchozí zprávy, dokud nebudou všem cílům nabídnuta data, která jsou potřebná k vytvoření n-tice. V tomto okamžiku se blok zapojuje do dvoufázového potvrzovacího protokolu, který atomicky načte všechny požadované položky ze zdrojů. Díky tomuto odložení může jiná entita mezitím využívat data, aby celkový systém mohl pokračovat.

Následující základní příklad ukazuje případ, kdy JoinBlock<T1,T2,T3> objekt vyžaduje k výpočtu hodnoty více dat. Tento příklad vytvoří objekt JoinBlock<T1,T2,T3>, který k provedení aritmetické operace vyžaduje dvě Int32 hodnoty a Char hodnotu.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Úplný příklad použití JoinBlock<T1,T2> objektů v neagresivním režimu ke společnému využití zdroje najdete v tématu Postupy: Použití JoinBlock ke čtení dat z více zdrojů.

BatchedJoinBlock<T1, T2, ...>

Třídy BatchedJoinBlock<T1,T2> a BatchedJoinBlock<T1,T2,T3> shromažďují dávky vstupních prvků a šíří System.Tuple(IList(T1), IList(T2)) nebo System.Tuple(IList(T1), IList(T2), IList(T3)) objekty, které tyto prvky obsahují. Představte si BatchedJoinBlock<T1,T2> jako kombinaci BatchBlock<T> a JoinBlock<T1,T2>. Při vytváření BatchedJoinBlock<T1,T2> objektu zadejte velikost každé dávky. BatchedJoinBlock<T1,T2> také poskytuje vlastnosti, Target1 a Target2, které implementují ITargetBlock<TInput>. Při přijetí zadaného počtu vstupních prvků ze všech cílů BatchedJoinBlock<T1,T2> objekt asynchronně rozšíří objekt System.Tuple(IList(T1), IList(T2)) obsahující tyto prvky.

Následující základní příklad vytvoří BatchedJoinBlock<T1,T2> objekt, který obsahuje výsledky, Int32 hodnoty a chyby, které jsou Exception objekty. Tento příklad provádí více operací a zapisuje výsledky do vlastnosti Target1 a chyby do vlastnosti Target2 objektu BatchedJoinBlock<T1,T2>. Vzhledem k tomu, že počet úspěšných a neúspěšných operací je předem neznámý, objekty IList<T> umožňují každému cíli přijímat nulové nebo více hodnot.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Úplný příklad, který používá BatchedJoinBlock<T1,T2> k zachycení výsledků i všech výjimek, ke kterým dochází při čtení programu z databáze, najdete v tématu Návod: Použití BatchBlock a BatchedJoinBlock ke zlepšení efektivity.

Konfigurace chování bloku toku dat

Další možnosti můžete povolit poskytnutím objektu System.Threading.Tasks.Dataflow.DataflowBlockOptions konstruktoru typů bloků toku dat. Tyto možnosti řídí chování, jako je plánovač, který spravuje základovou úlohu a stupeň paralelismu. DataflowBlockOptions má také odvozené typy, které určují chování specifické pro určité typy bloků toku dat. Následující tabulka shrnuje, které typy možností jsou přidružené ke každému typu bloku toku dat.

Typ bloku toku dat typ DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Následující části obsahují další informace o důležitých typech možností bloku toku dat, které jsou k dispozici prostřednictvím tříd System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsa System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Určení plánovače úloh

Každý předdefinovaný blok toku dat používá mechanismus plánování úkolů TPL k provádění aktivit, jako je šíření dat do cíle, příjem dat ze zdroje a spouštění uživatelem definovaných delegátů, jakmile budou data k dispozici. TaskScheduler je abstraktní třída, která představuje plánovač úloh, který zařadí úlohy do vláken. Výchozí plánovač úloh Defaultpoužívá třídu ThreadPool k zařazení do fronty a provádění práce. Výchozí plánovač úloh můžete přepsat nastavením vlastnosti TaskScheduler při vytváření objektu bloku toku dat.

Pokud stejný plánovač úloh spravuje více bloků toku dat, může vynutit zásady napříč nimi. Pokud je například každý z bloků toku dat nakonfigurovaný tak, aby cílil na výhradní plánovač stejného objektu ConcurrentExclusiveSchedulerPair objektu, je serializována všechna práce, která se spouští napříč těmito bloky. Podobně platí, že pokud jsou tyto bloky nakonfigurované tak, aby cílily na souběžný plánovač stejného objektu ConcurrentExclusiveSchedulerPair a že plánovač má nakonfigurovanou maximální úroveň souběžnosti, je všechna práce z těchto bloků omezená na tento počet souběžných operací. Příklad, který používá třídu ConcurrentExclusiveSchedulerPair k povolení paralelního zpracování operací čtení, ale operace zápisu, které probíhají odděleně od všech ostatních operací, viz téma Postupy: Určení plánovače úloh v bloku toku dat. Další informace o plánovačích úkolů v TPL naleznete v tématu třídy TaskScheduler.

Určení stupně paralelismu

Ve výchozím nastavení tři typy bloků provádění, které poskytuje knihovna toků dat TPL, ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput>, zpracovávají jednu zprávu najednou. Tyto typy bloků toku dat také zpracovávají zprávy v pořadí, v jakém jsou přijaty. Chcete-li povolit, aby tyto bloky toku dat zpracovávaly zprávy souběžně, nastavte ExecutionDataflowBlockOptions.MaxDegreeOfParallelism vlastnost při vytváření objektu bloku toku dat.

Výchozí hodnota MaxDegreeOfParallelism je 1, což zaručuje, že blok toku dat zpracovává jednu zprávu najednou. Nastavení této vlastnosti na hodnotu, která je větší než 1, umožňuje bloku toku dat zpracovávat více zpráv souběžně. Nastavením této vlastnosti na DataflowBlockOptions.Unbounded umožní plánovači úloh spravovat maximální stupeň souběžnosti.

Důležité

Pokud zadáte maximální stupeň paralelismu, který je větší než 1, zpracuje se současně více zpráv, a proto zprávy nemusí být zpracovány v pořadí, ve kterém jsou přijaty. Pořadí, ve kterém jsou zprávy výstupem bloku, je však stejné, ve kterém jsou přijaty.

Protože MaxDegreeOfParallelism vlastnost představuje maximální stupeň paralelismu, může se blok toku dat spustit s menším stupněm paralelismu, než zadáte. Blok toku dat může k splnění svých funkčních požadavků použít menší stupeň paralelismu nebo kvůli nedostatku dostupných systémových prostředků. Blok toku dat nikdy nevybere více paralelismu, než zadáte.

Hodnota vlastnosti MaxDegreeOfParallelism je výhradní pro každý objekt bloku toku dat. Pokud například čtyři objekty bloku toku dat zadávají 1 pro maximální stupeň paralelismu, mohou se všechny čtyři objekty bloku toku dat spouštět paralelně.

Příklad, který nastaví maximální stupeň paralelismu tak, aby umožňoval provádět zdlouhavé operace paralelně, najdete v tématu Postupy: Určení stupně paralelismu v bloku toku dat.

Určení počtu zpráv na úkol

Předdefinované typy bloků toku dat používají úlohy ke zpracování více vstupních prvků. To pomáhá minimalizovat počet objektů úloh, které jsou potřeba ke zpracování dat, což umožňuje aplikacím efektivněji spouštět. Pokud však úkoly z jedné sady bloků toku dat zpracovávají data, mohou úkoly z jiných bloků toku dat čekat na dobu zpracování frontou zpráv. Pokud chcete umožnit lepší nestrannost mezi úlohami toku dat, nastavte vlastnost MaxMessagesPerTask. Pokud je MaxMessagesPerTask nastavena na DataflowBlockOptions.Unbounded, což je výchozí, úloha používaná blokem toku dat zpracovává tolik zpráv, kolik je k dispozici. Pokud je MaxMessagesPerTask nastavena na jinou hodnotu než Unbounded, blok toku dat zpracovává maximálně tento počet zpráv na objekt Task. I když nastavení vlastnosti MaxMessagesPerTask může zvýšit spravedlnost mezi úkoly, může také způsobit, že systém vytvoří více úkolů, než je nutné, což může snížit výkon.

Povolit zrušení

TPL poskytuje mechanismus, který umožňuje úkolům koordinovat zrušení způsobem spolupráce. Chcete-li povolit blokům toku dat účast v tomto mechanismu zrušení, nastavte CancellationToken vlastnost. Pokud je tento CancellationToken objekt nastaven na zrušený stav, všechny bloky toku dat, které monitorují tento token, dokončí provedení své aktuální položky, ale nezačnou zpracovávat následující položky. Tyto bloky toku dat také smažou všechny zprávy z vyrovnávací paměti, uvolní připojení ke všem zdrojovým a cílovým blokům a přejdou do zrušeného stavu. Přechodem na zrušený stav má vlastnost Completion vlastnost Status nastavenou na Canceled, pokud během zpracování nedošlo k výjimce. V takovém případě je Status nastavena na Faulted.

Pro příklad ukazující, jak použít zrušení operace v aplikaci Windows Forms, vizte Postupy: Zrušení bloku toku dat. Další informace o zrušení v TPL naleznete v části Zrušení úlohy.

Určení chování greedy versus non-greedy

Několik typů seskupovacích datových bloků může pracovat v režimu nenasytný nebo nenenasytný. Ve výchozím nastavení fungují předdefinované typy bloků toku dat v nenasytném režimu.

U spojovacích bloků, jako je JoinBlock<T1,T2>, režim 'greedy' znamená, že blok okamžitě přijímá data, i když odpovídající data pro spojení ještě nejsou k dispozici. Režim nenasytnosti znamená, že blok odloží všechny příchozí zprávy, dokud nebude k dispozici jedna zpráva z každého ze svých cílů pro dokončení spojení. Pokud některé z odložených zpráv už nejsou k dispozici, blok spojení uvolní všechny odložené zprávy a restartuje proces. U třídy BatchBlock<T> je chování greedy a non-greedy podobné, s tím rozdílem, že v non-greedy režimu objekt BatchBlock<T> odloží všechny příchozí zprávy, dokud nebude k dispozici dostatečný počet zpráv z různých zdrojů k dokončení dávky.

Pokud chcete pro blok toku dat určit ne-greedy režim, nastavte Greedy na False. Příklad, který ukazuje, jak použít režim non-greedy, aby více bloků spojení mohlo efektivněji sdílet zdroj dat, viz Postup: Jak použít JoinBlock ke čtení dat z více zdrojů.

Vlastní bloky toku dat

I když knihovna toků dat TPL poskytuje mnoho předdefinovaných typů bloků, můžete vytvořit další typy bloků, které provádějí vlastní chování. Implementujte rozhraní ISourceBlock<TOutput> nebo ITargetBlock<TInput> přímo nebo použijte metodu Encapsulate k vytvoření komplexního bloku, který zapouzdřuje chování existujících typů bloků. Příklady, které ukazují, jak implementovat vlastní funkce bloku toku dat, viz Návod: Vytvoření vlastního typu bloku toku dat.

Titulek Popis
Jak: Zapsat a číst zprávy z bloku toku dat Ukazuje, jak zapisovat zprávy do a číst zprávy z objektu BufferBlock<T>.
Postupy: Implementace vzoru toku dat Producer-Consumer Popisuje, jak použít model toku dat k implementaci vzoru příjemce producenta, kde producent odesílá zprávy do bloku toku dat a příjemce čte zprávy z daného bloku.
Jak na to: Provedení akce, když blok toku dat přijímá data Popisuje, jak poskytnout delegáty pro typy bloků datových toků pro provádění, ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput>.
Návod: Vytvoření kanálu toku dat Popisuje, jak vytvořit kanál toku dat, který stáhne text z webu a provede operace s tímto textem.
Postupy: Zrušení propojení bloků toku dat Ukazuje, jak metodu LinkTo použít pro zrušení propojení cílového bloku od jeho zdroje poté, co zdroj poskytne zprávu cílovému bloku.
Návod: Použití Dataflow v aplikaci Windows Forms Ukazuje, jak vytvořit síť bloků toku dat, které provádějí zpracování obrázků v aplikaci Windows Forms.
Postupy: Zrušení bloku toku dat Ukazuje, jak používat zrušení v aplikaci Windows Forms.
Postupy: Použití joinBlock ke čtení dat z více zdrojů Vysvětluje, jak pomocí třídy JoinBlock<T1,T2> provést operaci, když jsou data dostupná z více zdrojů, a jak využít méně nenasytný režim k tomu, aby více spojovacích bloků mohlo efektivněji sdílet zdroje dat.
Postupy: Určení stupně paralelismu v bloku toku dat Popisuje, jak nastavit vlastnost MaxDegreeOfParallelism, aby blok toku dat spouštění zpracovával více zpráv najednou.
Postupy: Určení plánovače úloh v bloku toku dat Ukazuje, jak přidružit konkrétní plánovač úloh při použití toku dat v aplikaci.
Návod: Použití BatchBlocku a BatchedJoinBlock ke zlepšení efektivity Popisuje, jak pomocí třídy BatchBlock<T> zlepšit efektivitu operací vkládání databáze a jak pomocí třídy BatchedJoinBlock<T1,T2> zachytit výsledky i všechny výjimky, ke kterým dojde, když program čte z databáze.
Návod: Vytvoření vlastního typu bloku toku dat Demonstruje dva způsoby vytvoření typu bloku toku dat, který implementuje vlastní chování.
Knihovna pro paralelní zpracování úloh (Task Parallel Library) Představuje TPL, knihovnu, která zjednodušuje paralelní a souběžné programování v aplikacích .NET Framework.