Condividi tramite


Flusso di dati (Task Parallel Library)

La Task Parallel Library (TPL) fornisce componenti di flusso di dati per aumentare l'affidabilità delle applicazioni abilitate alla concorrenza. Questi componenti del flusso di dati vengono collettivamente definiti libreria di flussi di dati TPL. Questo modello di flusso di dati promuove la programmazione basata su attore fornendo il passaggio di messaggi in-process per attività di pipelining e flusso di dati con granularità grossolana. I componenti del flusso di dati si basano sui tipi e sull'infrastruttura di pianificazione di TPL e si integrano con il supporto del linguaggio C#, Visual Basic e F# per la programmazione asincrona. Questi componenti del flusso di dati sono utili quando si dispone di più operazioni che devono comunicare tra loro in modo asincrono o quando si desidera elaborare i dati man mano che diventano disponibili. Si consideri, ad esempio, un'applicazione che elabora i dati delle immagini da una fotocamera Web. Usando il modello di flusso di dati, l'applicazione può elaborare fotogrammi non appena diventano disponibili. Se l'applicazione migliora i fotogrammi di immagine, ad esempio eseguendo la correzione della luce o la riduzione degli occhi rossi, è possibile creare una pipeline di componenti del flusso di dati. Ogni fase della pipeline potrebbe utilizzare funzionalità di parallelismo più grossolane, come quelle fornite dal TPL, per trasformare l'immagine.

Questo documento offre una panoramica della libreria di flussi di dati TPL. Descrive il modello di programmazione, i tipi di blocchi di flussi di dati predefiniti e come configurare i blocchi di flussi di dati per soddisfare i requisiti specifici delle applicazioni.

Annotazioni

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto, scegliere Gestisci pacchetti NuGet dal menu Project e cercare online il pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Modello di programmazione

La libreria di flussi di dati TPL offre una base per il passaggio dei messaggi e la parallelizzazione di applicazioni a elevato utilizzo di CPU e I/O con velocità effettiva elevata e bassa latenza. Offre anche un controllo esplicito sul modo in cui i dati vengono memorizzati nel buffer e si spostano nel sistema. Per comprendere meglio il modello di programmazione del flusso di dati, prendere in considerazione un'applicazione che carica in modo asincrono le immagini dal disco e crea una composizione di tali immagini. I modelli di programmazione tradizionali richiedono in genere di usare callback e oggetti di sincronizzazione, ad esempio blocchi, per coordinare le attività e accedere ai dati condivisi. Usando il modello di programmazione del flusso di dati, è possibile creare oggetti flusso di dati che elaborano le immagini durante la lettura dal disco. Nel modello di flusso di dati dichiarare come vengono gestiti i dati quando diventano disponibili e anche eventuali dipendenze tra i dati. Poiché il runtime gestisce le dipendenze tra i dati, spesso è possibile evitare il requisito di sincronizzare l'accesso ai dati condivisi. Inoltre, poiché il runtime pianifica il lavoro in base all'arrivo asincrono dei dati, il flusso di dati può migliorare la velocità di risposta e la velocità effettiva gestendo in modo efficiente i thread sottostanti. Per un esempio che usa il modello di programmazione del flusso di dati per implementare l'elaborazione di immagini in un'applicazione Windows Form, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form.

Origini e destinazioni

La libreria del flusso di dati TPL è costituita da blocchi di flussi di dati, ovvero strutture di dati che memorizzano nel buffer ed elaborano i dati. Il TPL definisce tre tipi di blocchi di flussi di dati: blocchi di origine, blocchi di destinazione e blocchi propagatori. Un blocco di origine funge da fonte di dati e da cui si può leggere. Un blocco di destinazione funge da ricevente di dati e può essere scritto. Un blocco propagatore agisce sia come blocco di origine che come blocco di destinazione e può essere letto da e scritto in. Il TPL definisce l'interfaccia per rappresentare le System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> origini, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> rappresentare le destinazioni e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> rappresentare i propagatori. IPropagatorBlock<TInput,TOutput> eredita sia da ISourceBlock<TOutput> che da ITargetBlock<TInput>.

La libreria di flussi di dati TPL fornisce diversi tipi di blocchi di flussi di dati predefiniti che implementano le ISourceBlock<TOutput>interfacce , ITargetBlock<TInput>e IPropagatorBlock<TInput,TOutput> . Questi tipi di blocchi di flussi di dati sono descritti in questo documento nella sezione Tipi di blocchi di flussi di dati predefiniti.

Blocchi di connessione

È possibile connettere blocchi di flussi di dati per formare pipeline, ovvero sequenze lineari di blocchi di flussi di dati, o reti, che sono grafici di blocchi di flussi di dati. Una pipeline è una forma di rete. In una pipeline o in una rete le origini propagano in modo asincrono i dati alle destinazioni man mano che i dati diventano disponibili. Il ISourceBlock<TOutput>.LinkTo metodo collega un blocco del flusso di dati di origine a un blocco di destinazione. Un'origine può essere collegata a zero o più destinazioni; le destinazioni possono essere collegate da zero o più origini. È possibile aggiungere o rimuovere blocchi di flussi di dati da o verso una pipeline o da una rete contemporaneamente. I tipi di blocchi di flussi di dati predefiniti gestiscono tutti gli aspetti di thread-safety relativi al collegamento e all'scollegamento.

