Bagikan melalui


Pengantar ReliableConcurrentQueue dalam Azure Service Fabric

Reliable Concurrent Queue adalah antrean asinkron, transaksional, dan direplikasi yang menampilkan konkurensi tinggi untuk operasi antre dan hapus antrean. Hal ini dirancang untuk memberikan throughput tinggi dan latensi rendah dengan melonggarkan pemesanan FIFO yang ketat yang disediakan oleh Reliable Queue dan sebaliknya memberikan pemesanan upaya terbaik.

API

Antrean Bersamaan Reliable Concurrent Queue
batal Enqueue(item T) Tugas EnqueueAsync(ITransaction tx, item T)
bool TryDequeue(hasil T keluar) Tugas< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
Int Count() long Count()

Perbandingan dengan Reliable Queue

Reliable Concurrent Queue ditawarkan sebagai alternatif untuk Reliable Queue. Hal ini harus digunakan dalam kasus di mana pemesanan FIFO yang ketat tidak diperlukan, karena menjamin FIFO membutuhkan tradeoff dengan konkurensi. Reliable Queue menggunakan kunci untuk memberlakukan pemesanan FIFO, dengan paling banyak satu transaksi diizinkan untuk mengantre dan paling banyak satu transaksi diizinkan untuk menghapus antrean sekaligus. Sebagai perbandingan, Reliable Concurrent Queue melonggarkan batasan pemesanan dan memungkinkan transaksi bersamaan nomor apa pun untuk mengintervensi operasi antre dan hapus antreannya. Pemesanan dengan upaya terbaik disediakan, namun pemesanan relatif berisi dua nilai dalam Reliable Concurrent Queue tidak pernah dapat dijamin.

Reliable Concurrent Queue memberikan throughput yang lebih tinggi dan latensi yang lebih rendah daripada Reliable Queue setiap kali ada beberapa transaksi bersamaan yang melakukan operasi antre dan/atau hapus antrean.

Sampel kasus penggunaan untuk ReliableConcurrentQueue adalah skenario Antrean Pesan. Dalam skenario ini, satu atau beberapa produsen pesan membuat dan menambahkan item ke antrean, dan satu atau beberapa pesan konsumen menarik pesan dari antrean dan memprosesnya. Beberapa produsen dan konsumen dapat bekerja secara mandiri, menggunakan transaksi bersamaan untuk memproses antrean.

Panduan Penggunaan

  • Antrean mengharapkan bahwa item dalam antrean memiliki periode retensi yang rendah. Artinya, item tersebut tidak akan berada di antrean untuk waktu yang lama.
  • Antrean tidak menjamin pemesanan FIFO yang ketat.
  • Antrean tidak membaca tulisannya sendiri. Jika dihapus dari antrean dalam transaksi, item tidak akan terlihat oleh pengantre dalam transaksi yang sama.
  • Operasi hapus antrean tidak terisolasi satu sama lain. Jika item A dihapus dari antrean dalam transaksi txnA, meskipun txnA tidak berkomitmen, item A tidak akan terlihat oleh transaksi bersamaan txnB. Jika txnA dibatalkan, A akan segera terlihat oleh txnB.
  • Perilaku TryPeekAsync dapat diimplementasikan menggunakan TryDequeueAsync, lalu membatalkan transaksi. Contoh perilaku ini dapat ditemukan di bagian Pola Pemrograman.
  • Penghitungan non-transaksional. Hal ini dapat digunakan untuk mendapatkan gambaran tentang jumlah elemen dalam antrean, tetapi mewakili titik waktu dan tidak dapat diandalkan.
  • Pemrosesan mahal pada item yang dihapus dari antrean tidak boleh dilakukan saat transaksi aktif, untuk menghindari transaksi jangka panjang yang mungkin memiliki dampak performa pada sistem.

Cuplikan Kode

Mari lihat beberapa cuplikan kode dan output yang diharapkan. Penanganan pengecualian diabaikan di bagian ini.

Instansiasi

Pembuatan instans Reliable Concurrent Queue mirip dengan Reliable Collection lainnya.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Berikut adalah beberapa cuplikan kode untuk menggunakan EnqueueAsync diikuti oleh output yang diharapkan.

  • Kasus 1: Tugas Antre Tunggal
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Asumsikan bahwa tugas berhasil diselesaikan, dan bahwa tidak ada transaksi bersamaan yang memodifikasi antrean. Pengguna dapat mengharapkan antrean berisi item dalam salah satu pesanan berikut:

10, 20

20, 10

  • Kasus 2: Tugas Antre Paralel
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Asumsikan bahwa tugas berhasil diselesaikan, bahwa tugas berjalan secara paralel, dan bahwa tidak ada transaksi bersamaan lainnya yang memodifikasi antrean. Tidak ada inferensi yang dapat dibuat tentang urutan item dalam antrean. Untuk cuplikan kode ini, item dapat muncul di salah satu dari 4! kemungkinan pemesanan. Antrean akan berusaha untuk menyimpan item dalam urutan asli (antrean), tetapi mungkin dipaksa untuk menyusun ulang karena operasi atau kesalahan bersamaan.

DequeueAsync

