How to: Unlink Dataflow Blocks

This document describes how to unlink a target dataflow block from its source.


The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow package in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.


The following example creates three TransformBlock<TInput,TOutput> objects, each of which calls the TrySolution method to compute a value. The example needs only the result from the first call to TrySolution that finishes.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
   // Receives the value from the first provided source that has
   // a message.
   public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
   {
      // Create a WriteOnceBlock<T> object and link it to each source block.
      var writeOnceBlock = new WriteOnceBlock<T>(e => e);
      foreach (var source in sources)
      {
         // Setting MaxMessages to one instructs
         // the source block to unlink from the WriteOnceBlock<T> object
         // after offering the WriteOnceBlock<T> object one message.
         source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
      }
      // Return the first value that is offered to the WriteOnceBlock object.
      return writeOnceBlock.Receive();
   }

   // Demonstrates a function that takes several seconds to produce a result.
   static int TrySolution(int n, CancellationToken ct)
   {
      // Simulate a lengthy operation that completes within three seconds
      // or when the provided CancellationToken object is cancelled.
      SpinWait.SpinUntil(() => ct.IsCancellationRequested,
         new Random().Next(3000));

      // Return a value.
      return n + 42;
   }

   static void Main(string[] args)
   {
      // Create a shared CancellationTokenSource object to enable the
      // TrySolution method to be cancelled.
      var cts = new CancellationTokenSource();

      // Create three TransformBlock<int, int> objects.
      // Each TransformBlock<int, int> object calls the TrySolution method.
      Func<int, int> action = n => TrySolution(n, cts.Token);
      var trySolution1 = new TransformBlock<int, int>(action);
      var trySolution2 = new TransformBlock<int, int>(action);
      var trySolution3 = new TransformBlock<int, int>(action);

      // Post data to each TransformBlock<int, int> object.
      trySolution1.Post(11);
      trySolution2.Post(21);
      trySolution3.Post(31);

      // Call the ReceiveFromAny<T> method to receive the result from the
      // first TransformBlock<int, int> object to finish.
      int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);

      // Cancel all calls to TrySolution that are still active.
      cts.Cancel();

      // Print the result to the console.
      Console.WriteLine("The solution is {0}.", result);
   }
}

/* Sample output:
The solution is 53.
*/

Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
    ' Receives the value from the first provided source that has 
    ' a message.
    Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
        ' Create a WriteOnceBlock<T> object and link it to each source block.
        Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
        For Each source In sources
            ' Setting MaxMessages to one instructs
            ' the source block to unlink from the WriteOnceBlock<T> object
            ' after offering the WriteOnceBlock<T> object one message.
            source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
        Next source
        ' Return the first value that is offered to the WriteOnceBlock object.
        Return writeOnceBlock.Receive()
    End Function

    ' Demonstrates a function that takes several seconds to produce a result.
    Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
        ' Simulate a lengthy operation that completes within three seconds
        ' or when the provided CancellationToken object is cancelled.
        SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))

        ' Return a value.
        Return n + 42
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a shared CancellationTokenSource object to enable the 
        ' TrySolution method to be cancelled.
        Dim cts = New CancellationTokenSource()

        ' Create three TransformBlock<int, int> objects. 
        ' Each TransformBlock<int, int> object calls the TrySolution method.
        Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
        Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
        Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)

        ' Post data to each TransformBlock<int, int> object.
        trySolution1.Post(11)
        trySolution2.Post(21)
        trySolution3.Post(31)

        ' Call the ReceiveFromAny<T> method to receive the result from the 
        ' first TransformBlock<int, int> object to finish.
        Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)

        ' Cancel all calls to TrySolution that are still active.
        cts.Cancel()

        ' Print the result to the console.
        Console.WriteLine("The solution is {0}.", result)

    End Sub
End Class

' Sample output:
'The solution is 53.

To receive the value from the first TransformBlock<TInput,TOutput> object that finishes, this example defines the ReceiveFromAny<T> method. The ReceiveFromAny<T> method accepts an array of ISourceBlock<TOutput> objects and links each of them to a WriteOnceBlock<T> object. When you use the LinkTo method to link a source dataflow block to a target block, the source propagates messages to the target as data becomes available. Because the WriteOnceBlock<T> class accepts only the first message that it is offered, the ReceiveFromAny<T> method produces its result by calling the Receive method, which returns the first message that was offered to the WriteOnceBlock<T> object. The LinkTo method has an overload that takes a DataflowLinkOptions object. Its MaxMessages property, when set to 1, instructs the source block to unlink from the target after the target receives one message from the source. It is important for the WriteOnceBlock<T> object to unlink from its sources because the relationship between the array of sources and the WriteOnceBlock<T> object is no longer needed once the WriteOnceBlock<T> object has received a message.
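The effect of MaxMessages can be observed in isolation. The following is a minimal sketch, not part of the original sample: the class name MaxMessagesSketch and the method Run are hypothetical, and BufferBlock<T> objects stand in for the blocks used above so that the second message stays visible. It assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not from the original sample) that shows a link
// with MaxMessages = 1 being removed after a single message.
static class MaxMessagesSketch
{
    public static (int received, int leftInSource) Run()
    {
        var source = new BufferBlock<int>();
        var target = new BufferBlock<int>();

        // The source unlinks from the target after the target
        // accepts one message.
        source.LinkTo(target, new DataflowLinkOptions { MaxMessages = 1 });

        source.Post(1);
        source.Post(2);

        // Blocks until the first value has been propagated to the target.
        int received = target.Receive();

        // The link is gone by now, so the second value was never offered
        // to the target and is still buffered in the source.
        int leftInSource = source.Receive();

        return (received, leftInSource);
    }
}
```

Calling MaxMessagesSketch.Run() should return (1, 2): the target receives only the first value, and the second remains in the source. Without the MaxMessages option, both values would be propagated to the target.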

To let the remaining calls to TrySolution end after one of them computes a value, the TrySolution method takes a CancellationToken object that is canceled after the call to ReceiveFromAny<T> returns. The SpinUntil method returns when this CancellationToken object is canceled.
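The cancellation behavior of SpinUntil can be sketched separately from the dataflow blocks. The names SpinUntilCancellationSketch, WaitForCancel, and Run below are hypothetical and exist only for illustration. The sketch relies on the SpinWait.SpinUntil(Func<bool>, int) overload, which returns true if the condition was satisfied before the timeout elapsed and false otherwise.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the original sample) that shows
// SpinWait.SpinUntil ending early when a token is canceled.
static class SpinUntilCancellationSketch
{
    // Returns true if the wait ended because the token was canceled,
    // or false if the timeout elapsed first.
    public static bool WaitForCancel(CancellationToken ct, int timeoutMs)
    {
        return SpinWait.SpinUntil(() => ct.IsCancellationRequested, timeoutMs);
    }

    public static (bool endedByCancel, bool endedByTimeout) Run()
    {
        // Case 1: the token is canceled while (or before) the waiter spins,
        // so the wait ends well before the ten-second timeout.
        using var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;
        Task<bool> waiter = Task.Run(() => WaitForCancel(token, 10_000));
        cts.Cancel();
        bool endedByCancel = waiter.Result;

        // Case 2: the token is never canceled, so the wait times out.
        using var never = new CancellationTokenSource();
        bool endedByTimeout = !WaitForCancel(never.Token, 50);

        return (endedByCancel, endedByTimeout);
    }
}
```

In the sample above, TrySolution ignores the return value of SpinUntil and always returns n + 42. A real computation would more likely check the token and stop its work, for example by calling ThrowIfCancellationRequested.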