Per un esempio che connette blocchi di flussi di dati per formare una pipeline di base, vedere Procedura dettagliata: Creazione di una pipeline del flusso di dati. Per un esempio che connette blocchi di flussi di dati per formare una rete più complessa, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form. Per un esempio che scollega una destinazione da un'origine dopo che l'origine offre un messaggio di destinazione, vedere Procedura: Scollegare blocchi di flussi di dati.

Filtraggio

Quando si chiama il metodo per collegare un'origine ISourceBlock<TOutput>.LinkTo a una destinazione, è possibile fornire un delegato che determina se il blocco di destinazione accetta o rifiuta un messaggio in base al valore di tale messaggio. Questo meccanismo di filtro è un modo utile per garantire che un blocco di flussi di dati riceva solo determinati valori. Per la maggior parte dei tipi di blocchi di flussi di dati predefiniti, se un blocco di origine è connesso a più blocchi di destinazione, quando un blocco di destinazione rifiuta un messaggio, l'origine offre tale messaggio alla destinazione successiva. L'ordine in cui un'origine offre messaggi alle destinazioni è definito dall'origine e può variare in base al tipo dell'origine. La maggior parte dei tipi di blocchi di origine smette di offrire un messaggio dopo che una destinazione accetta tale messaggio. Un'eccezione a questa regola è la BroadcastBlock<T> classe , che offre ogni messaggio a tutte le destinazioni, anche se alcune destinazioni rifiutano il messaggio. Per un esempio che usa il filtro per elaborare solo determinati messaggi, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form.

Importante

Poiché ogni tipo di blocco del flusso di dati di origine predefinito garantisce che i messaggi vengano propagati nell'ordine in cui vengono ricevuti, ogni messaggio deve essere letto dal blocco di origine prima che il blocco di origine possa elaborare il messaggio successivo. Pertanto, quando usi il filtraggio per connettere più destinazioni a un'origine, assicurati che almeno un blocco di destinazione riceva ciascun messaggio. In caso contrario, l'applicazione potrebbe bloccarsi.

Passaggio di messaggi

Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in cui componenti indipendenti di un programma comunicano tra loro inviando messaggi. Un modo per propagare i messaggi tra i componenti dell'applicazione consiste nel chiamare i Post metodi (sincroni) e SendAsync (asincroni) per inviare messaggi ai blocchi di flussi di dati di destinazione e i Receivemetodi , ReceiveAsynce TryReceive per ricevere messaggi dai blocchi di origine. È possibile combinare questi metodi con pipeline o reti del flusso di dati inviando dati di input al nodo head (un blocco di destinazione) e ricevendo i dati di output dal nodo terminale della pipeline o dai nodi del terminale della rete (uno o più blocchi di origine). Puoi anche utilizzare il metodo Choose per leggere dalla prima delle origini fornite che abbia dati disponibili ed eseguire un'azione su tali dati.

I blocchi di origine offrono dati ai blocchi di destinazione chiamando il ITargetBlock<TInput>.OfferMessage metodo . Il blocco di destinazione risponde a un messaggio offerto in uno dei tre modi seguenti: può accettare il messaggio, rifiutare il messaggio o posticipare il messaggio. Quando la destinazione accetta il messaggio, il OfferMessage metodo restituisce Accepted. Quando la destinazione rifiuta il messaggio, il OfferMessage metodo restituisce Declined. Quando la destinazione richiede che non riceva più messaggi dall'origine, OfferMessage restituisce DecliningPermanently. I tipi di blocchi di origine predefiniti non offrono messaggi alle destinazioni collegate dopo la ricezione di un valore restituito di questo tipo e scollegano automaticamente da tali destinazioni.

Quando un blocco di destinazione posticipa il messaggio per un uso successivo, il OfferMessage metodo restituisce Postponed. Un blocco di destinazione che posticipa un messaggio può successivamente chiamare il ISourceBlock<TOutput>.ReserveMessage metodo per tentare di riservare il messaggio offerto. A questo punto, il messaggio è ancora disponibile e può essere usato dal blocco di destinazione oppure il messaggio è stato acquisito da un'altra destinazione. Quando il blocco di destinazione richiede successivamente il messaggio o non richiede più il messaggio, chiama rispettivamente il ISourceBlock<TOutput>.ConsumeMessage metodo o ReleaseReservation . La prenotazione dei messaggi viene in genere usata dai tipi di blocchi di flusso di dati che operano in modalità non vorace. La modalità "non greedy" è spiegata più avanti in questo documento. Invece di riservare un messaggio posticipato, un blocco di destinazione può usare anche il ISourceBlock<TOutput>.ConsumeMessage metodo per tentare di utilizzare direttamente il messaggio posticipato.

Completamento del blocco del flusso di dati

