Bagikan melalui


Aliran Data (Pustaka Tugas Paralel)

Pustaka Paralel Tugas (TPL) menyediakan komponen aliran data untuk membantu meningkatkan ketahanan aplikasi berkemampuan konkurensi. Komponen aliran data ini secara kolektif disebut sebagai Pustaka Aliran Data TPL . Model aliran data ini mempromosikan pemrograman berorientasi aktor dengan menyediakan pengiriman pesan secara internal untuk tugas aliran data dan pipeline berbutir kasar. Komponen aliran data dibangun berdasarkan jenis dan infrastruktur penjadwalan TPL dan terintegrasi dengan dukungan bahasa C#, Visual Basic, dan F# untuk pemrograman asinkron. Komponen aliran data ini berguna ketika Anda memiliki beberapa operasi yang harus berkomunikasi satu sama lain secara asinkron atau ketika Anda ingin memproses data saat tersedia. Misalnya, pertimbangkan aplikasi yang memproses data gambar dari kamera web. Dengan menggunakan model aliran data, aplikasi dapat memproses bingkai gambar saat tersedia. Jika aplikasi meningkatkan bingkai gambar, misalnya, dengan melakukan koreksi cahaya atau pengurangan mata merah, Anda dapat membuat alur komponen aliran data. Setiap tahap alur mungkin menggunakan fungsionalitas paralelisme yang lebih kasar, seperti fungsionalitas yang disediakan oleh TPL, untuk mengubah gambar.

Dokumen ini menyediakan gambaran umum Pustaka Aliran Data TPL. Ini menjelaskan model pemrograman, jenis blok aliran data yang telah ditentukan sebelumnya, dan cara mengonfigurasi blok aliran data untuk memenuhi persyaratan spesifik aplikasi Anda.

Nota

Pustaka Aliran Data TPL (namespace System.Threading.Tasks.Dataflow) tidak didistribusikan dengan .NET. Untuk menginstal namespace System.Threading.Tasks.Dataflow di Visual Studio, buka proyek Anda, pilih Kelola Paket NuGet dari menu Project, dan cari paket System.Threading.Tasks.Dataflow secara online. Atau, untuk menginstalnya menggunakan .NET Core CLI , jalankan dotnet add package System.Threading.Tasks.Dataflow.

Model Pemrograman

Pustaka Aliran Data TPL menyediakan fondasi untuk meneruskan pesan dan menyejajarkan aplikasi intensif CPU dan intensif I/O yang memiliki throughput tinggi dan latensi rendah. Ini juga memberi Anda kontrol eksplisit atas bagaimana data di-buffer dan bergerak di sekitar sistem. Untuk lebih memahami model pemrograman aliran data, pertimbangkan aplikasi yang secara asinkron memuat gambar dari disk dan membuat komposit gambar tersebut. Model pemrograman tradisional biasanya mengharuskan Anda menggunakan panggilan balik dan objek sinkronisasi, seperti kunci, untuk mengoordinasikan tugas dan akses ke data bersama. Dengan menggunakan model pemrograman aliran data, Anda dapat membuat objek aliran data yang memproses gambar saat dibaca dari disk. Di bawah model aliran data, Anda menyatakan bagaimana data ditangani saat tersedia, dan juga dependensi apa pun antar data. Karena runtime mengelola dependensi antar data, Anda sering kali dapat menghindari persyaratan untuk menyinkronkan akses ke data bersama. Selain itu, karena jadwal waktu proses bekerja berdasarkan kedatangan data asinkron, aliran data dapat meningkatkan tanggapan dan throughput dengan mengelola utas dasar secara efisien. Untuk contoh yang menggunakan model pemrograman aliran data untuk menerapkan pemrosesan gambar dalam aplikasi Windows Forms, lihat panduan : Menggunakan Aliran Data di Aplikasi Windows Forms.

Sumber dan Target

Pustaka Aliran Data TPL terdiri dari blok aliran data, yang merupakan struktur data yang menyangga dan memproses data. TPL mendefinisikan tiga jenis blok aliran data: blok sumber, blok target, dan blok penyebar . Blok sumber bertindak sebagai sumber data dan dapat dibaca. Blok target bertindak sebagai penerima data dan dapat ditulis. Blok penyebar bertindak sebagai blok sumber dan blok target, dan dapat dibaca dari dan ditulis ke. TPL mendefinisikan antarmuka System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> untuk mewakili sumber, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> untuk mewakili target, dan System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> untuk mewakili penyebar. IPropagatorBlock<TInput,TOutput> mewarisi dari ISourceBlock<TOutput>, dan ITargetBlock<TInput>.

Pustaka Aliran Data TPL menyediakan beberapa jenis blok aliran data yang telah ditentukan sebelumnya yang mengimplementasikan antarmuka ISourceBlock<TOutput>, ITargetBlock<TInput>, dan IPropagatorBlock<TInput,TOutput>. Jenis blok aliran data ini dijelaskan dalam dokumen ini di bagian Jenis Blok Aliran Data yang Telah Ditentukan sebelumnya.

Menghubungkan Blok

