How to: Use JoinBlock to Read Data From Multiple Sources
This document explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources. It also demonstrates how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently.
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.
Example
The following example defines three resource types, NetworkResource, FileResource, and MemoryResource, and performs operations when resources become available. This example requires a NetworkResource and MemoryResource pair to perform the first operation, and a FileResource and MemoryResource pair to perform the second operation. To enable these operations to occur when all required resources are available, this example uses the JoinBlock<T1,T2> class. When a JoinBlock<T1,T2> object receives data from all sources, it propagates that data to its target, which in this example is an ActionBlock<TInput> object. Both JoinBlock<T1,T2> objects read from a shared pool of MemoryResource objects.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
    // Represents a resource. A derived class might represent
    // a limited resource such as a memory, network, or I/O
    // device.
    abstract class Resource
    {
    }

    // Represents a memory resource. For brevity, the details of
    // this class are omitted.
    class MemoryResource : Resource
    {
    }

    // Represents a network resource. For brevity, the details of
    // this class are omitted.
    class NetworkResource : Resource
    {
    }

    // Represents a file resource. For brevity, the details of
    // this class are omitted.
    class FileResource : Resource
    {
    }

    static void Main(string[] args)
    {
        // Create three BufferBlock<T> objects. Each object holds a different
        // type of resource.
        var networkResources = new BufferBlock<NetworkResource>();
        var fileResources = new BufferBlock<FileResource>();
        var memoryResources = new BufferBlock<MemoryResource>();

        // Create two non-greedy JoinBlock<T1, T2> objects.
        // The first join works with network and memory resources;
        // the second join works with file and memory resources.
        var joinNetworkAndMemoryResources =
            new JoinBlock<NetworkResource, MemoryResource>(
                new GroupingDataflowBlockOptions
                {
                    Greedy = false
                });

        var joinFileAndMemoryResources =
            new JoinBlock<FileResource, MemoryResource>(
                new GroupingDataflowBlockOptions
                {
                    Greedy = false
                });

        // Create two ActionBlock<T> objects.
        // The first block acts on a network resource and a memory resource.
        // The second block acts on a file resource and a memory resource.
        var networkMemoryAction =
            new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
                data =>
                {
                    // Perform some action on the resources.

                    // Print a message.
                    Console.WriteLine("Network worker: using resources...");

                    // Simulate a lengthy operation that uses the resources.
                    Thread.Sleep(new Random().Next(500, 2000));

                    // Print a message.
                    Console.WriteLine("Network worker: finished using resources...");

                    // Release the resources back to their respective pools.
                    networkResources.Post(data.Item1);
                    memoryResources.Post(data.Item2);
                });

        var fileMemoryAction =
            new ActionBlock<Tuple<FileResource, MemoryResource>>(
                data =>
                {
                    // Perform some action on the resources.

                    // Print a message.
                    Console.WriteLine("File worker: using resources...");

                    // Simulate a lengthy operation that uses the resources.
                    Thread.Sleep(new Random().Next(500, 2000));

                    // Print a message.
                    Console.WriteLine("File worker: finished using resources...");

                    // Release the resources back to their respective pools.
                    fileResources.Post(data.Item1);
                    memoryResources.Post(data.Item2);
                });

        // Link the resource pools to the JoinBlock<T1, T2> objects.
        // Because these join blocks operate in non-greedy mode, they do not
        // take a resource from a pool until all resources are available from
        // all pools.
        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

        fileResources.LinkTo(joinFileAndMemoryResources.Target1);
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

        // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
        joinFileAndMemoryResources.LinkTo(fileMemoryAction);

        // Populate the resource pools. In this example, network and
        // file resources are more abundant than memory resources.
        networkResources.Post(new NetworkResource());
        networkResources.Post(new NetworkResource());
        networkResources.Post(new NetworkResource());

        memoryResources.Post(new MemoryResource());

        fileResources.Post(new FileResource());
        fileResources.Post(new FileResource());
        fileResources.Post(new FileResource());

        // Allow data to flow through the network for several seconds.
        Thread.Sleep(10000);
    }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock(Of T) objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock(Of T1, T2) objects.
        ' The first join works with network and memory resources;
        ' the second join works with file and memory resources.
        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock(Of T) objects.
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.
        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(
            Sub(data)
                ' Perform some action on the resources.

                ' Print a message.
                Console.WriteLine("Network worker: using resources...")

                ' Simulate a lengthy operation that uses the resources.
                Thread.Sleep(New Random().Next(500, 2000))

                ' Print a message.
                Console.WriteLine("Network worker: finished using resources...")

                ' Release the resources back to their respective pools.
                networkResources.Post(data.Item1)
                memoryResources.Post(data.Item2)
            End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(
            Sub(data)
                ' Perform some action on the resources.

                ' Print a message.
                Console.WriteLine("File worker: using resources...")

                ' Simulate a lengthy operation that uses the resources.
                Thread.Sleep(New Random().Next(500, 2000))

                ' Print a message.
                Console.WriteLine("File worker: finished using resources...")

                ' Release the resources back to their respective pools.
                fileResources.Post(data.Item1)
                memoryResources.Post(data.Item2)
            End Sub)

        ' Link the resource pools to the JoinBlock(Of T1, T2) objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take a resource from a pool until all resources are available from
        ' all pools.
        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock(Of T1, T2) objects to the ActionBlock(Of T) objects.
        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and
        ' file resources are more abundant than memory resources.
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)
    End Sub
End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
To enable efficient use of the shared pool of MemoryResource objects, this example specifies a GroupingDataflowBlockOptions object that has the Greedy property set to False to create JoinBlock<T1,T2> objects that act in non-greedy mode. A non-greedy join block postpones all incoming messages until one is available from each source. If any of the postponed messages were accepted by another block, the join block restarts the process. Non-greedy mode enables join blocks that share one or more source blocks to make forward progress while other blocks wait for data. In this example, if a MemoryResource object is added to the memoryResources pool, the first join block to receive its second data source can make forward progress. If this example were to use greedy mode, which is the default, one join block might take the MemoryResource object and wait for the second resource to become available. However, if the other join block already has its second data source available, it cannot make forward progress because the MemoryResource object has been taken by the first join block.
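The difference between the two modes can be observed directly on the shared pool. The following is a minimal sketch, not part of the original example: the class name GreedyModeDemo, the method ItemsLeftInSharedPool, and the 500 ms wait are assumptions made for illustration. It links one join block to two pools, posts an item only to the shared pool, and then reports how many items remain there.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the names here are illustrative only.
static class GreedyModeDemo
{
    // Links a join block to two pools, posts one item to the shared pool only,
    // and returns how many items are still in the shared pool afterwards.
    public static int ItemsLeftInSharedPool(bool greedy)
    {
        var otherPool = new BufferBlock<string>();
        var sharedPool = new BufferBlock<int>();

        var join = new JoinBlock<string, int>(
            new GroupingDataflowBlockOptions { Greedy = greedy });

        otherPool.LinkTo(join.Target1);
        sharedPool.LinkTo(join.Target2);

        // Only the shared pool receives an item; the other input stays empty.
        sharedPool.Post(42);

        // Assumption: half a second is ample time for the offer to propagate.
        Thread.Sleep(500);

        return sharedPool.Count;
    }

    static void Main()
    {
        Console.WriteLine($"Greedy: {ItemsLeftInSharedPool(true)} item(s) left in the pool");
        Console.WriteLine($"Non-greedy: {ItemsLeftInSharedPool(false)} item(s) left in the pool");
    }
}
```

In greedy mode the join block should accept the item right away, so the pool is left empty even though the join cannot yet produce a pair. In non-greedy mode the item should stay in the pool, where another join block linked to the same pool could still claim it.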
Robust Programming
The use of non-greedy joins can also help you prevent deadlock in your application. In a software application, deadlock occurs when two or more processes each hold a resource and mutually wait for another process to release some other resource. Consider an application that defines two JoinBlock<T1,T2> objects. Both objects read data from the same two shared source blocks. In greedy mode, if one join block reads from the first source and the second join block reads from the second source, the application might deadlock because each join block waits for the other to release its resource. In non-greedy mode, each join block reads from its sources only when all data is available, and therefore, the risk of deadlock is eliminated.
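The scenario described above can be reproduced in a few lines. This is a sketch under stated assumptions rather than a definitive test: the class JoinDeadlockDemo and the method PairsProduced are hypothetical, the 500 ms wait is arbitrary, and the link order is chosen on purpose so that, in greedy mode, each join block is offered a different source first.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the names here are illustrative only.
static class JoinDeadlockDemo
{
    // Two join blocks share two sources that hold one item each.
    // Returns the total number of pairs the two join blocks produced.
    public static int PairsProduced(bool greedy)
    {
        var sourceA = new BufferBlock<string>();
        var sourceB = new BufferBlock<int>();

        var options = new GroupingDataflowBlockOptions { Greedy = greedy };
        var join1 = new JoinBlock<string, int>(options);
        var join2 = new JoinBlock<string, int>(options);

        // sourceA offers its item to join1 first; sourceB offers to join2 first.
        sourceA.LinkTo(join1.Target1);
        sourceA.LinkTo(join2.Target1);
        sourceB.LinkTo(join2.Target2);
        sourceB.LinkTo(join1.Target2);

        sourceA.Post("a");
        sourceB.Post(1);

        // Assumption: half a second is ample time for the blocks to settle.
        Thread.Sleep(500);

        // No targets are linked, so completed pairs stay in the output buffers.
        return join1.OutputCount + join2.OutputCount;
    }

    static void Main()
    {
        Console.WriteLine($"Greedy: {PairsProduced(true)} pair(s) produced");
        Console.WriteLine($"Non-greedy: {PairsProduced(false)} pair(s) produced");
    }
}
```

With greedy joins, each block should end up holding one half of a pair and neither can finish, so no pairs are produced. With non-greedy joins, one of the two blocks (which one is not determined) should claim both items and produce a single pair.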