I blocchi di flussi di dati supportano anche il concetto di completamento. Un blocco del flusso di dati nello stato completato non esegue ulteriori operazioni. Ogni blocco di flussi di dati ha un oggetto associato System.Threading.Tasks.Task , noto come attività di completamento, che rappresenta lo stato di completamento del blocco. Poiché è possibile attendere il completamento di un Task oggetto, usando le attività di completamento, è possibile attendere il completamento di uno o più nodi terminal di una rete di flussi di dati. L'interfaccia IDataflowBlock definisce il metodo Complete, che informa il blocco del flusso di dati di una richiesta affinché sia completata, e la proprietà Completion, che restituisce l'attività di completamento per il blocco del flusso di dati. Sia ISourceBlock<TOutput> che ITargetBlock<TInput> ereditano l'interfaccia IDataflowBlock .

Esistono due modi per determinare se un blocco di flussi di dati è stato completato senza errori, ha rilevato uno o più errori o è stato annullato. Il primo consiste nel chiamare il metodo Task.Wait sull'attività di completamento in un blocco try-catch (Try-Catch in Visual Basic). Nell'esempio seguente, viene creato un oggetto ActionBlock<TInput> che lancia un'eccezione ArgumentOutOfRangeException se il valore di input è minore di zero. AggregateException viene lanciata quando questo esempio chiama Wait sul task di completamento. L'oggetto ArgumentOutOfRangeException è accessibile tramite la InnerExceptions proprietà dell'oggetto AggregateException .

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

In questo esempio viene illustrato il caso in cui un'eccezione non viene gestita nel delegato di un blocco di flussi di dati di esecuzione. È consigliabile gestire le eccezioni all'interno di tali blocchi. Tuttavia, se non è possibile farlo, il blocco si comporta come se fosse stato annullato e non elabora i messaggi in arrivo.

Quando un blocco di flussi di dati viene annullato in modo esplicito, l'oggetto AggregateException contiene OperationCanceledException nella InnerExceptions proprietà . Per altre informazioni sull'annullamento del flusso di dati, vedere la sezione Abilitazione dell'annullamento .

Il secondo modo per determinare lo stato di completamento di un blocco di flussi di dati consiste nell'usare una continuazione dell'attività di completamento oppure per usare le funzionalità del linguaggio asincrono di C# e Visual Basic per attendere in modo asincrono l'attività di completamento. Il delegato che fornite al metodo Task.ContinueWith accetta un oggetto Task che rappresenta l'attività antecedente. Nel caso della Completion proprietà, il delegato per la continuazione accetta il compito stesso di completamento. L'esempio seguente è simile a quello precedente, ad eccezione del fatto che usa anche il ContinueWith metodo per creare un'attività di continuazione che stampa lo stato dell'operazione complessiva del flusso di dati.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

È anche possibile usare proprietà come IsCanceled nel corpo dell'attività di continuazione per determinare informazioni aggiuntive sullo stato di completamento di un blocco di flussi di dati. Per altre informazioni sulle attività di continuazione e sulla relativa correlazione con la gestione degli errori e dell'annullamento, vedere Concatenamento di attività tramite attività di continuazione, annullamento delle attività e gestione delle eccezioni.

Tipi di blocchi di flussi di dati predefiniti

La libreria di flussi di dati TPL fornisce diversi tipi di blocchi di flussi di dati predefiniti. Questi tipi sono suddivisi in tre categorie: blocchi di buffering, blocchi di esecuzione e blocchi di raggruppamento. Le sezioni seguenti descrivono i tipi di blocco che costituiscono queste categorie.

Blocchi di buffering

I blocchi di buffer contengono dati per l'uso da parte dei consumer di dati. La libreria di flussi di dati TPL fornisce tre tipi di blocchi di buffering: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La BufferBlock<T> classe rappresenta una struttura di messaggistica asincrona per utilizzo generico. Questa classe archivia una coda FIFO (First In, First Out) di messaggi che possono essere scritti da più fonti o letti da più destinazioni. Quando una destinazione riceve un messaggio da un BufferBlock<T> oggetto , tale messaggio viene rimosso dalla coda di messaggi. Pertanto, anche se un BufferBlock<T> oggetto può avere più destinazioni, ogni messaggio riceverà una sola destinazione. La BufferBlock<T> classe è utile quando si desidera passare più messaggi a un altro componente e tale componente deve ricevere ogni messaggio.

L'esempio di base seguente inserisce diversi Int32 valori in un BufferBlock<T> oggetto e quindi legge tali valori da tale oggetto.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Per un esempio completo che illustra come scrivere e leggere messaggi da un BufferBlock<T> oggetto , vedere Procedura: Scrivere messaggi in e leggere messaggi da un blocco di flussi di dati.

BroadcastBlock<T>

La BroadcastBlock<T> classe è utile quando è necessario passare più messaggi a un altro componente, ma tale componente richiede solo il valore più recente. Questa classe è utile anche quando si vuole trasmettere un messaggio a più componenti.

L'esempio di base seguente inserisce un Double valore in un BroadcastBlock<T> oggetto e quindi legge il valore da tale oggetto più volte. Poiché i valori non vengono rimossi dagli BroadcastBlock<T> oggetti dopo la lettura, lo stesso valore è disponibile ogni volta.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Per un esempio completo che illustra come usare BroadcastBlock<T> per trasmettere un messaggio a più blocchi di destinazione, vedere Procedura: Specificare un'utilità di pianificazione in un blocco di flussi di dati.