Anda dapat menyambungkan blok aliran data untuk membentuk alur , yang merupakan urutan linier blok aliran data, atau jaringan , yang merupakan grafik blok aliran data. Alur adalah salah satu bentuk jaringan. Dalam alur atau jaringan, sumber secara asinkron menyebarluaskan data ke target saat data tersebut tersedia. Metode ISourceBlock<TOutput>.LinkTo menautkan blok aliran data sumber ke blok target. Sumber dapat ditautkan ke nol atau lebih target; target dapat ditautkan dari nol atau lebih sumber. Anda dapat menambahkan atau menghapus blok aliran data ke atau dari alur atau jaringan secara bersamaan. Jenis blok aliran data yang telah ditentukan sebelumnya menangani semua aspek keamanan alur untuk menautkan dan membatalkan tautan.

Untuk contoh yang menyambungkan blok aliran data untuk membentuk alur dasar, lihat panduan : Membuat Alur Aliran Data. Untuk contoh yang menyambungkan blok aliran data untuk membentuk jaringan yang lebih kompleks, lihat panduan : Menggunakan Aliran Data di Aplikasi Formulir Windows. Untuk contoh yang melepaskan tautan target dari sumber setelah sumber menyampaikan pesan kepada target, lihat Cara: Membatalkan Tautan Blok Aliran Data.

Penyaringan

Saat Anda memanggil metode ISourceBlock<TOutput>.LinkTo untuk menautkan sumber ke target, Anda dapat menyediakan delegasi yang menentukan apakah blok target menerima atau menolak pesan berdasarkan nilai pesan tersebut. Mekanisme pemfilteran ini adalah cara yang berguna untuk menjamin bahwa blok aliran data hanya menerima nilai tertentu. Untuk sebagian besar jenis blok aliran data yang telah ditentukan sebelumnya, jika blok sumber tersambung ke beberapa blok target, ketika blok target menolak pesan, sumber menawarkan pesan tersebut ke target berikutnya. Urutan sumber menawarkan pesan ke target didefinisikan oleh sumber dan dapat bervariasi sesuai dengan jenis sumbernya. Sebagian besar jenis blok sumber berhenti menawarkan pesan setelah satu target menerima pesan tersebut. Satu pengecualian untuk aturan ini adalah kelas BroadcastBlock<T>, yang menawarkan setiap pesan ke semua target, bahkan jika beberapa target menolak pesan. Untuk contoh yang menggunakan pemfilteran untuk memproses hanya pesan tertentu, lihat panduan : Menggunakan Aliran Data dalam Aplikasi Formulir Windows.

Penting

Karena setiap jenis blok aliran data sumber yang telah ditentukan sebelumnya menjamin bahwa pesan disebarluaskan dalam urutan penerimaannya, setiap pesan harus dibaca dari blok sumber sebelum blok sumber dapat memproses pesan berikutnya. Oleh karena itu, saat Anda menggunakan pemfilteran untuk menyambungkan beberapa target ke sumber, pastikan setidaknya satu blok target menerima setiap pesan. Jika tidak, aplikasi Anda mungkin kebuntuan.

Pengiriman Pesan

Model pemrograman aliran data terkait dengan konsep pesan melewati, di mana komponen independen program berkomunikasi satu sama lain dengan mengirim pesan. Salah satu cara untuk menyebarluaskan pesan di antara komponen aplikasi adalah dengan memanggil metode Post (sinkron) dan SendAsync (asinkron) untuk mengirim pesan ke blok aliran data target, dan metode Receive, ReceiveAsync, dan TryReceive untuk menerima pesan dari blok sumber. Anda dapat menggabungkan metode ini dengan alur aliran data atau jaringan dengan mengirim data input ke simpul kepala (blok target), dan dengan menerima data output dari simpul terminal alur atau simpul terminal jaringan (satu atau beberapa blok sumber). Anda juga dapat menggunakan metode Choose untuk membaca dari sumber pertama yang disediakan yang memiliki data yang tersedia dan melakukan tindakan pada data tersebut.

Blok sumber menawarkan data ke blok target dengan memanggil metode ITargetBlock<TInput>.OfferMessage. Blok target merespons pesan yang ditawarkan dengan salah satu dari tiga cara: blok target dapat menerima pesan, menolak pesan, atau menunda pesan. Saat target menerima pesan, metode OfferMessage mengembalikan Accepted. Ketika target menolak pesan, metode OfferMessage mengembalikan Declined. Ketika target mengharuskan tidak lagi menerima pesan apa pun dari sumbernya, OfferMessage mengembalikan DecliningPermanently. Jenis blok sumber yang telah ditentukan sebelumnya tidak mengirim pesan ke target yang ditautkan setelah menerima nilai pengembalian tersebut, dan secara otomatis memutuskan tautan dari target tersebut.

Ketika blok target menunda pesan untuk digunakan nanti, metode OfferMessage mengembalikan Postponed. Blok target yang menunda pesan nantinya dapat memanggil metode ISourceBlock<TOutput>.ReserveMessage untuk mencoba memesan pesan yang ditawarkan. Pada titik ini, pesan masih tersedia dan dapat digunakan oleh blok target, atau pesan telah diambil oleh target lain. Ketika blok target nanti memerlukan pesan atau tidak lagi memerlukan pesan, blok tersebut masing-masing memanggil metode ISourceBlock<TOutput>.ConsumeMessage atau ReleaseReservation. Reservasi pesan biasanya digunakan oleh jenis blok aliran data yang beroperasi dalam mode tidak serakah. Mode tidak rakus akan dijelaskan nanti dalam dokumen ini. Alih-alih menyimpan pesan yang ditunda, blok target juga dapat menggunakan metode ISourceBlock<TOutput>.ConsumeMessage untuk mencoba langsung mengonsumsi pesan yang ditunda.