Berikut adalah beberapa cuplikan kode untuk menggunakan TryDequeueAsync diikuti oleh output yang diharapkan. Misalnya, antrean sudah diisi dengan item berikut dalam antrean:

10, 20, 30, 40, 50, 60

  • Kasus 1: Tugas Penghapusan Antrean Tunggal
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Asumsikan bahwa tugas berhasil diselesaikan, dan bahwa tidak ada transaksi bersamaan yang memodifikasi antrean. Karena tidak ada inferensi yang dapat dibuat tentang urutan item dalam antrean, tiga item dapat dihapus dalam antrean, dalam urutan apa pun. Antrean akan berusaha untuk menyimpan item dalam urutan asli (antrean), tetapi mungkin dipaksa untuk menyusun ulang karena operasi atau kesalahan bersamaan.

  • Kasus 2: Tugas Hapus Dari Antrean Paralel
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Asumsikan bahwa tugas berhasil diselesaikan, bahwa tugas berjalan secara paralel, dan bahwa tidak ada transaksi bersamaan lainnya yang memodifikasi antrean. Karena tidak ada inferensi yang dapat dibuat tentang urutan item dalam antrean, daftar didequeue1 dan dequeue2 masing-masing akan berisi dua item, dalam urutan apa pun.

Item yang sama tidak akan muncul di kedua daftar. Oleh karena itu, jika dequeue1 memiliki 10, 30, dequeue2 akan memiliki nilai 20, 40.

  • Kasus 3: Menghapus Antrean Pemesanan Dengan Pembatalan Transaksi

Membatalkan transaksi dengan operasi hapus antrean yang aktif menempatkan item kembali di bagian utama antrean. Urutan di mana item diletakkan kembali di bagian utama antrean tidak dijamin. Mari lihat kode berikut:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Asumsikan bahwa item telah dihapus dari antrean dalam urutan berikut:

10, 20

Ketika kami membatalkan transaksi, item akan ditambahkan kembali ke bagian utama antrean dalam salah satu pesanan berikut:

10, 20

20, 10

Hal yang sama berlaku untuk semua kasus di mana transaksi tidak berhasil Dilakukan.

Pola Pemrograman

Di bagian ini, mari lihat beberapa pola pemrograman yang mungkin berguna dalam menggunakan ReliableConcurrentQueue.

Penghapusan Antrean Secara Massal

Pola pemrograman yang direkomendasikan adalah untuk tugas konsumen untuk menghapusnya dalam antrean secara massal sebagai ganti melakukan satu operasi penghapusan antrean sekaligus. Pengguna dapat memilih untuk membatasi penundaan antara setiap batch atau ukuran batch. Cuplikan kode berikut menunjukkan model pemrograman ini. Ketahuilah, dalam contoh ini, pemrosesan dilakukan setelah transaksi dilakukan, jadi jika kesalahan terjadi saat pemrosesan, item yang tidak diproses akan hilang tanpa diproses. Atau, pemrosesan dapat dilakukan dalam cakupan transaksi, namun mungkin berdampak negatif pada performa dan memerlukan penanganan item yang sudah diproses.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

Pemrosesan Berbasis Pemberitahuan dengan Upaya Terbaik

Pola pemrograman menarik lainnya menggunakan Count API. Di sini, kami dapat menerapkan pemrosesan berbasis pemberitahuan dengan upaya terbaik untuk antrean. Jumlah antrean dapat digunakan untuk membatasi tugas antrean atau hapus antrean. Perlu diperhatikan bahwa seperti dalam contoh sebelumnya, karena pemrosesan terjadi di luar transaksi, item yang tidak diproses dapat hilang jika kesalahan terjadi selama pemrosesan.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Pengosongan dengan Upaya Terbaik

Pengosongan antrean tidak dapat dijamin karena sifat struktur data yang bersamaan. Ada kemungkinan bahwa, meskipun tidak ada operasi pengguna pada antrean yang aktif, panggilan tertentu ke TryDequeueAsync mungkin tidak mengembalikan item yang sebelumnya dimasukkan dalam antrean dan dilakukan. Item yang dimasukkan dalam antrean dijamin ke akhirnya menjadi terlihat untuk dimasukkan dalam antrean, namun tanpa mekanisme komunikasi di luar band, konsumen independen tidak dapat mengetahui bahwa antrean telah mencapai status stabil meskipun semua produsen telah dihentikan dan tidak ada operasi masukkan dalam antrean baru yang diizinkan. Dengan demikian, operasi pengosongan paling baik seperti yang diterapkan di bawah ini.

Pengguna harus menghentikan semua tugas produsen dan konsumen lebih lanjut, dan menunggu transaksi yang aktif untuk melakukan atau membatalkan, sebelum mencoba mengosongkan antrean. Jika pengguna mengetahui jumlah item yang diharapkan dalam antrean, item dapat menyiapkan pemberitahuan yang memberi tahu bahwa semua item telah dihapus dalam antrean.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Lihat Sekilas

ReliableConcurrentQueue tidak menyediakan api TryPeekAsync. Pengguna bisa mendapatkan semantik peek menggunakan TryDequeueAsync, lalu membatalkan transaksi. Dalam contoh ini, penghapusan antrean diproses hanya jika nilai item lebih besar dari 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Harus Dibaca