WriteOnceBlock<T>

La WriteOnceBlock<T> classe è simile alla BroadcastBlock<T> classe , ad eccezione del fatto che un WriteOnceBlock<T> oggetto può essere scritto una sola volta. Si può pensare di WriteOnceBlock<T> essere simile alla parola chiave C# readonly (ReadOnly in Visual Basic), ad eccezione del fatto che un WriteOnceBlock<T> oggetto diventa non modificabile dopo che riceve un valore anziché in fase di costruzione. Analogamente alla BroadcastBlock<T> classe , quando una destinazione riceve un messaggio da un WriteOnceBlock<T> oggetto , tale messaggio non viene rimosso da tale oggetto. Pertanto, più destinazioni ricevono una copia del messaggio. La WriteOnceBlock<T> classe è utile quando si desidera propagare solo il primo di più messaggi.

L'esempio di base seguente inserisce più String valori in un WriteOnceBlock<T> oggetto e quindi legge il valore da tale oggetto. Poiché un WriteOnceBlock<T> oggetto può essere scritto una sola volta, dopo che un WriteOnceBlock<T> oggetto riceve un messaggio, rimuove i messaggi successivi.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Per un esempio completo che illustra come usare WriteOnceBlock<T> per ricevere il valore della prima operazione completata, vedere Procedura: Scollegare blocchi di flussi di dati.

Blocchi di esecuzione

I blocchi di esecuzione, per ogni parte di dati ricevuti, chiamano un delegato fornito dall'utente. La libreria di flussi di dati TPL fornisce tre tipi di blocchi di esecuzione: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La ActionBlock<TInput> classe è un blocco di destinazione che chiama un delegato quando riceve i dati. Si pensi a un ActionBlock<TInput> oggetto come delegato che viene eseguito in modo asincrono quando i dati diventano disponibili. Il delegato fornito a un ActionBlock<TInput> oggetto può essere di tipo Action<T> o di tipo System.Func<TInput, Task>. Quando si usa un ActionBlock<TInput> oggetto con Action<T>, l'elaborazione di ogni elemento di input viene considerata completata quando il delegato restituisce . Quando si usa un ActionBlock<TInput> oggetto con System.Func<TInput, Task>, l'elaborazione di ogni elemento di input viene considerata completata solo quando l'oggetto restituito Task viene completato. Usando questi due meccanismi, è possibile usare ActionBlock<TInput> sia per l'elaborazione sincrona che asincrona di ogni elemento di input.

L'esempio di base seguente inserisce più Int32 valori in un ActionBlock<TInput> oggetto . L'oggetto ActionBlock<TInput> stampa tali valori nella console. Questo esempio imposta quindi il blocco sullo stato completato e attende il completamento di tutte le attività del flusso di dati.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Per esempi completi che illustrano come usare delegati con la ActionBlock<TInput> classe , vedere Procedura: Eseguire un'azione quando un blocco di flussi di dati riceve i dati.

TransformBlock<TInput, TOutput>

La classe TransformBlock<TInput,TOutput> è simile alla classe ActionBlock<TInput>, ad eccezione del fatto che agisce sia come origine che come destinazione. Il delegato passato a un TransformBlock<TInput,TOutput> oggetto restituisce un valore di tipo TOutput. Il delegato fornito a un TransformBlock<TInput,TOutput> oggetto può essere di tipo System.Func<TInput, TOutput> o di tipo System.Func<TInput, Task<TOutput>>. Quando si usa un TransformBlock<TInput,TOutput> oggetto con System.Func<TInput, TOutput>, l'elaborazione di ogni elemento di input viene considerata completata quando il delegato restituisce . Quando si usa un TransformBlock<TInput,TOutput> oggetto usato con System.Func<TInput, Task<TOutput>>, l'elaborazione di ogni elemento di input viene considerata completata solo quando l'oggetto restituito Task<TResult> viene completato. Come con ActionBlock<TInput>, usando questi due meccanismi, è possibile usare TransformBlock<TInput,TOutput> sia per l'elaborazione sincrona che asincrona di ogni elemento di input.

Nell'esempio di base seguente viene creato un TransformBlock<TInput,TOutput> oggetto che calcola la radice quadrata del relativo input. L'oggetto TransformBlock<TInput,TOutput> accetta valori Int32 come input e produce Double valori come output.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Per esempi completi che usano TransformBlock<TInput,TOutput> in una rete di blocchi di flussi di dati che eseguono l'elaborazione di immagini in un'applicazione Windows Form, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form.

TransformManyBlock<TInput, TOutput>