Penyelesaian Blok Aliran Data

Blok aliran data juga mendukung konsep penyelesaian tugas . Blok aliran data yang dalam status selesai tidak melakukan pekerjaan lebih lanjut. Setiap blok aliran data memiliki objek System.Threading.Tasks.Task terkait, yang dikenal sebagai tugas penyelesaian , yang mewakili status penyelesaian blok. Karena Anda dapat menunggu objek Task selesai, dengan menggunakan tugas penyelesaian, Anda dapat menunggu satu atau beberapa simpul terminal jaringan aliran data selesai. Antarmuka IDataflowBlock menentukan metode Complete, yang mengirimkan permintaan penyelesaian ke blok aliran data, serta properti Completion yang mengembalikan tugas selesai untuk blok aliran data. Baik ISourceBlock<TOutput> maupun ITargetBlock<TInput> mewarisi antarmuka IDataflowBlock.

Ada dua cara untuk menentukan apakah blok aliran data selesai tanpa kesalahan, mengalami satu atau beberapa kesalahan, atau dibatalkan. Cara pertama adalah memanggil metode Task.Wait pada tugas penyelesaian dalam blok try-catch (Try-Catch di Visual Basic). Contoh berikut membuat objek ActionBlock<TInput> yang melemparkan ArgumentOutOfRangeException jika nilai inputnya kurang dari nol. AggregateException dilemparkan ketika contoh ini memanggil Wait pada tugas penyelesaian. ArgumentOutOfRangeException diakses melalui properti InnerExceptions objek AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Contoh ini menunjukkan kasus di mana pengecualian tidak tertangani dalam delegasi blok aliran data eksekusi. Kami menyarankan agar Anda menangani pengecualian dalam isi blok tersebut. Namun, jika Anda tidak dapat melakukannya, blok bersifat seolah-olah dibatalkan dan tidak memproses pesan masuk.

Saat blok aliran data dibatalkan secara eksplisit, objek AggregateException berisi OperationCanceledException di properti InnerExceptions. Untuk informasi selengkapnya tentang pembatalan aliran data, lihat bagian Mengaktifkan pembatalan.

Cara kedua untuk menentukan status penyelesaian blok aliran data adalah dengan menggunakan kelanjutan tugas penyelesaian, atau untuk menggunakan fitur bahasa asinkron C# dan Visual Basic untuk secara asinkron menunggu tugas penyelesaian. Delegasi yang Anda berikan ke metode Task.ContinueWith menggunakan objek Task yang mewakili tugas sebelumnya. Dalam kasus properti Completion, delegasi untuk kelanjutan mengambil tugas penyelesaian itu sendiri. Contoh berikut menyerupai yang sebelumnya, kecuali juga menggunakan metode ContinueWith untuk membuat tugas kelanjutan yang mencetak status operasi aliran data secara keseluruhan.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Anda juga dapat menggunakan properti seperti IsCanceled dalam isi tugas kelanjutan untuk menentukan informasi tambahan tentang status penyelesaian blok aliran data. Untuk informasi selengkapnya tentang tugas kelanjutan dan bagaimana hubungannya dengan pembatalan dan penanganan kesalahan, lihat Penautan Tugas Menggunakan Tugas Kelanjutan, Pembatalan Tugas , dan Penanganan Pengecualian .

Jenis Blok Aliran Data yang Telah Ditentukan Sebelumnya

Pustaka Aliran Data TPL menyediakan beberapa jenis blok aliran data yang telah ditentukan sebelumnya. Jenis ini dibagi menjadi tiga kategori: blok buffering, blok eksekusi , dan blok pengelompokan . Bagian berikut menjelaskan jenis blok yang membentuk kategori ini.

Blok Buffering

Blok buffering menyimpan data untuk digunakan oleh konsumen data. Pustaka Aliran Data TPL menyediakan tiga jenis blok buffering: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>, dan System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Kelas BufferBlock<T> mewakili struktur olahpesan asinkron tujuan umum. Kelas ini menyimpan antrian masuk pertama, keluar pertama (FIFO) untuk pesan-pesan yang dapat ditulis oleh beberapa sumber atau dibaca oleh beberapa target. Saat target menerima pesan dari objek BufferBlock<T>, pesan tersebut dihapus dari antrean pesan. Oleh karena itu, meskipun objek BufferBlock<T> dapat memiliki beberapa target, hanya satu target yang akan menerima setiap pesan. Kelas BufferBlock<T> berguna ketika Anda ingin meneruskan beberapa pesan ke komponen lain, dan komponen tersebut harus menerima setiap pesan.

Contoh dasar berikut memposting beberapa nilai Int32 ke objek BufferBlock<T> lalu membaca kembali nilai tersebut dari objek tersebut.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Untuk contoh lengkap yang menunjukkan cara menulis pesan ke dan membaca pesan dari objek BufferBlock<T>, lihat Cara: Menulis Pesan ke dan Membaca Pesan dari Blok Aliran Data.

BroadcastBlock<T>

