Bagikan melalui


Gambaran Umum tentang BlockingCollection

BlockingCollection<T> adalah kelas pengumpulan aman utas yang menyediakan fitur-fitur berikut:

  • Implementasi pola Producer-Consumer.

  • Penambahan dan pengambilan item secara bersamaan dan terkoordinasi dari beberapa thread.

  • Kapasitas maksimum opsional.

  • Operasi penyisipan dan penghapusan yang memblokir ketika koleksi kosong atau penuh.

  • Operasi penyisipan dan penghapusan "coba" yang tidak memblokir atau yang memblokir hingga periode waktu tertentu.

  • Merangkum jenis koleksi apa pun yang mengimplementasikan IProducerConsumerCollection<T>

  • Pembatalan dengan token pembatalan.

  • Dua jenis enumerasi dengan foreach (For Each di Visual Basic):

    1. Enumerasi baca-saja.

    2. Enumerasi yang menghapus item saat dijumlahkan.

Dukungan Pembatas dan Pemblokiran

BlockingCollection<T> mendukung pembatasan dan pemblokiran. Pembatasan berarti Anda dapat mengatur kapasitas maksimum dari koleksi. Pembatas penting dalam skenario tertentu karena memungkinkan Anda mengontrol ukuran maksimum koleksi dalam memori, dan mencegah utas produsen bergerak terlalu jauh di depan utas konsumen.

Beberapa utas atau tugas dapat menambahkan item ke koleksi secara bersamaan, dan jika koleksi mencapai kapasitas maksimum yang ditentukan, utas penghasil akan memblokir hingga item dihapus. Beberapa thread dapat menghapus item secara bersamaan, dan jika koleksi menjadi kosong, thread yang mengambil akan berhenti sementara hingga ada produsen yang menambahkan item. Utas yang sedang memproduksi dapat memanggil CompleteAdding untuk menunjukkan bahwa tidak ada lagi item yang akan ditambahkan. Pengguna memantau properti IsCompleted untuk mengetahui apakah koleksi sudah kosong dan tidak ada lagi item yang akan ditambahkan. Contoh berikut menunjukkan BlockingCollection sederhana yang memiliki kapasitas maksimum 100. Tugas produsen menambahkan item ke koleksi selama suatu kondisi eksternal tertentu benar, kemudian memanggil CompleteAdding. Tugas konsumen mengambil item sampai properti IsCompleted menjadi benar.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Untuk contoh lengkapnya, lihat Cara: Menambahkan dan Mengambil Item Satu Per satu dari BlockingCollection.

Operasi Pemblokiran Berwakti

Dalam operasi pemblokiran dengan waktu tertentu pada koleksi dengan batas, metode mencoba menambahkan atau mengambil item. Jika barang tersedia, barang akan ditempatkan dalam variabel yang diteruskan oleh referensi, dan metode mengembalikan nilai benar. Jika tidak ada item yang diambil setelah batas waktu yang ditentukan, metode mengembalikan false. Thread tersebut kemudian dapat bebas melakukan beberapa pekerjaan berguna lainnya sebelum mencoba lagi untuk mengakses koleksi. Untuk contoh akses pemblokiran terjadwal, lihat contoh kedua di Cara: Menambahkan dan Mengambil Item Satu Per satu dari BlockingCollection.

Membatalkan Operasi Tambahkan dan Ambil

Operasi Tambahkan dan Ambil biasanya dilakukan dalam perulangan. Anda dapat membatalkan perulangan dengan meneruskan CancellationToken ke TryAdd metode atau TryTake , lalu memeriksa nilai properti token IsCancellationRequested pada setiap perulangan. Jika nilainya benar, maka terserah Anda untuk menanggapi permintaan pembatalan dengan membersihkan sumber daya dan mengakhiri perulangan. Contoh berikut menunjukkan kelebihan beban TryAdd yang mengambil token pembatalan, dan kode yang menggunakannya:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Untuk contoh cara menambahkan dukungan pembatalan, lihat contoh kedua di Cara: Menambahkan dan Mengambil Item Satu Per satu dari BlockingCollection.

Menentukan Tipe Koleksi

Saat membuat BlockingCollection<T>, Anda dapat menentukan tidak hanya kapasitas terikat tetapi juga jenis koleksi yang akan digunakan. Misalnya, Anda dapat menentukan ConcurrentQueue<T> untuk perilaku masuk pertama-keluar pertama (FIFO), atau ConcurrentStack<T> untuk perilaku masuk terakhir-keluar pertama (LIFO). Anda dapat menggunakan kelas koleksi apa pun yang mengimplementasikan IProducerConsumerCollection<T> antarmuka. Jenis koleksi default untuk BlockingCollection<T> adalah ConcurrentQueue<T>. Contoh kode berikut menunjukkan cara membuat BlockingCollection<T> string yang memiliki kapasitas 1000 dan menggunakan ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );

Untuk informasi selengkapnya, lihat Cara: Menambahkan Fungsionalitas Pembatas dan Pemblokiran ke Koleksi.

Dukungan IEnumerable

BlockingCollection<T> menyediakan GetConsumingEnumerable metode yang memungkinkan konsumen untuk menggunakan foreach (For Each di Visual Basic) untuk menghapus item sampai koleksi selesai, yang berarti kosong dan tidak ada lagi item yang akan ditambahkan. Untuk informasi selengkapnya, lihat Cara: Menggunakan ForEach untuk Menghapus Item dalam BlockingCollection.

Menggunakan Banyak BlockingCollections Sebagai Satu

Untuk skenario di mana konsumen perlu mengambil item dari beberapa koleksi secara bersamaan, Anda dapat membuat array BlockingCollection<T> dan menggunakan metode statis seperti TakeFromAny dan AddToAny yang akan menambah atau mengambil dari salah satu koleksi dalam array. Jika satu koleksi memblokir, metode segera mencoba yang lain sampai menemukan satu yang dapat melakukan operasi. Untuk informasi selengkapnya, lihat Panduan: Memakai Array Koleksi Pemblokir di Pipeline.

Lihat juga