La TransformManyBlock<TInput,TOutput> classe è simile alla TransformBlock<TInput,TOutput> classe, eccetto che TransformManyBlock<TInput,TOutput> produce zero o più valori di output per ogni valore di input, anziché un solo valore di output per ogni valore di input. Il delegato fornito a un TransformManyBlock<TInput,TOutput> oggetto può essere di tipo System.Func<TInput, IEnumerable<TOutput>> o di tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Quando si usa un TransformManyBlock<TInput,TOutput> oggetto con System.Func<TInput, IEnumerable<TOutput>>, l'elaborazione di ogni elemento di input viene considerata completata quando il delegato restituisce . Quando si usa un TransformManyBlock<TInput,TOutput> oggetto con System.Func<TInput, Task<IEnumerable<TOutput>>>, l'elaborazione di ogni elemento di input viene considerata completa solo quando l'oggetto restituito System.Threading.Tasks.Task<IEnumerable<TOutput>> viene completato.

Nell'esempio di base seguente viene creato un TransformManyBlock<TInput,TOutput> oggetto che suddivide le stringhe nelle singole sequenze di caratteri. L'oggetto TransformManyBlock<TInput,TOutput> accetta valori String come input e produce Char valori come output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Per esempi completi che usano TransformManyBlock<TInput,TOutput> per produrre più output indipendenti per ogni input in una pipeline del flusso di dati, vedere Procedura dettagliata: Creazione di una pipeline del flusso di dati.

Grado di parallelismo

Ogni ActionBlock<TInput>oggetto , TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> memorizza nel buffer i messaggi di input fino a quando il blocco non è pronto per elaborarli. Per impostazione predefinita, queste classi elaborano i messaggi nell'ordine in cui vengono ricevuti, un messaggio alla volta. È anche possibile specificare il grado di parallelismo per consentire ActionBlock<TInput>TransformBlock<TInput,TOutput> agli oggetti e TransformManyBlock<TInput,TOutput> di elaborare più messaggi contemporaneamente. Per altre informazioni sull'esecuzione simultanea, vedere la sezione Specifica del grado di parallelismo più avanti in questo documento. Per un esempio che imposta il grado di parallelismo per consentire a un blocco di flussi di dati di esecuzione di elaborare più messaggi alla volta, vedere Procedura: Specificare il grado di parallelismo in un blocco di flussi di dati.

Riepilogo dei tipi delegati

La tabella seguente riepiloga i tipi delegati che è possibile fornire agli ActionBlock<TInput>oggetti , TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput> . Questa tabella specifica anche se il tipo delegato opera in modo sincrono o asincrono.

TIPO Tipo delegato sincrono Tipo delegato asincrono
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

È anche possibile usare espressioni lambda quando si usano tipi di blocchi di esecuzione. Per un esempio che illustra come usare un'espressione lambda con un blocco di esecuzione, vedere Procedura: Eseguire un'azione quando un blocco di flussi di dati riceve dati.

Raggruppamento di blocchi

I blocchi di raggruppamento combinano i dati da una o più origini e in vari vincoli. La libreria di flussi di dati TPL fornisce tre tipi di blocchi di join: BatchBlock<T>, JoinBlock<T1,T2>e BatchedJoinBlock<T1,T2>.

BatchBlock<T>

La BatchBlock<T> classe combina set di dati di input, noti come batch, in matrici di dati di output. Specifichi la dimensione di ogni batch quando si crea l'oggetto BatchBlock<T>. Quando l'oggetto BatchBlock<T> riceve il conteggio specificato degli elementi di input, propaga in modo asincrono una matrice che contiene tali elementi. Se un BatchBlock<T> oggetto è impostato sullo stato completato ma non contiene elementi sufficienti per formare un batch, propaga una matrice finale che contiene gli elementi di input rimanenti.

La BatchBlock<T> classe opera in modalità greedy o non greedy . In modalità greedy, ovvero l'impostazione predefinita, un BatchBlock<T> oggetto accetta ogni messaggio offerto e propaga una matrice dopo che riceve il conteggio specificato di elementi. In modalità non greedy, un oggetto BatchBlock<T> ritarda tutti i messaggi ricevuti fino a quando non sono stati offerti al blocco messaggi sufficienti per formare un batch. La modalità Greedy offre in genere prestazioni migliori rispetto alla modalità non greedy perché richiede meno overhead di elaborazione. Tuttavia, è possibile usare la modalità non greedy quando è necessario coordinare l'utilizzo da più origini in modo atomico. Specificare la modalità non greedy impostando Greedy su False nel parametro dataflowBlockOptions nel costruttore BatchBlock<T>.

Nell'esempio di base seguente vengono inseriti diversi Int32 valori in un BatchBlock<T> oggetto che contiene dieci elementi in un batch. Per garantire che tutti i valori vengano propagati all'esterno di BatchBlock<T>, questo esempio chiama il metodo Complete. Il Complete metodo imposta l'oggetto BatchBlock<T> sullo stato completato e pertanto l'oggetto BatchBlock<T> propaga tutti gli elementi rimanenti come batch finale.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Per un esempio completo che usa BatchBlock<T> per migliorare l'efficienza delle operazioni di inserimento del database, vedere Procedura dettagliata: Uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza.

JoinBlock<T1, T2, ...>