Kelas BroadcastBlock<T> berguna ketika Anda harus meneruskan beberapa pesan ke komponen lain, tetapi komponen tersebut hanya membutuhkan nilai terbaru. Kelas ini juga berguna ketika Anda ingin menyiarkan pesan ke beberapa komponen.

Contoh dasar berikut memposting nilai Double ke objek BroadcastBlock<T> lalu membaca nilai tersebut kembali dari objek tersebut beberapa kali. Karena nilai tidak dihapus dari objek BroadcastBlock<T> setelah dibaca, nilai yang sama tersedia setiap saat.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan BroadcastBlock<T> untuk menyiarkan pesan ke beberapa blok target, lihat Cara: Menentukan Penjadwal Tugas di Blok Aliran Data.

WriteOnceBlock<T>

Kelas WriteOnceBlock<T> menyerupan kelas BroadcastBlock<T>, kecuali bahwa objek WriteOnceBlock<T> hanya dapat ditulis satu kali. Anda dapat menganggap WriteOnceBlock<T> mirip dengan kata kunci C# readonly (ReadOnly di Visual Basic), kecuali bahwa objek WriteOnceBlock<T> menjadi tidak dapat diubah setelah menerima nilai alih-alih pada konstruksi. Seperti kelas BroadcastBlock<T>, ketika target menerima pesan dari objek WriteOnceBlock<T>, pesan tersebut tidak dihapus dari objek tersebut. Oleh karena itu, beberapa target menerima salinan pesan. Kelas WriteOnceBlock<T> berguna ketika Anda hanya ingin menyebarluaskan pesan pertama dari beberapa pesan.

Contoh dasar berikut memposting beberapa nilai String ke objek WriteOnceBlock<T> lalu membaca nilai kembali dari objek tersebut. Karena objek WriteOnceBlock<T> hanya dapat ditulis ke satu kali, setelah objek WriteOnceBlock<T> menerima pesan, objek tersebut akan membuang pesan berikutnya.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan WriteOnceBlock<T> untuk menerima nilai operasi pertama yang selesai, lihat Cara: Membatalkan Tautan Blok Aliran Data.

Blok Pelaksanaan

Blok eksekusi memanggil delegasi yang disediakan pengguna untuk setiap bagian data yang diterima. Pustaka Aliran Data TPL menyediakan tiga jenis blok eksekusi: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, dan System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Kelas ActionBlock<TInput> adalah blok target yang memanggil delegasi saat menerima data. Anggap objek ActionBlock<TInput> sebagai delegasi yang berjalan secara asinkron saat data tersedia. Delegasi yang Anda berikan ke objek ActionBlock<TInput> dapat berjenis Action<T> atau jenis System.Func<TInput, Task>. Saat Anda menggunakan objek ActionBlock<TInput> dengan Action<T>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek ActionBlock<TInput> dengan System.Func<TInput, Task>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek Task yang dikembalikan selesai. Dengan menggunakan kedua mekanisme ini, Anda dapat menggunakan ActionBlock<TInput> untuk pemrosesan sinkron dan asinkron dari setiap elemen input.

Contoh dasar berikut memposting beberapa nilai Int32 ke objek ActionBlock<TInput>. Objek ActionBlock<TInput> mencetak nilai tersebut ke konsol. Contoh ini kemudian mengatur blok ke status selesai dan menunggu semua tugas aliran data selesai.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Untuk contoh lengkap yang menunjukkan cara menggunakan delegasi dengan kelas ActionBlock<TInput>, lihat Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data.

TransformBlock<TInput, TOutput>

Kelas TransformBlock<TInput,TOutput> menyerupai kelas ActionBlock<TInput>, kecuali bahwa ia bertindak sebagai sumber sekaligus target. Delegasi yang Anda berikan ke objek TransformBlock<TInput,TOutput> mengembalikan nilai jenis TOutput. Delegasi yang Anda berikan pada objek TransformBlock<TInput,TOutput> bisa berjenis System.Func<TInput, TOutput> atau jenis System.Func<TInput, Task<TOutput>>. Saat Anda menggunakan objek TransformBlock<TInput,TOutput> dengan System.Func<TInput, TOutput>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek TransformBlock<TInput,TOutput> yang digunakan dengan System.Func<TInput, Task<TOutput>>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek Task<TResult> yang dikembalikan selesai. Seperti halnya ActionBlock<TInput>, dengan menggunakan kedua mekanisme ini, Anda dapat menggunakan TransformBlock<TInput,TOutput> untuk pemrosesan sinkron dan asinkron dari setiap elemen input.

Contoh dasar berikut membuat objek TransformBlock<TInput,TOutput> yang menghitung akar kuadrat inputnya. Objek TransformBlock<TInput,TOutput> mengambil nilai Int32 sebagai input dan menghasilkan nilai Double sebagai output.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Untuk contoh lengkap yang menggunakan TransformBlock<TInput,TOutput> dalam jaringan blok aliran data yang melakukan pemrosesan gambar dalam aplikasi Windows Forms, lihat Panduan: Menggunakan Aliran Data di Aplikasi Windows Forms.

TransformManyBlock<TInput, TOutput>

