Gambaran Umum BlockingCollection

BlockingCollection<T> adalah kelas pengumpulan yang aman untuk utas yang menyediakan fitur-fitur berikut:

  • Implementasi pola Producer-Consumer.

  • Menambahkan dan mengambil item secara bersamaan dari beberapa utas.

  • Kapasitas maksimum opsional.

  • Operasi penyisipan dan penghapusan yang memblokir saat pengumpulan kosong atau penuh.

  • Operasi penyisipan dan penghapusan "coba" yang tidak memblokir atau yang memblokir hingga jangka waktu tertentu.

  • Merangkum jenis koleksi apa pun yang mengimplementasikan IProducerConsumerCollection<T>

  • Pembatalan dengan token pembatalan.

  • Dua jenis enumerasi dengan foreach (For Each dalam Visual Basic):

    1. Pencacahan baca-saja.

    2. Pencacahan yang menghapus item saat disebutkan.

Dukungan Bounding dan Blocking

BlockingCollection<T> mendukung pembatas dan pemblokiran. Bounding berarti Anda dapat mengatur kapasitas maksimum koleksi. Bounding penting dalam skenario tertentu karena memungkinkan Anda untuk mengontrol ukuran maksimum koleksi dalam memori, dan mencegah utas produksi bergerak terlalu jauh di depan utas yang digunakan.

Beberapa utas atau tugas dapat menambahkan item ke koleksi secara bersamaan, dan jika koleksi mencapai kapasitas maksimum yang ditentukan, utas produksi akan diblokir sampai item dihapus. Beberapa konsumen dapat menghapus item secara bersamaan, dan jika koleksi menjadi kosong, uts yang digunakan akan memblokir sampai produsen menambahkan item. Utas yang memproduksi dapat memanggil CompleteAdding untuk menunjukkan bahwa tidak ada lagi item yang akan ditambahkan. Konsumen memantau IsCompleted properti untuk mengetahui kapan koleksi kosong dan tidak ada lagi item yang akan ditambahkan. Contoh berikut menunjukkan BlockingCollection sederhana dengan kapasitas terbatas 100. Tugas produsen menambahkan item ke koleksi selama beberapa kondisi eksternal benar, lalu memanggil CompleteAdding. Tugas konsumen mengambil item hingga IsCompleted properti benar.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Untuk contoh lengkap, lihat panduan: Menambahkan dan Mengambil Item Satu per Satu dari BlockingCollection.

Operasi Pemblokiran Tepat Waktu

Dalam pemblokiran TryAdd berwaktu dan TryTake operasi pada koleksi terikat, metode mencoba menambahkan atau mengambil item. Jika item tersedia, item tersebut ditempatkan ke dalam variabel yang diteruskan dengan referensi, dan metode mengembalikan true. Jika tidak ada item yang diambil setelah periode waktu habis tertentu, metode mengembalikan false. Utas kemudian bebas untuk melakukan beberapa pekerjaan berguna lainnya sebelum mencoba lagi untuk mengakses koleksi. Untuk contoh akses pemblokiran berwaktu, lihat contoh kedua di Cara: Menambahkan dan Mengambil Item Satu Per satu dari BlockingCollection.

Membatalkan Operasi Tambah dan Ambil

Operasi Tambahkan dan Ambil biasanya dilakukan dalam satu perulangan. Anda dapat membatalkan perulangan dengan meneruskan CancellationToken ke TryAdd atau TryTake metode, lalu memeriksa nilai properti token IsCancellationRequested pada setiap perulangan. Jika nilainya benar, maka terserah Anda untuk menanggapi permintaan pembatalan dengan membersihkan sumber daya apa pun dan keluar dari perulangan. Contoh berikut menunjukkan kelebihan beban TryAdd yang mengambil token pembatalan, dan kode yang menggunakannya:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Untuk contoh akses pemblokiran berwaktu, lihat contoh kedua di Cara: Menambahkan dan Mengambil Item Satu Per satu dari BlockingCollection.

Menentukan Jenis Koleksi

Saat membuat BlockingCollection<T>, Anda dapat menentukan tidak hanya kapasitas terikat tetapi juga jenis koleksi yang akan digunakan. Misalnya, Anda dapat menentukan ConcurrentQueue<T> untuk perilaku masuk pertama keluar pertama (FIFO), atau ConcurrentStack<T> untuk perilaku masuk terakhir keluar pertama (LIFO). Anda dapat menggunakan kelas koleksi apa pun yang mengimplementasikan IProducerConsumerCollection<T> antarmuka. Jenis koleksi default untuk BlockingCollection<T> adalah ConcurrentQueue<T>. Contoh kode berikut menunjukkan cara membuat BlockingCollection<T> string yang memiliki kapasitas 1000 dan menggunakan ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Untuk contoh lengkap, lihat Panduan: Menambahkan Fungsi Bounding dan Blocking ke Koleksi.

Dukungan IEnumerable

BlockingCollection<T>menyediakan GetConsumingEnumerable metode yang memungkinkan konsumen untuk menggunakan foreach (For Each dalam Visual Basic) untuk menghapus item sampai koleksi selesai, yang berarti kosong dan tidak ada lagi item yang akan ditambahkan. Untuk informasi selengkapnya, lihat Cara: Menggunakan ForEach untuk Menghapus Item dalam BlockingCollection.

Menggunakan Banyak BlockingCollections Sebagai Salah Satu

Untuk skenario ketika konsumen perlu mengambil item dari beberapa koleksi secara bersamaan, Anda dapat membuat array BlockingCollection<T> dan menggunakan metode statis seperti TakeFromAny dan AddToAny yang akan menambah atau mengambil dari salah satu koleksi dalam array. Jika satu koleksi diblokir, metode segera mencoba yang lain sampai menemukan satu yang dapat melakukan operasi. Untuk informasi selengkapnya, lihat Cara: Menggunakan Array Pemblokiran Koleksi dalam Alur.

Lihat juga