Condividi tramite


Procedura: Scollegare i blocchi di flussi di dati

Questo documento descrive come scollegare un blocco di flussi di dati di destinazione dall'origine.

Annotazioni

La libreria di flussi di dati TPL (il namespace System.Threading.Tasks.Dataflow) è inclusa in .NET 6 e versioni successive. Per i progetti .NET Framework e .NET Standard, è necessario installare il 📦 pacchetto NuGet System.Threading.Tasks.Dataflow.

Example

Nell'esempio seguente vengono creati tre TransformBlock<TInput,TOutput> oggetti, ognuno dei quali chiama il TrySolution metodo per calcolare un valore. Questo esempio richiede solo il risultato della prima chiamata a TrySolution per terminare.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
   // Receives the value from the first provided source that has
   // a message.
   public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
   {
      // Create a WriteOnceBlock<T> object and link it to each source block.
      var writeOnceBlock = new WriteOnceBlock<T>(e => e);
      foreach (var source in sources)
      {
         // Setting MaxMessages to one instructs
         // the source block to unlink from the WriteOnceBlock<T> object
         // after offering the WriteOnceBlock<T> object one message.
         source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
      }
      // Return the first value that is offered to the WriteOnceBlock object.
      return writeOnceBlock.Receive();
   }

   // Demonstrates a function that takes several seconds to produce a result.
   static int TrySolution(int n, CancellationToken ct)
   {
      // Simulate a lengthy operation that completes within three seconds
      // or when the provided CancellationToken object is cancelled.
      SpinWait.SpinUntil(() => ct.IsCancellationRequested,
         new Random().Next(3000));

      // Return a value.
      return n + 42;
   }

   static void Main(string[] args)
   {
      // Create a shared CancellationTokenSource object to enable the
      // TrySolution method to be cancelled.
      var cts = new CancellationTokenSource();

      // Create three TransformBlock<int, int> objects.
      // Each TransformBlock<int, int> object calls the TrySolution method.
      Func<int, int> action = n => TrySolution(n, cts.Token);
      var trySolution1 = new TransformBlock<int, int>(action);
      var trySolution2 = new TransformBlock<int, int>(action);
      var trySolution3 = new TransformBlock<int, int>(action);

      // Post data to each TransformBlock<int, int> object.
      trySolution1.Post(11);
      trySolution2.Post(21);
      trySolution3.Post(31);

      // Call the ReceiveFromAny<T> method to receive the result from the
      // first TransformBlock<int, int> object to finish.
      int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);

      // Cancel all calls to TrySolution that are still active.
      cts.Cancel();

      // Print the result to the console.
      Console.WriteLine($"The solution is {result}.");

      cts.Dispose();
   }
}

/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
    ' Receives the value from the first provided source that has 
    ' a message.
    Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
        ' Create a WriteOnceBlock<T> object and link it to each source block.
        Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
        For Each source In sources
            ' Setting MaxMessages to one instructs
            ' the source block to unlink from the WriteOnceBlock<T> object
            ' after offering the WriteOnceBlock<T> object one message.
            source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
        Next source
        ' Return the first value that is offered to the WriteOnceBlock object.
        Return writeOnceBlock.Receive()
    End Function

    ' Demonstrates a function that takes several seconds to produce a result.
    Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
        ' Simulate a lengthy operation that completes within three seconds
        ' or when the provided CancellationToken object is cancelled.
        SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))

        ' Return a value.
        Return n + 42
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a shared CancellationTokenSource object to enable the 
        ' TrySolution method to be cancelled.
        Dim cts = New CancellationTokenSource()

        ' Create three TransformBlock<int, int> objects. 
        ' Each TransformBlock<int, int> object calls the TrySolution method.
        Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
        Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)

        ' Post data to each TransformBlock<int, int> object.
        trySolution1.Post(11)
        trySolution2.Post(21)
        trySolution3.Post(31)

        ' Call the ReceiveFromAny<T> method to receive the result from the 
        ' first TransformBlock<int, int> object to finish.
        Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)

        ' Cancel all calls to TrySolution that are still active.
        cts.Cancel()

        ' Print the result to the console.
        Console.WriteLine("The solution is {0}.", result)

        cts.Dispose()
    End Sub
End Class

' Sample output:
'The solution is 53.
'

Per ricevere il valore dal primo TransformBlock<TInput,TOutput> oggetto che termina, questo esempio definisce il ReceiveFromAny(T) metodo . Il ReceiveFromAny(T) metodo accetta una matrice di ISourceBlock<TOutput> oggetti e collega ognuno di questi oggetti a un WriteOnceBlock<T> oggetto . Quando si usa il LinkTo metodo per collegare un blocco di flussi di dati di origine a un blocco di destinazione, l'origine propaga i messaggi alla destinazione man mano che i dati diventano disponibili. Poiché la WriteOnceBlock<T> classe accetta solo il primo messaggio offerto, il metodo produce il ReceiveFromAny(T) risultato chiamando il Receive metodo . In questo modo viene generato il primo messaggio offerto all'oggetto WriteOnceBlock<T> . Il LinkTo metodo ha una versione sovraccaricata che accetta un oggetto DataflowLinkOptions con una proprietà MaxMessages che, quando è impostata su 1, istruisce il blocco sorgente di scollegarsi dal bersaglio dopo che il bersaglio riceve un messaggio dal blocco sorgente. È importante scollegare l'oggetto WriteOnceBlock<T> dalle origini perché la relazione tra la matrice di origini e l'oggetto non è più necessaria dopo che l'oggetto WriteOnceBlock<T>WriteOnceBlock<T> riceve un messaggio.

Per consentire alle chiamate rimanenti di TrySolution di terminare dopo che una di esse calcola un valore, il metodo TrySolution accetta un oggetto CancellationToken che viene annullato dopo che la chiamata a ReceiveFromAny(T) restituisce. Il SpinUntil metodo restituisce quando questo CancellationToken oggetto viene annullato.

Vedere anche