Kelas TransformManyBlock<TInput,TOutput> menyerupan kelas TransformBlock<TInput,TOutput>, kecuali bahwa TransformManyBlock<TInput,TOutput> menghasilkan nilai output nol atau lebih untuk setiap nilai input, bukan hanya satu nilai output untuk setiap nilai input. Delegasi yang Anda berikan pada objek TransformManyBlock<TInput,TOutput> bisa berjenis System.Func<TInput, IEnumerable<TOutput>> atau jenis System.Func<TInput, Task<IEnumerable<TOutput>>>. Saat Anda menggunakan objek TransformManyBlock<TInput,TOutput> dengan System.Func<TInput, IEnumerable<TOutput>>, pemrosesan setiap elemen input dianggap selesai saat delegasi kembali. Saat Anda menggunakan objek TransformManyBlock<TInput,TOutput> dengan System.Func<TInput, Task<IEnumerable<TOutput>>>, pemrosesan setiap elemen input dianggap selesai hanya ketika objek System.Threading.Tasks.Task<IEnumerable<TOutput>> yang dikembalikan selesai.

Contoh dasar berikut membuat objek TransformManyBlock<TInput,TOutput> yang membagi string menjadi urutan karakter individualnya. Objek TransformManyBlock<TInput,TOutput> mengambil nilai String sebagai input dan menghasilkan nilai Char sebagai output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Untuk contoh lengkap yang menggunakan TransformManyBlock<TInput,TOutput> untuk menghasilkan beberapa output independen untuk setiap input dalam alur aliran data, lihat panduan : Membuat Alur Aliran Data.

Tingkat Paralelisme

Setiap objek buffer ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput> menampung pesan input sampai blok siap untuk memprosesnya. Secara default, kelas-kelas ini memproses pesan dalam urutan penerimaannya, satu pesan pada satu waktu. Anda juga dapat menentukan tingkat paralelisme untuk mengaktifkan objek ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput> untuk memproses beberapa pesan secara bersamaan. Untuk informasi selengkapnya tentang eksekusi bersamaan, lihat bagian Menentukan Tingkat Paralelisme nanti dalam dokumen ini. Untuk contoh yang mengatur tingkat paralelisme untuk mengaktifkan blok aliran data eksekusi untuk memproses lebih dari satu pesan pada satu waktu, lihat Cara: Menentukan Tingkat Paralelisme dalam Blok Aliran Data.

Ringkasan Jenis Delegasi

Tabel berikut ini meringkas jenis delegasi yang bisa Anda sediakan untuk ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan objek TransformManyBlock<TInput,TOutput>. Tabel ini juga menentukan apakah jenis delegasi beroperasi secara sinkron atau asinkron.

Tipe Jenis Delegasi Sinkron Jenis Delegasi Asinkron
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Anda juga dapat menggunakan ekspresi lambda saat bekerja dengan jenis blok eksekusi. Untuk contoh yang memperlihatkan cara menggunakan ekspresi lambda dengan blok eksekusi, lihat Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data.

Mengelompokkan Blok

Blok pengelompokan menggabungkan data dari satu atau beberapa sumber dan di bawah berbagai batasan. Pustaka Aliran Data TPL menyediakan tiga jenis blok gabungan: BatchBlock<T>, JoinBlock<T1,T2>, dan BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Kelas BatchBlock<T> menggabungkan kumpulan data input, yang dikenal sebagai batch, ke dalam array data output. Anda menentukan ukuran setiap batch saat membuat objek BatchBlock<T>. Ketika objek BatchBlock<T> menerima jumlah elemen input yang ditentukan, objek tersebut secara asinkron menyebarkan array yang berisi elemen-elemen tersebut. Jika objek BatchBlock<T> diatur ke status selesai tetapi tidak berisi elemen yang cukup untuk membentuk batch, objek tersebut menyebarkan array akhir yang berisi elemen input yang tersisa.

Kelas BatchBlock<T> beroperasi dalam mode serakah () atau tidak serakah (). Dalam mode serakah, yang merupakan default, objek BatchBlock<T> menerima setiap pesan yang ditawarkan dan menyebarkan array setelah menerima jumlah elemen yang ditentukan. Dalam mode tidak rakus, objek BatchBlock<T> menunda semua pesan masuk sampai sumber yang cukup telah menawarkan pesan ke blok untuk membentuk batch. Mode serakah biasanya berkinerja lebih baik daripada mode tidak serakah karena membutuhkan lebih sedikit overhead pemrosesan. Namun, Anda dapat menggunakan mode tidak serakah ketika Anda harus mengoordinasikan konsumsi dari beberapa sumber dengan cara atomik. Tentukan mode tidak serakah dengan mengatur Greedy ke False dalam parameter dataflowBlockOptions di konstruktor BatchBlock<T>.

Contoh dasar berikut memposting beberapa nilai Int32 ke objek BatchBlock<T> yang menyimpan sepuluh elemen dalam batch. Untuk menjamin bahwa semua nilai disebarluaskan dari BatchBlock<T>, contoh ini memanggil metode Complete. Metode Complete mengatur objek BatchBlock<T> ke status selesai, dan oleh karena itu, objek BatchBlock<T> menyebarluaskan elemen yang tersisa sebagai batch akhir.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Untuk contoh lengkap yang menggunakan BatchBlock<T> untuk meningkatkan efisiensi operasi penyisipan database, lihat panduan : Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi.

JoinBlock<T1, T2, ...>

Kelas JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> mengumpulkan elemen input dan menyebarluaskan objek System.Tuple<T1,T2> atau System.Tuple<T1,T2,T3> yang berisi elemen-elemen tersebut. Kelas JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> tidak mewarisi dari ITargetBlock<TInput>. Sebaliknya, mereka menyediakan properti, Target1, Target2, dan Target3, yang menerapkan ITargetBlock<TInput>.