Le classi JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> raccolgono gli elementi di input e propagano oggetti System.Tuple<T1,T2> o System.Tuple<T1,T2,T3> che contengono tali elementi. Le JoinBlock<T1,T2> classi e JoinBlock<T1,T2,T3> non ereditano da ITargetBlock<TInput>. Forniscono invece proprietà, Target1, Target2e Target3, che implementano ITargetBlock<TInput>.

Come BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operano in modalità avida o non avida. In modalità greedy, ovvero l'impostazione predefinita, un JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> accetta tutti i messaggi che le vengono offerti e propaga una tupla dopo che ciascuna delle sue destinazioni ha ricevuto almeno un messaggio. In modalità non greedy, un oggetto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> posticipa tutti i messaggi in arrivo fino a quando non sono stati offerti a tutti i destinatari i dati richiesti per creare una tupla. A questo punto, il blocco si impegna in un protocollo di commit in due fasi per recuperare in modo atomico tutti gli elementi necessari dalle origini. Questo rinvio consente a un'altra entità di utilizzare i dati nel frattempo, per consentire al sistema complessivo di avanzare.

Nell'esempio di base seguente viene illustrato un caso in cui un JoinBlock<T1,T2,T3> oggetto richiede più dati per calcolare un valore. In questo esempio viene creato un JoinBlock<T1,T2,T3> oggetto che richiede due Int32 valori e un Char valore per eseguire un'operazione aritmetica.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Per un esempio completo che utilizza oggetti JoinBlock<T1,T2> in modalità non avido per condividere in modo cooperativo una risorsa, vedere Procedura: Usare JoinBlock per leggere i dati da più origini.

BatchedJoinBlock<T1, T2, ...>

Le classi BatchedJoinBlock<T1,T2> e BatchedJoinBlock<T1,T2,T3> raccolgono batch di elementi di input e propagano System.Tuple(IList(T1), IList(T2)) o System.Tuple(IList(T1), IList(T2), IList(T3)) oggetti che contengono tali elementi. BatchedJoinBlock<T1,T2> Si consideri come una combinazione di BatchBlock<T> e JoinBlock<T1,T2>. Specificare le dimensioni di ogni batch quando si crea un BatchedJoinBlock<T1,T2> oggetto . BatchedJoinBlock<T1,T2> fornisce anche proprietà Target1 e Target2, che implementano ITargetBlock<TInput>. Quando il numero specificato di elementi di input viene ricevuto da tutte le destinazioni, l'oggetto BatchedJoinBlock<T1,T2> propaga in modo asincrono un System.Tuple(IList(T1), IList(T2)) oggetto che contiene tali elementi.

Il seguente esempio di base crea un oggetto BatchedJoinBlock<T1,T2> che conserva risultati, valori Int32 ed errori come oggetti Exception. In questo esempio vengono eseguite più operazioni e vengono scritti i risultati nella Target1 proprietà ed errori nella Target2 proprietà dell'oggetto BatchedJoinBlock<T1,T2> . Poiché il numero di operazioni riuscite e non riuscite è sconosciuto in anticipo, gli IList<T> oggetti consentono a ogni destinazione di ricevere zero o più valori.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Per un esempio completo che usa BatchedJoinBlock<T1,T2> per acquisire sia i risultati che le eccezioni che si verificano durante la lettura del programma da un database, vedere Procedura dettagliata: Uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza.

Configurazione del comportamento del blocco del flusso di dati

È possibile abilitare opzioni aggiuntive fornendo un System.Threading.Tasks.Dataflow.DataflowBlockOptions oggetto al costruttore dei tipi di blocchi di flussi di dati. Queste opzioni controllano il comportamento dell'utilità di pianificazione che gestisce l'attività sottostante e il grado di parallelismo. Include DataflowBlockOptions anche tipi derivati che specificano un comportamento specifico per determinati tipi di blocchi di flussi di dati. La tabella seguente riepiloga il tipo di opzioni associato a ogni tipo di blocco del flusso di dati.

Tipo di blocco del flusso di dati DataflowBlockOptions tipo
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Le sezioni seguenti forniscono informazioni aggiuntive sui tipi importanti di opzioni del blocco di flussi di dati disponibili tramite le System.Threading.Tasks.Dataflow.DataflowBlockOptionsclassi , System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionse System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions .

Specificare l'Utilità di Pianificazione

Ogni blocco di flussi di dati predefinito usa il meccanismo di pianificazione delle attività TPL per eseguire attività come la propagazione dei dati a una destinazione, la ricezione di dati da un'origine e l'esecuzione di delegati definiti dall'utente quando i dati diventano disponibili. TaskScheduler è una classe astratta che rappresenta un'utilità di pianificazione che accoda le attività nei thread. L'utilità di pianificazione attività predefinita, Default, usa la classe ThreadPool per accodare ed eseguire il lavoro. È possibile eseguire l'override dell'utilità di pianificazione predefinita impostando la TaskScheduler proprietà quando si costruisce un oggetto blocco di flussi di dati.

