Bagikan melalui


Aliran Data (Pustaka Paralel Tugas)

Pustaka Paralel Tugas (TPL) menyediakan komponen aliran data untuk membantu meningkatkan ketahanan aplikasi berkekuatan konkurensi. Komponen aliran data ini secara kolektif disebut sebagai Pustaka Dataflow TPL. Model aliran data ini mempromosikan pemrograman berbasis aktor dengan menyediakan pesan dalam proses yang diteruskan untuk tugas aliran data dan pipelining berbutir kasar. Komponen aliran data dibangun di atas jenis dan infrastruktur penjadwalan TPL dan berintegrasi dengan dukungan bahasa C #, Visual Basic, dan F# untuk pemrograman asinkron. Komponen aliran data ini berguna ketika Anda memiliki beberapa operasi yang harus berkomunikasi satu sama lain secara asinkron atau ketika Anda ingin memproses data saat tersedia. Misalnya, pertimbangkan aplikasi yang memproses data gambar dari kamera web. Dengan menggunakan model aliran data, aplikasi dapat memproses bingkai gambar saat tersedia. Jika aplikasi meningkatkan bingkai gambar, misalnya, dengan melakukan koreksi cahaya atau pengurangan mata merah, Anda dapat membuat alur komponen aliran data. Setiap tahap pipa mungkin menggunakan fungsi paralelisme yang lebih kasar, seperti fungsionalitas yang disediakan oleh TPL, untuk mengubah gambar.

Dokumen ini menyediakan gambaran umum tentang Pustaka Dataflow TPL. Ini menjelaskan model pemrograman, jenis blok aliran data yang telah ditentukan, dan cara mengonfigurasi blok aliran data untuk memenuhi persyaratan spesifik aplikasi Anda.

Catatan