Seperti BatchBlock<T>, JoinBlock<T1,T2> dan JoinBlock<T1,T2,T3> beroperasi dalam mode serakah atau tidak serakah. Dalam modus rakus, sebagai pengaturan bawaan, objek JoinBlock<T1,T2> atau JoinBlock<T1,T2,T3> menerima setiap pesan yang ditawarkan dan menyebarkan tuple setelah setiap targetnya menerima setidaknya satu pesan. Dalam mode tidak rakus, objek JoinBlock<T1,T2> atau JoinBlock<T1,T2,T3> menunda semua pesan masuk hingga semua target telah diberikan data yang diperlukan untuk membuat tuple. Pada titik ini, blok terlibat dalam protokol komit dua fase untuk mengambil secara atomik semua item yang diperlukan dari sumber. Penundaan ini memungkinkan entitas lain untuk mengonsumsi data sementara itu, untuk memungkinkan sistem keseluruhan membuat kemajuan ke depan.

Contoh dasar berikut menunjukkan kasus di mana objek JoinBlock<T1,T2,T3> memerlukan beberapa data untuk menghitung nilai. Contoh ini membuat objek JoinBlock<T1,T2,T3> yang memerlukan dua nilai Int32 dan nilai Char untuk melakukan operasi aritmatika.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Untuk contoh lengkap yang menggunakan objek JoinBlock<T1,T2> dalam mode tidak serakah untuk berbagi sumber daya secara kooperatif, lihat Cara: Menggunakan JoinBlock untuk Membaca Data Dari Beberapa Sumber.

BatchedJoinBlock<T1, T2, ...>

Kelas BatchedJoinBlock<T1,T2> dan BatchedJoinBlock<T1,T2,T3> mengumpulkan batch elemen input dan menyebarluaskan objek System.Tuple(IList(T1), IList(T2)) atau System.Tuple(IList(T1), IList(T2), IList(T3)) yang berisi elemen-elemen tersebut. Anggap BatchedJoinBlock<T1,T2> sebagai kombinasi BatchBlock<T> dan JoinBlock<T1,T2>. Tentukan ukuran setiap batch saat Anda membuat objek BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> juga menyediakan properti, Target1 dan Target2, yang menerapkan ITargetBlock<TInput>. Ketika jumlah elemen input yang ditentukan diterima dari semua target, objek BatchedJoinBlock<T1,T2> secara asinkron menyebarkan objek System.Tuple(IList(T1), IList(T2)) yang berisi elemen-elemen tersebut.

Contoh dasar berikut membuat objek BatchedJoinBlock<T1,T2> yang menyimpan hasil, nilai Int32, dan kesalahan yang merupakan objek Exception. Contoh ini melakukan beberapa operasi dan menulis hasil ke properti Target1, dan kesalahan ke properti Target2, dari objek BatchedJoinBlock<T1,T2>. Karena jumlah operasi yang berhasil dan gagal tidak diketahui sebelumnya, objek IList<T> memungkinkan setiap target untuk menerima nilai nol atau lebih.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Untuk contoh lengkap yang menggunakan BatchedJoinBlock<T1,T2> untuk mengambil hasil dan pengecualian apa pun yang terjadi saat program membaca dari database, lihat Panduan: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi.

Mengonfigurasi Perilaku Blok Dataflow

Anda dapat mengaktifkan opsi tambahan dengan menyediakan objek System.Threading.Tasks.Dataflow.DataflowBlockOptions ke konstruktor jenis blok aliran data. Opsi ini mengontrol perilaku, seperti penjadwal yang mengelola tugas-tugas dasar, dan tingkat paralelisme. DataflowBlockOptions juga memiliki jenis turunan yang menentukan perilaku yang khusus untuk jenis blok aliran data tertentu. Tabel berikut ini meringkas jenis opsi mana yang terkait dengan setiap jenis blok aliran data.

Jenis Blok Aliran Data DataflowBlockOptions jenis
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Bagian berikut ini menyediakan informasi tambahan tentang jenis opsi blok aliran data penting yang tersedia melalui kelas System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions, dan System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Menentukan Penjadwal Tugas

Setiap blok aliran data yang telah ditentukan sebelumnya menggunakan mekanisme penjadwalan tugas TPL untuk melakukan aktivitas seperti menyebarkan data ke target, menerima data dari sumber, dan menjalankan delegasi yang ditentukan pengguna saat data tersedia. TaskScheduler adalah kelas abstrak yang mewakili penjadwal tugas yang mengantrekan tugas ke utas. Penjadwal tugas default, Default, menggunakan kelas ThreadPool untuk mengantre dan menjalankan pekerjaan. Anda dapat mengambil alih penjadwal tugas default dengan mengatur properti TaskScheduler saat Anda membuat objek blok aliran data.