Quando lo stesso scheduler gestisce più blocchi di flussi di dati, può applicare criteri comuni tra di essi. Ad esempio, se più blocchi di flusso dati sono configurati per l'utilità esclusiva di pianificazione dello stesso oggetto ConcurrentExclusiveSchedulerPair, tutte le operazioni eseguite in questi blocchi vengono serializzate. Analogamente, se questi blocchi sono configurati per lo schedulatore concorrente dello stesso ConcurrentExclusiveSchedulerPair oggetto e tale schedulatore è configurato per avere un livello massimo di concorrenza, tutto il lavoro proveniente da questi blocchi è limitato a tale numero di operazioni concorrenti. Per un esempio che utilizza la classe ConcurrentExclusiveSchedulerPair per consentire l'esecuzione di operazioni di lettura in parallelo, ma di scrittura in modo esclusivo rispetto a tutte le altre operazioni, vedere Come: Specificare un'utilità di pianificazione in un blocco di flusso di dati. Per ulteriori informazioni sulle utilità di pianificazione delle attività nel TPL, vedere l'argomento della classe TaskScheduler.

Specifica del grado di parallelismo

Per impostazione predefinita, i tre tipi di blocchi di esecuzione forniti dalla libreria di flussi di dati TPL, , ActionBlock<TInput>TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>elaborano un messaggio alla volta. Questi tipi di blocchi di flussi di dati elaborano anche i messaggi nell'ordine in cui vengono ricevuti. Per abilitare questi blocchi di flussi di dati per elaborare i messaggi contemporaneamente, impostare la ExecutionDataflowBlockOptions.MaxDegreeOfParallelism proprietà quando si costruisce l'oggetto blocco del flusso di dati.

Il valore predefinito di MaxDegreeOfParallelism è 1, che garantisce che il blocco del flusso di dati elabora un messaggio alla volta. L'impostazione di questa proprietà su un valore maggiore di 1 consente al blocco di flussi di dati di elaborare più messaggi contemporaneamente. Impostare questa proprietà su DataflowBlockOptions.Unbounded consente al task scheduler sottostante di gestire il grado massimo di concorrenza.

Importante

Quando si specifica un grado massimo di parallelismo maggiore di 1, più messaggi vengono elaborati contemporaneamente e pertanto i messaggi potrebbero non essere elaborati nell'ordine in cui vengono ricevuti. L'ordine in cui i messaggi vengono restituiti dal blocco è, tuttavia, lo stesso in cui vengono ricevuti.

Poiché la MaxDegreeOfParallelism proprietà rappresenta il grado massimo di parallelismo, il blocco di flussi di dati potrebbe essere eseguito con un grado di parallelismo minore di quello specificato. Il blocco del flusso di dati potrebbe usare un grado di parallelismo minore per soddisfare i requisiti funzionali o perché non sono disponibili risorse di sistema. Un blocco di flussi di dati non sceglie mai più parallelismo di quello specificato.

Il valore della MaxDegreeOfParallelism proprietà è esclusivo di ogni oggetto blocco del flusso di dati. Ad esempio, se quattro oggetti blocco del flusso di dati specificano 1 per il grado massimo di parallelismo, tutti e quattro gli oggetti blocco di flussi di dati possono essere eseguiti in parallelo.

Per un esempio che imposta il grado massimo di parallelismo per consentire l'esecuzione di operazioni lunghe in parallelo, vedere Procedura: Specificare il grado di parallelismo in un blocco di flussi di dati.

Specifica il numero di messaggi per ogni attività

I tipi di blocchi di flussi di dati predefiniti usano attività per elaborare più elementi di input. Ciò consente di ridurre al minimo il numero di oggetti attività necessari per elaborare i dati, che consente alle applicazioni di eseguire in modo più efficiente. Tuttavia, quando le attività di un set di blocchi di flussi di dati elaborano i dati, le attività di altri blocchi di flussi di dati potrebbero dover attendere il tempo di elaborazione accodando i messaggi. Per migliorare l'equità tra le attività del flusso di dati, impostare la MaxMessagesPerTask proprietà . Quando MaxMessagesPerTask è impostato su DataflowBlockOptions.Unbounded, ovvero l'impostazione predefinita, l'attività usata da un blocco di flussi di dati elabora il numero di messaggi disponibili. Quando MaxMessagesPerTask è impostato su un valore diverso da Unbounded, il blocco del flusso di dati elabora al massimo questo numero di messaggi per Task oggetto. Anche se l'impostazione della proprietà può aumentare l'equità MaxMessagesPerTask tra le attività, il sistema può creare più attività di quelle necessarie, riducendo così le prestazioni.

Abilitare l'annullamento

Il TPL fornisce un meccanismo che consente alle attività di coordinare l'annullamento in modo cooperativo. Per consentire ai blocchi del flusso di dati di partecipare a questo meccanismo di annullamento, impostare la CancellationToken proprietà . Quando questo CancellationToken oggetto è impostato sullo stato annullato, tutti i blocchi di flussi di dati che monitorano l'esecuzione del token completano l'esecuzione dell'elemento corrente, ma non avviano l'elaborazione degli elementi successivi. Questi blocchi di flussi di dati cancellano anche eventuali messaggi memorizzati nel buffer, rilasciano connessioni a qualsiasi blocco di origine e destinazione e passano allo stato annullato. Passando allo stato annullato, la Completion proprietà ha la Status proprietà impostata su Canceled, a meno che non si sia verificata un'eccezione durante l'elaborazione. In tal caso, Status è impostato su Faulted.