Pustaka Aliran Data TPL (namespace layanan System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace layanan System.Threading.Tasks.Dataflow di Visual Studio, buka proyek, pilih Kelola Paket NuGet dari menu Proyek, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI, jalankan dotnet add package System.Threading.Tasks.Dataflow.

Model Pemrograman

Pustaka Dataflow TPL menyediakan dasar untuk melewati pesan dan paralelisasi aplikasi intensif CPU dan I/O yang memiliki throughput tinggi dan latensi rendah. Ini juga memberi Anda kontrol eksplisit atas bagaimana data dibuffer dan bergerak di sekitar sistem. Untuk lebih memahami model pemrograman aliran data, pertimbangkan aplikasi yang secara asinkron memuat gambar dari disk dan membuat komposit gambar tersebut. Model pemrograman tradisional biasanya mengharuskan Anda menggunakan panggilan balik dan objek sinkronisasi, seperti kunci, untuk mengoordinasikan tugas dan akses ke data bersama. Dengan menggunakan model pemrograman aliran data, Anda dapat membuat objek aliran data yang memproses gambar saat dibaca dari disk. Di bawah model aliran data, Anda menyatakan bagaimana data ditangani saat tersedia, dan juga dependensi apa pun antar data. Karena runtime mengelola dependensi antar data, Anda sering dapat menghindari persyaratan untuk menyinkronkan akses ke data bersama. Selain itu, karena jadwal runtime bekerja berdasarkan kedatangan data asinkron, aliran data dapat meningkatkan responsivitas dan throughput dengan mengelola rangkaian yang mendasarinya secara efisien. Untuk contoh yang menggunakan model pemrograman aliran data untuk mengimplementasikan pemrosesan gambar dalam aplikasi Formulir Windows, lihat Panduan: Menggunakan Aliran Data dalam Aplikasi Formulir Windows.

Sumber dan Target

Pustaka Aliran Data TPL terdiri dari blok aliran data, yang merupakan struktur data yang menyangga dan memproses data. TPL mendefinisikan tiga jenis blok aliran data: blok sumber, blok target, dan blok propagator. Blok sumber bertindak sebagai sumber data dan dapat dibacakan. Blok target bertindak sebagai penerima data dan dapat dituliskan. Blok propagasi bertindak sebagai blok sumber dan blok target, dan dapat dibacakan dan dituliskan. TPL mendefinisikan antarmuka System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> untuk mewakili sumber, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> untuk mewakili target, dan System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> mewakili penyebar. IPropagatorBlock<TInput,TOutput> mewarisi ISourceBlock<TOutput>, and ITargetBlock<TInput>.

Pustaka Dataflow TPL menyediakan beberapa jenis blok aliran data yang telah ditentukan sebelumnya yang mengimplementasikan antarmuka ISourceBlock<TOutput>, ITargetBlock<TInput>, dan IPropagatorBlock<TInput,TOutput>. Jenis blok aliran data ini dijelaskan dalam dokumen ini di bagian Jenis Blok Dataflow yang Telah Ditentukan sebelumnya.

Blok Penghubung

Anda dapat menyambungkan blok aliran data untuk membentuk alur, yang merupakan urutan linier blok aliran data, atau jaringan, yang merupakan grafik blok aliran data. Alur adalah salah satu bentuk jaringan. Dalam alur atau jaringan, sumber secara asinkron menyebarluaskan data ke target saat data tersebut tersedia. Metode ISourceBlock<TOutput>.LinkTo ini menautkan blok aliran data sumber ke blok target. Sumber dapat ditautkan ke nol target atau lebih; target dapat dihubungkan dari nol sumber atau lebih. Anda dapat menambahkan atau menghapus blok aliran data ke atau dari alur atau jaringan secara bersamaan. Jenis blok aliran data yang telah ditentukan menangani semua aspek keamanan rangkaian dari penautan dan pembatalan penautan.

Untuk contoh yang menyambungkan blok aliran data untuk membentuk alur dasar, lihat Panduan: Membuat Alur Dataflow. Untuk contoh yang menyambungkan blok aliran data untuk membentuk jaringan yang lebih kompleks, lihat Panduan: Menggunakan Dataflow dalam Aplikasi Formulir Windows. Untuk contoh yang membatalkan penautan target dari sumber setelah sumber menawarkan pesan target, lihat Cara: Membatalkan penautan Blok Dataflow.

Filter

Saat Anda memanggil metode ISourceBlock<TOutput>.LinkTo untuk menautkan sumber ke target, Anda dapat menyediakan delegasi yang menentukan apakah blok target menerima atau menolak pesan berdasarkan nilai pesan tersebut. Mekanisme pemfilteran ini adalah cara yang berguna untuk menjamin bahwa blok aliran data hanya menerima nilai tertentu. Untuk sebagian besar jenis blok aliran data yang telah ditentukan, jika blok sumber terhubung ke beberapa blok target, ketika blok target menolak pesan, sumber tersebut menawarkan pesan tersebut ke target berikutnya. Urutan di mana sumber menawarkan pesan ke target ditentukan oleh sumber dan dapat bervariasi sesuai dengan jenis sumber. Sebagian besar jenis blok sumber berhenti menawarkan pesan setelah satu target menerima pesan tersebut. Salah satu pengecualian untuk aturan ini adalah kelas BroadcastBlock<T>, yang menawarkan setiap pesan ke semua target, bahkan jika beberapa target menolak pesan. Untuk contoh yang menggunakan pemfilteran untuk memproses hanya pesan tertentu, lihat Panduan: Menggunakan Dataflow dalam Aplikasi Formulir Windows.

Penting

Karena setiap jenis blok aliran data sumber yang telah ditentukan menjamin bahwa pesan disebarkan dalam urutan diterima, setiap pesan harus dibaca dari blok sumber sebelum blok sumber dapat memproses pesan berikutnya. Oleh karena itu, saat Anda menggunakan pemfilteran untuk menghubungkan beberapa target ke sumber, pastikan setidaknya satu blok target menerima setiap pesan. Jika tidak, aplikasi Anda mungkin menemui kebuntuan.

Menyampaikan pesan

Model pemrograman aliran data terkait dengan konsep penerusan pesan, di mana komponen independen program berkomunikasi satu sama lain dengan mengirim pesan. Salah satu cara untuk menyebarluaskan pesan di antara komponen aplikasi adalah dengan memanggil Post metode (sinkron) dan SendAsync (asinkron) untuk mengirim pesan ke blok aliran data target, dan Receivemetode , , ReceiveAsyncdan TryReceive untuk menerima pesan dari blok sumber. Anda dapat menggabungkan metode ini dengan alur aliran data atau jaringan dengan mengirim data input ke simpul kepala (blok target), dan dengan menerima data output dari simpul terminal alur atau simpul terminal jaringan (satu atau beberapa blok sumber). Anda juga dapat menggunakan metode Choose untuk membaca dari sumber pertama yang disediakan yang memiliki data yang tersedia dan melakukan tindakan pada data tersebut.

Blok sumber menawarkan data ke blok target dengan memanggil metode ITargetBlock<TInput>.OfferMessage. Blok target merespons pesan yang ditawarkan dengan salah satu dari tiga cara: dapat menerima pesan, menolak pesan, atau menunda pesan. Ketika target menerima pesan, metode OfferMessage mengembalikan Accepted. Ketika target menolak pesan, metode OfferMessage mengembalikan Declined. Ketika target mengharuskannya tidak lagi menerima pesan dari sumber apa pun, OfferMessage mengembalikan DecliningPermanently. Jenis blok sumber yang telah ditentukan tidak menawarkan pesan ke target tertaut setelah nilai pengembalian tersebut diterima, dan mereka secara otomatis membatalkan tautan dari target tersebut.

Ketika blok target menunda pesan untuk penggunaan kemudian, metode OfferMessage mengembalikan Postponed. Blok target yang menunda pesan nantinya dapat memanggil metode ISourceBlock<TOutput>.ReserveMessage untuk mencoba memesan pesan yang ditawarkan. Pada titik ini, pesan masih tersedia dan dapat digunakan oleh blok target, atau pesan telah diambil oleh target lain. Ketika blok target nanti memerlukan pesan atau tidak lagi memerlukan pesan, itu masing-masing memanggil metode ISourceBlock<TOutput>.ConsumeMessage atau ReleaseReservation. Reservasi pesan biasanya digunakan oleh jenis blok aliran data yang beroperasi dalam mode non-serakah. Mode non-serakah dijelaskan kemudian dalam dokumen ini. Alih-alih mencadangkan pesan yang ditunda, blok target juga dapat menggunakan metode ISourceBlock<TOutput>.ConsumeMessage untuk mencoba langsung menggunakan pesan yang ditunda.

Penyelesaian Blok Dataflow

Blok Dataflow juga mendukung konsep penyelesaian. Blok aliran data yang dalam keadaan selesai tidak melakukan pekerjaan lebih lanjut. Setiap blok aliran data memiliki objek terkait System.Threading.Tasks.Task, yang dikenal sebagai tugas penyelesaian, yang mewakili status penyelesaian blok. Karena Anda dapat menunggu objek Task selesai, dengan menggunakan tugas penyelesaian, Anda dapat menunggu satu atau beberapa node terminal jaringan aliran data selesai. Antarmuka IDataflowBlock menentukan metode Complete, yang menginformasikan blok aliran data permintaan untuk diselesaikan, dan properti Completion, yang mengembalikan tugas penyelesaian untuk blok aliran data. Keduanya ISourceBlock<TOutput> dan ITargetBlock<TInput> mewarisi antarmuka IDataflowBlock.

Ada dua cara untuk menentukan apakah blok aliran data selesai tanpa kesalahan, mengalami satu atau lebih kesalahan, atau dibatalkan. Cara pertama adalah memanggil metode Task.Wait pada tugas penyelesaian dalam blok try-catch(Try-Catch dalam Visual Basic). Contoh berikut membuat objek ActionBlock<TInput> yang melempar ArgumentOutOfRangeException jika nilai inputnya kurang dari nol. AggregateException dilemparkan ketika contoh ini memanggil tugas penyelesaian Wait. ArgumentOutOfRangeException diakses melalui properti InnerExceptions objek AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Contoh ini menunjukkan kasus di mana pengecualian tidak ditangani dalam delegasi blok aliran data eksekusi. Kami menyarankan Anda menghandel pengecualian di badan blok tersebut. Namun, jika Anda tidak dapat melakukannya, blok berperilaku seolah-olah dibatalkan dan tidak memproses pesan masuk.

Ketika blok aliran data dibatalkan secara eksplisit, objek AggregateException berisi OperationCanceledException di properti InnerExceptions. Untuk informasi selengkapnya tentang pembatalan aliran data, lihat bagian Mengaktifkan Pembatalan.

Cara kedua untuk menentukan status penyelesaian blok aliran data adalah dengan menggunakan kelanjutan tugas penyelesaian, atau menggunakan fitur bahasa asinkron C# dan Visual Basic untuk secara asinkron menunggu tugas penyelesaian. Delegasi yang Anda sediakan ke metode Task.ContinueWith mengambil objek Task yang mewakili tugas anteseden. Dalam kasus properti Completion, delegasi untuk kelanjutan mengambil tugas penyelesaian itu sendiri. Contoh berikut menyerupai yang sebelumnya, kecuali juga menggunakan metode ContinueWith untuk membuat tugas kelanjutan yang mencetak status operasi aliran data secara keseluruhan.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Anda juga dapat menggunakan properti seperti IsCanceled dalam isi tugas kelanjutan untuk menentukan informasi tambahan tentang status penyelesaian blok aliran data. Untuk informasi selengkapnya tentang tugas kelanjutan dan bagaimana hubungannya dengan pembatalan dan penanganan kesalahan, lihat Menautkan Tugas dengan Menggunakan Tugas Kelanjutan, Pembatalan Tugas, dan Penanganan Pengecualian.

Tipe Blok Dataflow yang Telah Ditentukan

Pustaka Dataflow TPL menyediakan beberapa jenis blok aliran data yang telah ditentukan. Jenis-jenis ini dibagi menjadi tiga kategori: blok buffering, blok eksekusi, dan blok pengelompokan. Bagian berikut menjelaskan jenis blok yang membentuk kategori ini.

Blok Buffering

Blok buffering menyimpan data untuk digunakan oleh konsumen data. Pustaka Dataflow TPL menyediakan tiga jenis blok buffering: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>, dan System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Kelas BufferBlock<T> mewakili struktur olahpesan asinkron tujuan umum. Kelas ini menyimpan antrian pesan pertama, pertama (FIFO) yang dapat ditulis oleh berbagai sumber atau dibaca oleh beberapa target. Saat target menerima pesan dari objek BufferBlock<T>, pesan tersebut dihapus dari antrean pesan. Oleh karena itu, meskipun objek BufferBlock<T> dapat memiliki beberapa target, hanya satu target yang akan menerima setiap pesan. Kelas BufferBlock<T> ini berguna ketika Anda ingin meneruskan beberapa pesan ke komponen lain, dan komponen tersebut harus menerima setiap pesan.

Contoh dasar berikut memposting beberapa nilai Int32 ke objek BufferBlock<T> lalu membaca nilai tersebut kembali dari objek tersebut.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Untuk contoh lengkap yang menunjukkan cara menulis pesan ke dan membaca pesan dari objek BufferBlock<T>, lihat Cara: Menulis Pesan ke dan Membaca Pesan dari Blok Dataflow.

BroadcastBlock<T>

Kelas BroadcastBlock<T> ini berguna ketika Anda harus meneruskan beberapa pesan ke komponen lain, tetapi komponen tersebut hanya membutuhkan nilai terbaru. Kelas ini juga berguna saat Anda ingin menyiarkan pesan ke beberapa komponen.

Contoh dasar berikut memposting nilai Double ke objek BroadcastBlock<T> lalu membaca nilai tersebut kembali dari objek tersebut beberapa kali. Karena nilai tidak dihapus dari objek BroadcastBlock<T> setelah dibaca, nilai yang sama tersedia setiap saat.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan BroadcastBlock<T> untuk menyiarkan pesan ke beberapa blok target, lihat Cara: Menentukan Penjadwal Tugas di Blok Dataflow.

WriteOnceBlock<T>

Kelas WriteOnceBlock<T> menyerupai kelas BroadcastBlock<T>, kecuali bahwa objek WriteOnceBlock<T> hanya dapat ditulis satu kali. Anda dapat menganggap WriteOnceBlock<T> mirip dengan kata kunci C# readonly (ReadOnly in Visual Basic), kecuali bahwa objek WriteOnceBlock<T> menjadi tidak dapat diubah setelah menerima nilai alih-alih saat konstruksi. Seperti kelas BroadcastBlock<T>, ketika target menerima pesan dari objek WriteOnceBlock<T>, pesan tersebut tidak dihapus dari objek tersebut. Oleh karena itu, beberapa target menerima salinan pesan. Kelas WriteOnceBlock<T> ini berguna ketika Anda hanya ingin menyebarluaskan pesan pertama dari beberapa pesan.

Contoh dasar berikut memposting beberapa nilai String ke objek WriteOnceBlock<T> lalu membaca nilai tersebut kembali dari objek tersebut. Karena WriteOnceBlock<T> objek dapat ditulis satu kali saja, setelah objek WriteOnceBlock<T> menerima pesan, objek akan membuang pesan berikutnya.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan WriteOnceBlock<T> untuk menerima nilai operasi pertama yang selesai, lihat Cara: Membatalkan Tautan Blok Dataflow.

Blok Eksekusi

Blok eksekusi memanggil delegasi yang disediakan pengguna untuk setiap bagian dari data yang diterima. Pustaka Dataflow TPL menyediakan tiga jenis blok eksekusi: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, dan System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Kelas ActionBlock<TInput> adalah blok target yang memanggil delegasi saat menerima data. Anggap objek ActionBlock<TInput> sebagai delegasi yang berjalan secara asinkron saat data tersedia. Delegasi yang Anda berikan ke objek ActionBlock<TInput> dapat berjenis Action<T> atau jenis System.Func<TInput, Task>. Saat Anda menggunakan objek ActionBlock<TInput> dengan Action<T>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek ActionBlock<TInput> dengan System.Func<TInput, Task>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek Task yang dikembalikan selesai. Dengan menggunakan kedua mekanisme ini, Anda dapat menggunakan ActionBlock<TInput> untuk pemrosesan sinkron dan asinkron dari setiap elemen input.

Contoh dasar berikut memposting beberapa nilai ke Int32 objek ActionBlock<TInput>. Objek ActionBlock<TInput> mencetak nilai-nilai tersebut ke konsol. Contoh ini kemudian mengatur blok ke status selesai dan menunggu semua tugas aliran data selesai.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan delegasi dengan kelas ActionBlock<TInput>, lihat Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data.

TransformBlock<TInput, TOutput>

Kelas TransformBlock<TInput,TOutput> menyerupai kelas ActionBlock<TInput>, kecuali dalam hal kelas tersebut bertindak sebagai sumber dan sebagai target. Delegasi yang Anda berikan ke objek TransformBlock<TInput,TOutput> mengembalikan nilai jenis TOutput. Delegasi yang Anda sediakan ke objek TransformBlock<TInput,TOutput> dapat berjenis System.Func<TInput, TOutput> atau jenis System.Func<TInput, Task<TOutput>>. Saat Anda menggunakan objek TransformBlock<TInput,TOutput> dengan System.Func<TInput, TOutput>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek TransformBlock<TInput,TOutput> yang digunakan dengan System.Func<TInput, Task<TOutput>>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek Task<TResult> yang dikembalikan selesai. Seperti halnya ActionBlock<TInput>, dengan menggunakan dua mekanisme, Anda dapat menggunakan TransformBlock<TInput,TOutput> untuk pemrosesan sinkron dan asinkron dari setiap elemen input.

Contoh dasar berikut membuat objek TransformBlock<TInput,TOutput> yang menghitung akar kuadrat inputnya. Objek TransformBlock<TInput,TOutput> mengambil nilai Int32 sebagai input dan menghasilkan nilai Double sebagai output.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Untuk contoh lengkap yang menggunakan TransformBlock<TInput,TOutput> dalam jaringan blok aliran data yang melakukan pemrosesan gambar dalam aplikasi Formulir Windows, lihat Panduan: Menggunakan Dataflow dalam Aplikasi Formulir Windows.

TransformManyBlock<TInput, TOutput>

Kelas TransformManyBlock<TInput,TOutput> menyerupai kelas TransformBlock<TInput,TOutput>, kecuali TransformManyBlock<TInput,TOutput> menghasilkan nilai output nol atau lebih untuk setiap nilai input, alih-alih hanya satu nilai output untuk setiap nilai input. Delegasi yang Anda sediakan ke objek TransformManyBlock<TInput,TOutput> dapat berjenis System.Func<TInput, IEnumerable<TOutput>> atau jenis System.Func<TInput, Task<IEnumerable<TOutput>>>. Saat Anda menggunakan objek TransformManyBlock<TInput,TOutput> dengan System.Func<TInput, IEnumerable<TOutput>>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek TransformManyBlock<TInput,TOutput> dengan System.Func<TInput, Task<IEnumerable<TOutput>>>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek System.Threading.Tasks.Task<IEnumerable<TOutput>> yang dikembalikan selesai.

Contoh dasar berikut membuat objek TransformManyBlock<TInput,TOutput> yang membagi string menjadi urutan karakter individualnya. Objek TransformManyBlock<TInput,TOutput> mengambil nilai String sebagai input dan menghasilkan nilai Char sebagai output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Untuk contoh lengkap yang digunakan TransformManyBlock<TInput,TOutput> untuk menghasilkan beberapa output independen untuk setiap input dalam alur aliran data, lihat Panduan: Membuat Alur Dataflow.

Tingkat Paralelisme

Setiap buffer objek ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput> input pesan hingga blok siap untuk memprosesnya. Secara default, kelas-kelas ini memproses pesan dalam urutan di mana mereka diterima, satu pesan pada satu waktu. Anda juga dapat menentukan tingkat paralelisme untuk mengaktifkan ActionBlock<TInput>, TransformBlock<TInput,TOutput> dan TransformManyBlock<TInput,TOutput> objek untuk memproses beberapa pesan secara bersamaan. Untuk informasi selengkapnya tentang eksekusi bersamaan, lihat bagian Menentukan Derajat Paralelisme nanti dalam dokumen ini. Untuk contoh yang mengatur tingkat paralelisme untuk mengaktifkan blok aliran data eksekusi untuk memproses lebih dari satu pesan pada satu waktu, lihat Cara: Menentukan Tingkat Paralelisme dalam Blok Dataflow.

Ringkasan Jenis Delegasi

Tabel berikut ini meringkas jenis delegasi yang bisa Anda berikan ke objek ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>. Tabel ini juga menentukan apakah jenis delegasi beroperasi secara sinkron atau asinkron.

Jenis Jenis Delegasi Sinkron Jenis Delegasi Asinkron
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Anda juga dapat menggunakan ekspresi lambda saat bekerja dengan tipe blok eksekusi. Untuk contoh yang memperlihatkan cara menggunakan ekspresi lambda dengan blok eksekusi, lihat Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data.

Blok Pengelompokan

Mengelompokkan blok menggabungkan data dari satu atau beberapa sumber dan di bawah berbagai batasan. Pustaka Dataflow TPL menyediakan tiga jenis blok gabungan: BatchBlock<T>, JoinBlock<T1,T2>, dan BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Kelas BatchBlock<T> menggabungkan set data input, yang dikenal sebagai batch, ke dalam array data output. Anda menentukan ukuran setiap batch saat membuat objek BatchBlock<T>. Ketika objek BatchBlock<T> menerima jumlah elemen input yang ditentukan, objek secara asinkron menyebarkan array yang berisi elemen-elemen tersebut. Jika objek BatchBlock<T> diatur ke status selesai tetapi tidak berisi elemen yang cukup untuk membentuk batch, objek menyebarkan array akhir yang berisi elemen input yang tersisa.

Kelas ini BatchBlock<T> beroperasi dalam mode serakah atau non-serakah. Dalam mode serakah, yang merupakan default, objek BatchBlock<T> menerima setiap pesan yang ditawarkan dan menyebarkan array setelah menerima jumlah elemen yang ditentukan. Dalam mode tidak serakah, objek BatchBlock<T> menunda semua pesan masuk sampai sumber yang cukup telah menawarkan pesan ke blok untuk membentuk batch. Mode serakah biasanya berkinerja lebih baik daripada mode non-serakah karena membutuhkan lebih sedikit pemrosesan overhead. Namun, Anda dapat menggunakan mode non-serakah ketika Anda harus mengoordinasikan konsumsi dari berbagai sumber dengan cara atomik. Tentukan mode non-serakah dengan mengatur Greedy ke False di parameter dataflowBlockOptions pada konstruktor BatchBlock<T>.

Contoh dasar berikut mengirim beberapa nilai Int32 ke objek BatchBlock<T> yang menyimpan sepuluh elemen dalam batch. Untuk menjamin bahwa semua nilai disebarluaskan dari BatchBlock<T>, contoh ini memanggil metode Complete. Metode Complete ini mengatur objek BatchBlock<T> ke status selesai, dan oleh karena itu, objek BatchBlock<T> menyebarluaskan elemen yang tersisa sebagai batch akhir.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Untuk contoh lengkap yang menggunakan BatchBlock<T> untuk meningkatkan efisiensi operasi penyisipan database, lihat Panduan: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi.

JoinBlock<T1, T2, ...>

Kelas JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> mengumpulkan elemen input dan menyebarluaskan objek System.Tuple<T1,T2> atau System.Tuple<T1,T2,T3> yang berisi elemen-elemen tersebut. Kelas JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> tidak mewarisi dari ITargetBlock<TInput>. Sebaliknya, mereka menyediakan properti Target1, Target2, dan Target3, yang mengimplementasikan ITargetBlock<TInput>.

Seperti BatchBlock<T>, JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> beroperasi dalam mode serakah atau non-serakah. Dalam mode serakah, yang merupakan default, objek JoinBlock<T1,T2> atau JoinBlock<T1,T2,T3> menerima setiap pesan yang ditawarkan dan menyebarkan tuple setelah setiap targetnya menerima setidaknya satu pesan. Dalam mode non-serakah, objek JoinBlock<T1,T2> atau JoinBlock<T1,T2,T3> menunda semua pesan masuk sampai semua target telah ditawarkan data yang diperlukan untuk membuat tuple. Pada titik ini, blok terlibat dalam protokol komit dua fase untuk mengambil semua item yang diperlukan secara atomik dari sumber. Penundaan ini memungkinkan entitas lain untuk mengonsumsi data sementara itu, untuk memungkinkan keseluruhan sistem membuat kemajuan ke depan.

Contoh dasar berikut menunjukkan kasus di mana objek JoinBlock<T1,T2,T3> memerlukan beberapa data untuk menghitung nilai. Contoh ini membuat objek JoinBlock<T1,T2,T3> yang memerlukan dua nilai Int32 dan nilai Char untuk melakukan operasi aritmatika.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Untuk contoh lengkap yang menggunakan objek JoinBlock<T1,T2> dalam mode non-serakah untuk berbagi sumber daya secara kooperatif, lihat Cara: Menggunakan JoinBlock untuk Membaca Data Dari Berbagai Sumber.

BatchedJoinBlock<T1, T2, ...>

Kelas BatchedJoinBlock<T1,T2> dan BatchedJoinBlock<T1,T2,T3> mengumpulkan sekumpulan elemen input dan menyebarluaskan objek System.Tuple(IList(T1), IList(T2)) atau System.Tuple(IList(T1), IList(T2), IList(T3)) yang berisi elemen-elemen tersebut. Anggap BatchedJoinBlock<T1,T2> sebagai kombinasi BatchBlock<T> dan JoinBlock<T1,T2>. Tentukan ukuran setiap batch saat Anda membuat objek BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> juga menyediakan properti, Target1 dan Target2, yang mengimplementasikan ITargetBlock<TInput>. Ketika jumlah elemen input yang ditentukan diterima dari semua target, objek BatchedJoinBlock<T1,T2> secara asinkron menyebar keluar objek System.Tuple(IList(T1), IList(T2)) yang berisi elemen-elemen tersebut.

Contoh dasar berikut membuat objek BatchedJoinBlock<T1,T2> yang menyimpan hasil, nilai Int32, dan kesalahan yang merupakan objek Exception. Contoh ini melakukan beberapa operasi dan menulis hasil ke properti Target1, dan kesalahan ke properti Target2, dari objek BatchedJoinBlock<T1,T2>. Karena jumlah operasi yang berhasil dan gagal tidak diketahui sebelumnya, objek IList<T> memungkinkan setiap target untuk menerima nol atau lebih nilai.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Untuk contoh lengkap yang menggunakan BatchedJoinBlock<T1,T2> untuk mengambil hasil dan pengecualian apa pun yang terjadi saat program membaca dari database, lihat Panduan: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi.

Mengonfigurasi Perilaku Blok Dataflow

Anda dapat mengaktifkan opsi tambahan dengan menyediakan objek System.Threading.Tasks.Dataflow.DataflowBlockOptions ke konstruktor jenis blok aliran data. Pilihan ini mengontrol perilaku seperti penjadwal yang mengelola tugas yang mendasari dan tingkat paralelisme. DataflowBlockOptions juga memiliki jenis turunan yang menentukan perilaku yang khusus untuk jenis blok aliran data tertentu. Tabel berikut merangkum jenis opsi mana yang terkait dengan setiap jenis blok aliran data.

Tipe Blok Dataflow DataflowBlockOptions type
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Bagian berikut ini menyediakan informasi tambahan tentang jenis penting opsi blok aliran data yang tersedia melalui kelas System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions, dan System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Menentukan Penjadwal Tugas

Setiap blok aliran data yang telah ditentukan menggunakan mekanisme penjadwalan tugas TPL untuk melakukan aktivitas seperti menyebarkan data ke target, menerima data dari sumber, dan menjalankan delegasi yang ditentukan pengguna ketika data tersedia. TaskScheduler adalah kelas abstrak yang mewakili penjadwal tugas yang mengantre tugas ke utas. Penjadwal tugas default, Default, menggunakan kelas ThreadPool untuk mengantre dan menjalankan pekerjaan. Anda dapat mengganti penjadwal tugas default dengan mengatur properti TaskScheduler saat Anda membuat objek blok aliran data.

Ketika penjadwal tugas yang sama mengelola beberapa blok aliran data, ia dapat menegakkan kebijakan di seluruhnya. Misalnya, jika beberapa blok aliran data masing-masing dikonfigurasi untuk menargetkan penjadwal eksklusif dari objek ConcurrentExclusiveSchedulerPair yang sama, semua pekerjaan yang berjalan di seluruh blok ini diserialisasikan. Demikian pula, jika blok ini dikonfigurasi untuk menargetkan penjadwal bersamaan dari objek ConcurrentExclusiveSchedulerPair yang sama, dan penjadwal tersebut dikonfigurasi untuk memiliki tingkat konkurensi maksimum, semua pekerjaan dari blok ini terbatas pada jumlah operasi bersamaan. Untuk contoh yang menggunakan kelas ConcurrentExclusiveSchedulerPair untuk memungkinkan operasi baca terjadi secara paralel, tetapi operasi tulis terjadi secara eksklusif dari semua operasi lainnya, lihat Cara: Menentukan Penjadwal Tugas di Blok Dataflow. Untuk informasi selengkapnya tentang penjadwal tugas di TPL, lihat topik kelas TaskScheduler.

Menentukan Tingkat Paralelisme

Secara default, tiga jenis blok eksekusi yang Pustaka Dataflow TPL sediakan, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>, memproses satu pesan sekaligus. Jenis blok aliran data ini juga memproses pesan dalam urutan diterima. Untuk mengaktifkan blok aliran data ini untuk memproses pesan secara bersamaan, atur properti ExecutionDataflowBlockOptions.MaxDegreeOfParallelism saat Anda membuat objek blok aliran data.

Nilai default MaxDegreeOfParallelism adalah 1, yang menjamin bahwa blok aliran data memproses satu pesan pada satu waktu. Mengatur properti ini ke nilai yang lebih besar dari 1 memungkinkan blok aliran data memproses beberapa pesan secara bersamaan. Mengatur properti ini ke DataflowBlockOptions.Unbounded memungkinkan penjadwal tugas yang mendasar untuk mengelola tingkat konkurensi maksimum.

Penting

Ketika Anda menentukan tingkat maksimum paralelisme yang lebih besar dari 1, beberapa pesan diproses secara bersamaan, dan oleh karena itu pesan mungkin tidak diproses dalam urutan di mana mereka diterima. Urutan di mana pesan dikeluarkan dari blok, bagaimanapun, yang sama di mana mereka diterima.

Karena MaxDegreeOfParallelism properti mewakili tingkat paralelisme maksimum, blok aliran data mungkin dijalankan dengan tingkat paralelisme yang lebih rendah daripada yang Anda tentukan. Blok aliran data mungkin menggunakan tingkat paralelisme yang lebih rendah untuk memenuhi persyaratan fungsionalnya atau karena kurangnya sumber daya sistem yang tersedia. Blok aliran data tidak pernah memilih lebih banyak paralelisme daripada yang Anda tentukan.

Nilai properti MaxDegreeOfParallelism eksklusif untuk setiap objek blok aliran data. Misalnya, jika empat objek blok aliran data masing-masing menentukan 1 untuk tingkat paralelisme maksimum, keempat objek blok aliran data berpotensi berjalan secara paralel.

Untuk contoh yang mengatur tingkat paralelisme maksimum untuk memungkinkan operasi panjang terjadi secara paralel, lihat Cara: Menentukan Tingkat Paralelisme dalam Blok Dataflow.

Menentukan Jumlah Pesan per Tugas

Jenis blok aliran data yang telah ditentukan menggunakan tugas untuk memproses beberapa elemen input. Ini membantu meminimalkan jumlah objek tugas yang diperlukan untuk memproses data, yang memungkinkan aplikasi berjalan lebih efisien. Namun, ketika tugas dari satu set blok aliran data memproses data, tugas dari blok aliran data lainnya mungkin perlu menunggu waktu pemrosesan dengan mengantrekan pesan. Untuk mengaktifkan kewajaran yang lebih baik di antara tugas aliran data, atur properti MaxMessagesPerTask. Ketika MaxMessagesPerTask diatur ke DataflowBlockOptions.Unbounded, yang merupakan default, tugas yang digunakan oleh blok aliran data memproses pesan sebanyak yang tersedia. Ketika MaxMessagesPerTask diatur ke nilai selain Unbounded, blok aliran data memproses paling banyak jumlah pesan per objek Task ini. Meskipun mengatur properti MaxMessagesPerTask dapat meningkatkan kewajaran di antara tugas, itu dapat menyebabkan sistem membuat lebih banyak tugas daripada yang diperlukan, yang dapat mengurangi performa.

Mengaktifkan Pembatalan

TPL menyediakan mekanisme yang memungkinkan tugas untuk mengkoordinasikan pembatalan secara kooperatif. Untuk mengaktifkan blok aliran data untuk berpartisipasi dalam mekanisme pembatalan ini, atur properti CancellationToken. Ketika objek CancellationToken ini diatur ke status dibatalkan, semua blok aliran data yang memantau token ini menyelesaikan eksekusi item saat ini tetapi tidak mulai memproses item berikutnya. Blok aliran data ini juga menghapus pesan buffer, melepaskan koneksi ke blok sumber dan target apa pun, dan transisi ke status yang dibatalkan. Dengan beralih ke status dibatalkan, properti Completion memiliki properti Status yang diatur ke Canceled, kecuali terjadi pengecualian selama pemrosesan. Dalam kasus itu, Status diatur ke Faulted.

Untuk contoh yang menunjukkan cara menggunakan pembatalan dalam aplikasi Formulir Windows, lihat Cara: Membatalkan Blok Dataflow. Untuk informasi selengkapnya tentang pembatalan di TPL, lihat Pembatalan Tugas.

Menentukan Perilaku Serakah Versus Non-Serakah

Beberapa jenis blok aliran data pengelompokan dapat beroperasi dalam mode serakah atau non-serakah. Secara default, jenis blok aliran data yang telah ditentukan beroperasi dalam mode serakah.

Untuk jenis blok gabungan seperti JoinBlock<T1,T2>, mode serakah berarti bahwa blok segera menerima data bahkan jika data yang sesuai untuk bergabung belum tersedia. Mode non-serakah berarti bahwa blok menunda semua pesan masuk sampai satu tersedia pada masing-masing targetnya untuk menyelesaikan bergabung. Jika salah satu pesan yang ditunda tidak lagi tersedia, blok gabungan akan merilis semua pesan yang ditunda dan memulai ulang prosesnya. Untuk kelas BatchBlock<T>, perilaku serakah dan tidak serakah mirip, kecuali bahwa di bawah mode tidak serakah, objek BatchBlock<T> menunda semua pesan masuk sampai cukup tersedia dari sumber yang berbeda untuk menyelesaikan batch.

Untuk menentukan mode tidak serakah untuk blok aliran data, atur Greedy ke False. Untuk contoh yang menunjukkan cara menggunakan mode non-serakah untuk mengaktifkan beberapa blok gabungan untuk berbagi sumber data dengan lebih efisien, lihat Cara: Menggunakan JoinBlock untuk Membaca Data Dari Beberapa Sumber.

Blok Dataflow Kustom

Meskipun Pustaka Dataflow TPL menyediakan banyak jenis blok yang telah ditentukan, Anda dapat membuat jenis blok tambahan yang melakukan perilaku kustom. Terapkan antarmuka ISourceBlock<TOutput> atau ITargetBlock<TInput> secara langsung atau gunakan metode Encapsulate untuk membangun blok kompleks yang merangkum perilaku jenis blok yang ada. Untuk contoh yang memperlihatkan cara menerapkan fungsionalitas blok aliran data kustom, lihat Panduan: Membuat Jenis Blok Dataflow Kustom.

Judul Deskripsi
Cara: Menulis Pesan ke dan Membaca Pesan dari Blok Dataflow Menunjukkan cara menulis pesan ke dan membaca pesan dari objek BufferBlock<T>.
Cara: Menerapkan Pola Dataflow Produsen-Konsumen Menjelaskan cara menggunakan model aliran data untuk menerapkan pola produsen-konsumen, di mana produsen mengirim pesan ke blok aliran data, dan konsumen membaca pesan dari blok itu.
Cara: Melakukan Tindakan Saat Blok Dataflow Menerima Data Menjelaskan cara memberikan delegasi ke jenis blok aliran data eksekusi ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>.
Panduan: Membuat Alur Dataflow Menjelaskan cara membuat alur aliran data yang mengunduh teks dari web dan melakukan operasi pada teks tersebut.
Cara: Membatalkan Tautan Blok Dataflow Menunjukkan cara menggunakan metode LinkTo untuk membatalkan tautan blok target dari sumbernya setelah sumber menawarkan pesan ke target.
Panduan: Menggunakan Dataflow dalam Aplikasi Formulir Windows Menunjukkan cara membuat jaringan blok aliran data yang melakukan pemrosesan gambar dalam aplikasi Formulir Windows.
Cara: Membatalkan Blok Dataflow Menunjukkan cara menggunakan pembatalan dalam aplikasi Formulir Windows.
Cara: Menggunakan JoinBlock untuk Membaca Data Dari Berbagai Sumber Menjelaskan cara menggunakan kelas JoinBlock<T1,T2> untuk melakukan operasi saat data tersedia dari beberapa sumber, dan cara menggunakan mode non-serakah untuk mengaktifkan beberapa blok gabungan untuk berbagi sumber data dengan lebih efisien.
Cara: Menentukan Tingkat Paralelisme dalam Blok Dataflow Menjelaskan cara mengatur properti MaxDegreeOfParallelism untuk mengaktifkan blok aliran data eksekusi untuk memproses lebih dari satu pesan pada satu waktu.
Cara: Menentukan Penjadwal Tugas di Blok Dataflow Menunjukkan cara mengaitkan penjadwal tugas tertentu saat Anda menggunakan aliran data di aplikasi Anda.
Panduan: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi Menjelaskan cara menggunakan kelas BatchBlock<T> untuk meningkatkan efisiensi operasi penyisipan database, dan cara menggunakan kelas BatchedJoinBlock<T1,T2> untuk menangkap hasil dan pengecualian apa pun yang terjadi saat program membaca dari database.
Panduan: Membuat Jenis Blok Dataflow Kustom Menunjukkan dua cara untuk membuat jenis blok aliran data yang menerapkan perilaku kustom.
Pustaka Paralel Tugas (TPL) Memperkenalkan TPL, pustaka yang menyederhanakan pemrograman paralel dan bersamaan dalam aplikasi .NET Framework.