Saat penjadwal tugas yang sama mengelola beberapa blok aliran data, penjadwal tugas dapat memberlakukan kebijakan di seluruh blok tersebut. Misalnya, jika beberapa blok aliran data masing-masing dikonfigurasi untuk menargetkan penjadwal eksklusif dari objek ConcurrentExclusiveSchedulerPair yang sama, semua pekerjaan yang berjalan di seluruh blok ini diserialisasikan. Demikian pula, jika blok ini dikonfigurasi untuk menargetkan penjadwal bersamaan dari objek ConcurrentExclusiveSchedulerPair yang sama, dan penjadwal tersebut dikonfigurasi untuk memiliki tingkat konkurensi maksimum, semua pekerjaan dari blok ini terbatas pada jumlah operasi bersamaan tersebut. Untuk contoh yang menggunakan kelas ConcurrentExclusiveSchedulerPair untuk memungkinkan operasi baca terjadi secara paralel, tetapi operasi tulis terjadi secara eksklusif dari semua operasi lain, lihat Cara: Menentukan Penjadwal Tugas di Blok Aliran Data. Untuk informasi selengkapnya tentang penjadwal tugas di TPL, lihat topik kelas TaskScheduler.

Menentukan Tingkat Paralelisme

Secara default, tiga jenis blok eksekusi yang disediakan Pustaka Aliran Data TPL, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>, memproses satu pesan pada satu waktu. Jenis blok aliran data ini juga memproses pesan dalam urutan penerimaannya. Untuk mengaktifkan blok aliran data ini untuk memproses pesan secara bersamaan, atur properti ExecutionDataflowBlockOptions.MaxDegreeOfParallelism saat Anda membuat objek blok aliran data.

Nilai default MaxDegreeOfParallelism adalah 1, yang menjamin bahwa blok aliran data memproses satu pesan pada satu waktu. Mengatur properti ini ke nilai yang lebih besar dari 1 memungkinkan blok aliran data untuk memproses beberapa pesan secara bersamaan. Mengatur properti ini ke DataflowBlockOptions.Unbounded memungkinkan penjadwal tugas yang mendasar untuk mengelola tingkat konkurensi maksimum.

Penting

Ketika Anda menentukan tingkat paralelisme maksimum yang lebih besar dari 1, beberapa pesan diproses secara bersamaan, dan oleh karena itu pesan mungkin tidak diproses dalam urutan penerimaannya. Urutan pesan dikeluarkan dari blok adalah sama seperti saat pesan diterima.

Karena properti MaxDegreeOfParallelism mewakili tingkat paralelisme maksimum, blok aliran data mungkin dijalankan dengan tingkat paralelisme yang lebih rendah dari yang Anda tentukan. Blok aliran data mungkin menggunakan tingkat paralelisme yang lebih rendah untuk memenuhi persyaratan fungsinya atau karena ada kurangnya sumber daya sistem yang tersedia. Blok aliran data tidak pernah memilih lebih banyak paralelisme daripada yang Anda tentukan.

Nilai properti MaxDegreeOfParallelism eksklusif untuk setiap objek blok aliran data. Misalnya, jika empat objek blok aliran data masing-masing menentukan 1 untuk tingkat paralelisme maksimum, keempat objek blok aliran data berpotensi berjalan secara paralel.

Untuk contoh yang mengatur tingkat paralelisme maksimum untuk memungkinkan operasi yang panjang terjadi secara paralel, lihat Cara: Menentukan Tingkat Paralelisme dalam Blok Aliran Data.

Menentukan Jumlah Pesan per Tugas

Jenis blok aliran data yang telah ditentukan sebelumnya menggunakan tugas untuk memproses beberapa elemen input. Ini membantu meminimalkan jumlah objek tugas yang diperlukan untuk memproses data, yang memungkinkan aplikasi berjalan lebih efisien. Namun, ketika tugas dari satu set blok aliran data memproses data, tugas dari blok aliran data lainnya mungkin perlu menunggu waktu pemrosesan dengan mengantre pesan. Untuk mengaktifkan kewajaran yang lebih baik di antara tugas aliran data, atur properti MaxMessagesPerTask. Saat MaxMessagesPerTask diatur ke DataflowBlockOptions.Unbounded, yang merupakan default, tugas yang digunakan oleh blok aliran data memproses pesan sebanyak yang tersedia. Saat MaxMessagesPerTask diatur ke nilai selain Unbounded, blok aliran data memproses paling banyak jumlah pesan per objek Task ini. Meskipun mengatur properti MaxMessagesPerTask dapat meningkatkan kewajaran di antara tugas, itu dapat menyebabkan sistem membuat lebih banyak tugas daripada yang diperlukan, yang dapat mengurangi performa.

Mengaktifkan Pembatalan

TPL menyediakan mekanisme yang memungkinkan tugas untuk mengoordinasikan pembatalan dengan cara yang kooperatif. Untuk mengaktifkan blok aliran data untuk berpartisipasi dalam mekanisme pembatalan ini, atur properti CancellationToken. Ketika objek CancellationToken ini diatur ke status dibatalkan, semua blok aliran data yang memantau token ini menyelesaikan eksekusi item mereka saat ini tetapi tidak mulai memproses item berikutnya. Blok-blok aliran data ini juga menghapus pesan buffer apa pun, membebaskan koneksi ke blok sumber dan target mana pun, dan bertransisi ke status dibatalkan. Ketika beralih ke status dibatalkan, properti Completion diatur memiliki properti Status dengan nilai Canceled, kecuali jika terjadi pengecualian selama pemrosesan. Dalam hal tersebut, Status disetel ke Faulted.

