Úvod do ReliableConcurrentQueue v Azure Service Fabric

Reliable Concurrent Queue je asynchronní, transakční a replikovaná fronta, která má vysokou souběžnost pro operace zařazení do fronty a zrušení fronty. Je navržený tak, aby poskytoval vysokou propustnost a nízkou latenci uvolněním striktního pořadí FIFO poskytovaného službou Reliable Queue a místo toho poskytuje řazení s maximálním úsilím.

Rozhraní API

Souběžná fronta Spolehlivá souběžná fronta
void Enqueue(T položka) Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(out T result) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

Porovnání se spolehlivou frontou

Spolehlivá souběžná fronta se nabízí jako alternativa ke spolehlivé frontě. Měl by být použit v případech, kdy není vyžadováno striktní objednávání FIFO, protože záruka FIFO vyžaduje kompromis se souběžností. Spolehlivá fronta používá zámky k vynucení pořadí FIFO, přičemž maximálně jedna transakce může být zařazena do fronty a maximálně jedna transakce může zrušit pořadí najednou. Pro srovnání, Reliable Concurrent Queue uvolní omezení řazení a umožňuje prokládání všech řady souběžných transakcí jejich operace zařazení do fronty a dequeue. Je k dispozici co nejlepší řazení, ale relativní pořadí dvou hodnot ve spolehlivé souběžné frontě nelze nikdy zaručit.

Spolehlivá souběžná fronta poskytuje vyšší propustnost a nižší latenci než Spolehlivá fronta vždy, když existuje více souběžných transakcí provádějících fronty a/nebo odstraňování front.

Ukázkový případ použití pro ReliableConcurrentQueue je scénář fronty zpráv . V tomto scénáři jeden nebo více producentů zpráv vytvoří a přidá položky do fronty a jeden nebo více příjemců zpráv přetáhne zprávy z fronty a zpracuje je. Více producentů a příjemců může pracovat nezávisle a používat souběžné transakce ke zpracování fronty.

Pokyny k používání

  • Fronta očekává, že položky ve frontě mají nízkou dobu uchovávání. To znamená, že by položky nezůstaly ve frontě po dlouhou dobu.
  • Fronta nezaručuje striktní pořadí FIFO.
  • Fronta nečte vlastní zápisy. Pokud je položka zapsána do fronty v rámci transakce, nebude viditelná pro dequeuer v rámci stejné transakce.
  • Odsadky nejsou od sebe izolované. Pokud je položka A vyřazena z fronty v transakci txnA, i když txnA není potvrzena, položka A nebude viditelná pro souběžnou transakci txnB. Pokud txnA přeruší, A se okamžitě zobrazí txnB .
  • TryPeekAsync chování lze implementovat pomocí TryDequeueAsync a pak přerušit transakci. Příklad tohoto chování najdete v části Programovací vzory.
  • Count je neaktuální. Dá se použít k získání přehledu o počtu prvků ve frontě, ale představuje bod v čase a nedá se na něj spoléhat.
  • Nákladné zpracování položek vyřazených z fronty by nemělo být prováděno, když je transakce aktivní, aby se zabránilo dlouhotrvajícím transakcím, které mohou mít dopad na výkon systému.

Fragmenty kódu

Pojďme se podívat na několik fragmentů kódu a jejich očekávané výstupy. Zpracování výjimek je v této části ignorováno.

Vytváření instancí

Vytvoření instance spolehlivé souběžné fronty je podobné jakékoli jiné spolehlivé kolekci.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Tady je několik fragmentů kódu pro použití EnqueueAsync následovaných očekávanými výstupy.

  • Případ 1: Jedna úloha fronty
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úloha byla úspěšně dokončena a že nebyly k dispozici žádné souběžné transakce, které by upravovaly frontu. Uživatel může očekávat, že fronta bude obsahovat položky v některém z následujících pořadí:

10, 20

20, 10

  • Případ 2: Paralelní úloha zařazení do fronty
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úlohy byly úspěšně dokončeny, že úlohy běžely paralelně a že nebyly žádné další souběžné transakce, které by upravovaly frontu. Nelze odvodit pořadí položek ve frontě. U tohoto fragmentu kódu se položky můžou zobrazit v libovolném ze 4! možné objednávky. Fronta se pokusí zachovat položky v původním pořadí (zařazené do fronty), ale kvůli souběžným operacím nebo chybám může být nucena změnit jejich pořadí.

DequeueAsync

Tady je několik fragmentů kódu pro použití tryDequeueAsync následovaných očekávanými výstupy. Předpokládejme, že fronta je již naplněna následujícími položkami ve frontě:

10, 20, 30, 40, 50, 60

  • Případ 1: Úkol s jedním vyřazením z fronty
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úloha byla úspěšně dokončena a že nebyly k dispozici žádné souběžné transakce, které by upravovaly frontu. Vzhledem k tomu, že nelze odvozovat pořadí položek ve frontě, mohou být všechny tři položky vyřazeny z fronty v libovolném pořadí. Fronta se pokusí zachovat položky v původním pořadí (zařazené do fronty), ale kvůli souběžným operacím nebo chybám může být nucena změnit jejich pořadí.

  • Případ 2: Paralelní úloha odstranění fronty
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Předpokládejme, že úlohy byly úspěšně dokončeny, že úlohy běžely paralelně a že nebyly žádné další souběžné transakce, které by upravovaly frontu. Vzhledem k tomu, že nelze odvodit pořadí položek ve frontě, budou seznamy dequeue1 a dequeue2 obsahovat všechny dvě položky v libovolném pořadí.

Stejná položka se nezobrazí v obou seznamech. Pokud tedy má dequeue1 hodnotu 10, 30, pak dequeue2 bude mít hodnotu 20, 40.

  • Případ 3: Zrušení řazení s přerušením transakce

Přerušení transakce s průběžnými odřazením do fronty vrátí položky zpět do čela fronty. Pořadí, ve kterém jsou položky vloženy zpět na začátek fronty, není zaručeno. Podívejme se na následující kód:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Předpokládejme, že položky byly vyřazeny z fronty v následujícím pořadí:

10, 20

Když transakci přerušíme, položky se přidají zpět do hlavního uzlu fronty v některé z následujících objednávek:

10, 20

20, 10

Totéž platí pro všechny případy, kdy transakce nebyla úspěšně potvrzena.

Programovací vzory

V této části se podíváme na několik programovacích vzorů, které by mohly být užitečné při používání ReliableConcurrentQueue.

Dávkové vyřazení z fronty

Doporučeným programovacím vzorem je, aby úloha příjemce dávkově odpojila frontu místo provádění jednotlivých odsadí. Uživatel se může rozhodnout, že omezí zpoždění mezi každou dávkou nebo velikostí dávky. Následující fragment kódu ukazuje tento programovací model. Mějte na paměti, že v tomto příkladu se zpracování provádí po potvrzení transakce, takže pokud by při zpracování došlo k chybě, nezpracované položky budou ztraceny, aniž by byly zpracovány. Případně je možné zpracování provést v rámci rozsahu transakce, ale může to mít negativní dopad na výkon a vyžaduje zpracování již zpracovaných položek.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

zpracování Best-Effort Notification-Based

Další zajímavý programovací vzor používá rozhraní API pro počet. Tady můžeme pro frontu implementovat zpracování oznámení na základě nejlepšího úsilí. Počet front lze použít k omezení úlohu zařazení do fronty nebo odstranění fronty. Všimněte si, že stejně jako v předchozím příkladu, protože zpracování probíhá mimo transakci, nezpracované položky mohou být ztraceny, pokud během zpracování dojde k chybě.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Best-Effort Odtok

Kvůli souběžné povaze datové struktury nelze zaručit odtok fronty. Je možné, že i v případě, že nejsou spuštěny žádné operace uživatele ve frontě, nemusí konkrétní volání tryDequeueAsync vrátit položku, která byla dříve zařazena do fronty a potvrzena. Je zaručeno, že položka zařazená do fronty se nakonec zviditelní pro odstranění fronty, ale bez mechanismu vzdálené komunikace nemůže nezávislý příjemce vědět, že fronta dosáhla stabilního stavu, i když byli zastaveni všichni producenti a nejsou povoleny žádné nové operace zařazení do fronty. Proto je operace vyprazdňování nejlepší, jak je implementováno níže.

Uživatel by měl před pokusem o vyprázdnění fronty zastavit všechny další úlohy producenta a příjemce a počkat na potvrzení nebo přerušení všech probíhajících transakcí. Pokud uživatel zná očekávaný počet položek ve frontě, může nastavit oznámení, které signalizuje, že všechny položky byly vyřazeny z fronty.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Peek

ReliableConcurrentQueue neposkytuje rozhraní API TryPeekAsync . Uživatelé mohou získat náhled sémantiky pomocí TryDequeueAsync a poté přerušit transakci. V tomto příkladu jsou vyřazení z fronty zpracována pouze v případě, že je hodnota položky větší než 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Musí číst