Per un esempio che illustra come usare l'annullamento in un'applicazione Windows Form, vedere Procedura: Annullare un blocco di flussi di dati. Per altre informazioni sull'annullamento in TPL, vedere Annullamento attività.

Specifica del comportamento avido e non avide

Diversi tipi di blocchi di raggruppamento dei flussi di dati possono operare in modalità greedy o non-greedy. Per impostazione predefinita, i tipi di blocchi di flussi di dati predefiniti operano in modalità greedy.

Per i tipi di blocchi join come JoinBlock<T1,T2>, la modalità greedy indica che il blocco accetta immediatamente i dati anche se i dati corrispondenti con cui eseguire il join non sono ancora disponibili. La modalità non vorace indica che il blocco rimanda tutti i messaggi in arrivo fino a quando non ne è disponibile uno per ciascuno dei suoi obiettivi per completare l'aggregazione. Se uno dei messaggi posticipati non è più disponibile, il blocco di join rilascia tutti i messaggi posticipati e riavvia il processo. Per la classe BatchBlock<T>, il comportamento greedy e non greedy è simile, tranne che in modalità non greedy, un oggetto BatchBlock<T> posticipa tutti i messaggi in arrivo fino a quando non sono disponibili abbastanza messaggi da origini distinte per completare un batch.

Per specificare la modalità non greedy per un blocco di flusso di dati, impostare Greedy su False. Per un esempio che illustra come usare la modalità non greedy per consentire a più blocchi di join di condividere un'origine dati in modo più efficiente, vedere Procedura: Usare JoinBlock per leggere i dati da più origini.

Blocchi di flussi di dati personalizzati

Sebbene la libreria di flussi di dati TPL fornisca molti tipi di blocchi predefiniti, è possibile creare tipi di blocco aggiuntivi che eseguono comportamenti personalizzati. Implementare direttamente le ISourceBlock<TOutput> interfacce o ITargetBlock<TInput> o usare il Encapsulate metodo per compilare un blocco complesso che incapsula il comportamento dei tipi di blocchi esistenti. Per esempi che illustrano come implementare funzionalità di blocco di flussi di dati personalizzate, vedere Procedura dettagliata: Creazione di un tipo di blocco di flussi di dati personalizzato.

Titolo Descrizione
Procedura: Scrivere messaggi in e leggere messaggi da un blocco di flussi di dati Illustra come scrivere e leggere messaggi da un BufferBlock<T> oggetto .
Procedura: Implementare un modello di flusso di dati Producer-Consumer Viene descritto come usare il modello di flusso di dati per implementare un modello producer-consumer, in cui il producer invia messaggi a un blocco di flussi di dati e il consumer legge i messaggi da tale blocco.
Procedura: Eseguire un'azione quando un blocco di flussi di dati riceve i dati Viene descritto come fornire delegati ai tipi di blocchi del flusso di dati di esecuzione, ActionBlock<TInput>, TransformBlock<TInput,TOutput>e TransformManyBlock<TInput,TOutput>.
Procedura dettagliata: Creazione di una pipeline del flusso di dati Viene descritto come creare una pipeline del flusso di dati che scarica il testo dal Web ed esegue operazioni su tale testo.
Procedura: Scollegare i blocchi di flussi di dati Illustra come utilizzare il metodo LinkTo per scollegare un blocco target dalla sua origine dopo che quest'ultima offre un messaggio al blocco target.
Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Form Illustra come creare una rete di blocchi di flussi di dati che eseguono l'elaborazione di immagini in un'applicazione Windows Form.
Procedura: Annullare un blocco di flussi di dati Illustra come usare l'annullamento in un'applicazione Windows Form.
Procedura: Usare JoinBlock per leggere i dati da più origini Viene illustrato come usare la classe JoinBlock<T1,T2> per eseguire un'operazione quando i dati sono disponibili da più origini e come usare la modalità non greedy per consentire a più blocchi di join di condividere un'origine dati in modo più efficiente.
Procedura: Specificare il grado di parallelismo in un blocco di flussi di dati Viene descritto come impostare la MaxDegreeOfParallelism proprietà per consentire a un blocco di flussi di dati di esecuzione di elaborare più messaggi alla volta.
Procedura: Specificare un pianificatore di attività in un blocco Dataflow Illustra come associare un pianificatore di attività specifico quando si utilizza il flusso di dati nella tua applicazione.
Procedura dettagliata: Uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza Viene descritto come usare la BatchBlock<T> classe per migliorare l'efficienza delle operazioni di inserimento del database e come usare la BatchedJoinBlock<T1,T2> classe per acquisire sia i risultati che le eccezioni che si verificano durante la lettura del programma da un database.
Procedura dettagliata: Creazione di un tipo di blocco di flussi di dati personalizzato Illustra due modi per creare un tipo di blocco del flusso di dati che implementa un comportamento personalizzato.
Task Parallel Library (TPL) Introduce il TPL, una libreria che semplifica la programmazione parallela e simultanea nelle applicazioni .NET Framework.