Untuk contoh yang menunjukkan cara menggunakan pembatalan dalam aplikasi Windows Forms, lihat Cara: Membatalkan Blokir Aliran Data. Untuk informasi selengkapnya tentang pembatalan di TPL, lihat Pembatalan Tugas.

Menentukan Perilaku Serakah Versus Tidak Serakah

Beberapa jenis blok aliran data pengelompokan dapat beroperasi dalam mode rakus atau mode tidak rakus . Secara bawaan, jenis blok aliran data yang telah ditentukan sebelumnya beroperasi dalam mode greedy.

Untuk jenis blok penggabungan seperti JoinBlock<T1,T2>, mode rakus berarti bahwa blok segera menerima data bahkan jika data yang perlu digabungkan belum tersedia. Mode tidak serakah (non-greedy) berarti bahwa blok menunda pengolahan semua pesan masuk hingga satu tersedia di setiap targetnya untuk melengkapi penyatuan. Jika salah satu pesan yang ditunda tidak lagi tersedia, blok gabungan merilis semua pesan yang ditunda dan memulai ulang proses. Untuk kelas BatchBlock<T>, perilaku serakah dan tidak serakah serupa, kecuali bahwa di bawah mode tidak serakah, objek BatchBlock<T> menunda semua pesan masuk sampai cukup tersedia dari sumber yang berbeda untuk menyelesaikan batch.

Untuk menentukan mode tidak serakah untuk blok aliran data, atur Greedy ke False. Untuk contoh yang menunjukkan cara menggunakan mode non-rakus agar beberapa Blok Join dapat berbagi sumber data dengan lebih efisien, lihat Cara Menggunakan Blok Join untuk Membaca Data dari Berbagai Sumber.

Blok Dataflow Kustom

Meskipun Pustaka Aliran Data TPL menyediakan banyak jenis blok yang telah ditentukan sebelumnya, Anda dapat membuat jenis blok tambahan yang melakukan perilaku kustom. Terapkan antarmuka ISourceBlock<TOutput> atau ITargetBlock<TInput> secara langsung atau gunakan metode Encapsulate untuk membangun blok kompleks yang merangkum perilaku jenis blok yang ada. Untuk contoh yang memperlihatkan cara menerapkan fungsionalitas blok aliran data kustom, lihat panduan : Membuat Tipe Blok Aliran Data Kustom.

Judul Deskripsi
Cara: Menulis Pesan ke dan Membaca Pesan dari Blok Aliran Data Menunjukkan cara menulis pesan ke dan membaca pesan dari objek BufferBlock<T>.
Cara: Menerapkan Pola Aliran Data Producer-Consumer Menjelaskan cara menggunakan model aliran data untuk menerapkan pola produsen-konsumen, di mana produsen mengirim pesan ke blok aliran data, dan konsumen membaca pesan dari blok tersebut.
Cara: Melakukan Tindakan Saat Blok Aliran Data Menerima Data Menjelaskan cara memberikan delegasi ke jenis blok aliran data eksekusi, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, dan TransformManyBlock<TInput,TOutput>.
panduan : Membuat alur aliran data Menjelaskan cara membuat alur aliran data yang mengunduh teks dari web dan melakukan operasi pada teks tersebut.
Cara: Membatalkan Tautan Blok Aliran Data Menunjukkan cara menggunakan metode LinkTo untuk melepaskan tautan blok target dari sumbernya setelah sumber menawarkan pesan ke target.
Walkthrough : Menggunakan Dataflow di Aplikasi Windows Forms Menunjukkan cara membuat jaringan blok aliran data yang melakukan pemrosesan gambar di aplikasi Windows Forms.
Cara: Membatalkan Blokir Aliran Data Menunjukkan cara menggunakan pembatalan dalam aplikasi Windows Forms.
Cara: Menggunakan JoinBlock untuk Membaca Data Dari Beberapa Sumber Menjelaskan cara menggunakan kelas JoinBlock<T1,T2> untuk melakukan operasi saat data tersedia dari beberapa sumber, dan cara menggunakan mode tidak serakah untuk mengaktifkan beberapa blok gabungan untuk berbagi sumber data secara lebih efisien.
Cara: Menentukan Tingkat Paralelisme dalam Blok Aliran Data Menjelaskan cara mengatur properti MaxDegreeOfParallelism untuk mengaktifkan blok aliran data eksekusi untuk memproses lebih dari satu pesan pada satu waktu.
Cara: Menentukan Penjadwal Tugas di Blok Aliran Data Menunjukkan cara mengaitkan penjadwal tugas tertentu saat Anda menggunakan aliran data di aplikasi Anda.
Walkthrough: Menggunakan BatchBlock dan BatchedJoinBlock untuk Meningkatkan Efisiensi Menjelaskan cara menggunakan kelas BatchBlock<T> untuk meningkatkan efisiensi operasi penyisipan database, dan cara menggunakan kelas BatchedJoinBlock<T1,T2> untuk menangkap hasil dan pengecualian apa pun yang terjadi saat program membaca dari database.
Panduan : Membuat Jenis BlokIr Aliran Data Kustom Menunjukkan dua cara untuk membuat jenis blok aliran data yang mengimplementasikan perilaku kustom.
Pustaka Paralelisasi Tugas (TPL) Memperkenalkan TPL, pustaka yang menyederhanakan pemrograman paralel dan bersamaan dalam aplikasi .NET Framework.