Aracılığıyla paylaş


Nasıl yapılır: Veri Akışı Bloklarının Bağlantısını Kaldırma

Bu belgede, hedef veri akışı bloğunun kaynağıyla bağlantısının nasıl kaldırıldığı açıklanır.

Uyarı

TPL veri akışı kitaplığı ( System.Threading.Tasks.Dataflow ad alanı), .NET 6 ve sonraki sürümlerde bulunur. .NET Framework ve .NET Standard projeleri için System.Threading.Tasks.Dataflow NuGet paketini yüklemeniz📦 gerekir.

Example

Aşağıdaki örnek, her biri bir değeri hesaplamak için TrySolution yöntemini çağıran üç TransformBlock<TInput,TOutput> nesnesi oluşturur. Bu örnekte, yalnızca TrySolution için yapılan ilk çağrının sonucunun bitmesi gerekir.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
   // Receives the value from the first provided source that has
   // a message.
   public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
   {
      // Create a WriteOnceBlock<T> object and link it to each source block.
      var writeOnceBlock = new WriteOnceBlock<T>(e => e);
      foreach (var source in sources)
      {
         // Setting MaxMessages to one instructs
         // the source block to unlink from the WriteOnceBlock<T> object
         // after offering the WriteOnceBlock<T> object one message.
         source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
      }
      // Return the first value that is offered to the WriteOnceBlock object.
      return writeOnceBlock.Receive();
   }

   // Demonstrates a function that takes several seconds to produce a result.
   static int TrySolution(int n, CancellationToken ct)
   {
      // Simulate a lengthy operation that completes within three seconds
      // or when the provided CancellationToken object is cancelled.
      SpinWait.SpinUntil(() => ct.IsCancellationRequested,
         new Random().Next(3000));

      // Return a value.
      return n + 42;
   }

   static void Main(string[] args)
   {
      // Create a shared CancellationTokenSource object to enable the
      // TrySolution method to be cancelled.
      var cts = new CancellationTokenSource();

      // Create three TransformBlock<int, int> objects.
      // Each TransformBlock<int, int> object calls the TrySolution method.
      Func<int, int> action = n => TrySolution(n, cts.Token);
      var trySolution1 = new TransformBlock<int, int>(action);
      var trySolution2 = new TransformBlock<int, int>(action);
      var trySolution3 = new TransformBlock<int, int>(action);

      // Post data to each TransformBlock<int, int> object.
      trySolution1.Post(11);
      trySolution2.Post(21);
      trySolution3.Post(31);

      // Call the ReceiveFromAny<T> method to receive the result from the
      // first TransformBlock<int, int> object to finish.
      int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);

      // Cancel all calls to TrySolution that are still active.
      cts.Cancel();

      // Print the result to the console.
      Console.WriteLine($"The solution is {result}.");

      cts.Dispose();
   }
}

/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
    ' Receives the value from the first provided source that has 
    ' a message.
    Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
        ' Create a WriteOnceBlock<T> object and link it to each source block.
        Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
        For Each source In sources
            ' Setting MaxMessages to one instructs
            ' the source block to unlink from the WriteOnceBlock<T> object
            ' after offering the WriteOnceBlock<T> object one message.
            source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
        Next source
        ' Return the first value that is offered to the WriteOnceBlock object.
        Return writeOnceBlock.Receive()
    End Function

    ' Demonstrates a function that takes several seconds to produce a result.
    Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
        ' Simulate a lengthy operation that completes within three seconds
        ' or when the provided CancellationToken object is cancelled.
        SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))

        ' Return a value.
        Return n + 42
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a shared CancellationTokenSource object to enable the 
        ' TrySolution method to be cancelled.
        Dim cts = New CancellationTokenSource()

        ' Create three TransformBlock<int, int> objects. 
        ' Each TransformBlock<int, int> object calls the TrySolution method.
        Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
        Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)

        ' Post data to each TransformBlock<int, int> object.
        trySolution1.Post(11)
        trySolution2.Post(21)
        trySolution3.Post(31)

        ' Call the ReceiveFromAny<T> method to receive the result from the 
        ' first TransformBlock<int, int> object to finish.
        Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)

        ' Cancel all calls to TrySolution that are still active.
        cts.Cancel()

        ' Print the result to the console.
        Console.WriteLine("The solution is {0}.", result)

        cts.Dispose()
    End Sub
End Class

' Sample output:
'The solution is 53.
'

Bu örnek, biten ilk TransformBlock<TInput,TOutput> nesneden değer almak için ReceiveFromAny(T) yöntemini tanımlar. ReceiveFromAny(T) yöntemi bir nesne dizisi ISourceBlock<TOutput> kabul eder ve bu nesnelerin her birini bir WriteOnceBlock<T> nesneye bağlar. Bir kaynak veri akışı bloğunu LinkTo hedef bloğa bağlamak için yöntemini kullandığınızda, veriler kullanılabilir hale geldikçe kaynak iletileri hedefe yayılır. WriteOnceBlock<T> sınıfı yalnızca teklif edilen ilk iletiyi kabul ettiğinden, ReceiveFromAny(T) yöntemi sonucunu Receive yöntemini çağırarak üretir. Bu, nesnesine WriteOnceBlock<T> sunulan ilk iletiyi oluşturur. LinkTo yönteminde, bir 1 nesnesi alan aşırı yüklenmiş bir sürüm vardır; bu nesne, özellikleri MaxMessages olduğunda hedef bir mesaj aldığında kaynak bloğun hedeften bağlantısını kaldırmasını bildirir. Kaynak dizisi ile WriteOnceBlock<T> nesnesi arasındaki ilişki, WriteOnceBlock<T> bir ileti aldıktan sonra artık gerekli olmadığından, WriteOnceBlock<T> nesnesinin kaynaklarından bağlantısını kaldırması önemlidir.

Biri bir değeri hesapladıktan sonra kalan çağrıların TrySolution sona ermesini sağlamak için, ReceiveFromAny(T) çağrısı döndürüldükten sonra iptal edilen bir CancellationToken nesnesini alan TrySolution yöntemini kullanır. SpinUntil yöntemi, bu CancellationToken nesne iptal edildiğinde döner.

Ayrıca bakınız