Share via


Introduktion till ReliableConcurrentQueue i Azure Service Fabric

Reliable Concurrent Queue är en asynkron, transaktionell och replikerad kö med hög samtidighet för kö- och borttagningsåtgärder. Den är utformad för att leverera högt dataflöde och låg svarstid genom att lätta på den strikta FIFO-beställningen som tillhandahålls av Reliable Queue och ger istället en beställning med bästa förmåga.

API:er

Samtidig kö Reliable Concurrent Queue
void Enqueue(T-objekt) Uppgift EnqueueAsync(ITransaction tx, T-objekt)
bool TryDequeue(ut T-resultat) Uppgiften< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

Jämförelse med Reliable Queue

Reliable Concurrent Queue erbjuds som ett alternativ till Reliable Queue. Den bör användas i fall där strikt FIFO-beställning inte krävs, eftersom fifo-garanti kräver en kompromiss med samtidighet. Reliable Queue använder lås för att framtvinga FIFO-beställning, med högst en transaktion tillåten att köa och högst en transaktion tillåts att dequeue i taget. Som jämförelse avger Reliable Concurrent Queue ordningsbegränsningen och tillåter att valfritt antal samtidiga transaktioner interleave sina kö- och borttagningsåtgärder. Beställning av bästa förmåga tillhandahålls, men relativ ordning av två värden i en reliable concurrent-kö kan aldrig garanteras.

Reliable Concurrent Queue ger högre dataflöde och kortare svarstid än Reliable Queue när det finns flera samtidiga transaktioner som utför köer och/eller köer.

Ett exempel på användningsfall för ReliableConcurrentQueue är scenariot för meddelandekö . I det här scenariot skapar och lägger en eller flera meddelandeproducenter till objekt i kön, och en eller flera meddelandekonsumenter hämtar meddelanden från kön och bearbetar dem. Flera producenter och konsumenter kan arbeta oberoende av varandra med hjälp av samtidiga transaktioner för att bearbeta kön.

Riktlinjer för användning

  • Kön förväntar sig att objekten i kön har en låg kvarhållningsperiod. Objekten skulle alltså inte ligga kvar i kön under en längre tid.
  • Kön garanterar inte strikt FIFO-beställning.
  • Kön läser inte sina egna skrivningar. Om ett objekt placeras i en transaktion visas det inte för en dequeuer i samma transaktion.
  • Köer är inte isolerade från varandra. Om objekt A tas bort från kön i txnA-transaktion, även om txnA inte checkas in, skulle objekt A inte vara synligt för en samtidig transaktion txnB. Om txnA avbryts blir A omedelbart synligt för txnB .
  • TryPeekAsync-beteende kan implementeras med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. Ett exempel på det här beteendet finns i avsnittet Programmeringsmönster.
  • Antalet är icke-transaktionellt. Den kan användas för att få en uppfattning om antalet element i kön, men representerar en tidpunkt och kan inte användas.
  • Dyr bearbetning av de borttagna objekten bör inte utföras medan transaktionen är aktiv, för att undvika långvariga transaktioner som kan påverka systemets prestanda.

Kodfragment

Låt oss titta på några kodfragment och deras förväntade utdata. Undantagshantering ignoreras i det här avsnittet.

Instansiering

Att skapa en instans av en reliable concurrent-kö liknar alla andra tillförlitliga samlingar.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Här följer några kodfragment för att använda EnqueueAsync följt av deras förväntade utdata.

  • Fall 1: Enskild köaktivitet
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Användaren kan förvänta sig att kön innehåller objekten i någon av följande beställningar:

10, 20

20, 10

  • Fall 2: Parallell köaktivitet
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Det går inte att dra någon slutsats om ordningen på objekten i kön. För det här kodfragmentet kan objekten visas i någon av de 4! möjliga orderings. Kön försöker behålla objekten i den ursprungliga (köade) ordningen, men kan tvingas att ändra ordning på dem på grund av samtidiga åtgärder eller fel.

DequeueAsync

Här följer några kodfragment för att använda TryDequeueAsync följt av förväntade utdata. Anta att kön redan är ifylld med följande objekt i kön:

10, 20, 30, 40, 50, 60

  • Fall 1: Enskild dequeue-uppgift
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Eftersom ingen slutsatsdragning kan göras om ordningen på objekten i kön, kan alla tre objekten dequeueras, i valfri ordning. Kön försöker behålla objekten i den ursprungliga (köade) ordningen, men kan tvingas att ändra ordning på dem på grund av samtidiga åtgärder eller fel.

  • Fall 2: Parallell dequeue-uppgift
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Eftersom ingen slutsatsdragning kan göras om ordningen på objekten i kön innehåller listorna dequeue1 och dequeue2 vart och ett två objekt i valfri ordning.

Samma objekt visas inte i båda listorna. Om dequeue1 därför har 10, 30, skulle dequeue2 ha 20, 40.

  • Fall 3: Avqueue-beställning med transaktionsabstängning

Om du avbryter en transaktion med försändelser under flygning hamnar objekten i kön igen. Ordningen i vilken objekten sätts tillbaka på köns huvud garanteras inte. Låt oss titta på följande kod:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Anta att objekten har dequeuerats i följande ordning:

10, 20

När vi avbryter transaktionen läggs objekten tillbaka till köns huvud i någon av följande beställningar:

10, 20

20, 10

Detsamma gäller för alla fall där transaktionen inte har checkats in.

Programmeringsmönster

I det här avsnittet ska vi titta på några programmeringsmönster som kan vara användbara när du använder ReliableConcurrentQueue.

Batch-avköer

Ett rekommenderat programmeringsmönster är att konsumentuppgiften batchar sina kvoter i stället för att utföra en dequeue i taget. Användaren kan välja att begränsa fördröjningar mellan varje batch eller batchstorlek. Följande kodfragment visar den här programmeringsmodellen. Tänk på att i det här exemplet utförs bearbetningen efter att transaktionen har checkats in, så om ett fel skulle inträffa under bearbetningen går de obearbetade objekten förlorade utan att ha bearbetats. Bearbetningen kan också utföras inom transaktionens omfång, men det kan ha en negativ inverkan på prestanda och kräver hantering av de objekt som redan har bearbetats.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

Best-Effort Notification-Based bearbetning

Ett annat intressant programmeringsmönster använder COUNT-API:et. Här kan vi implementera meddelandebaserad bearbetning med bästa förmåga för kön. Köantalet kan användas för att begränsa en kö eller en köaktivitet. Observera att som i föregående exempel, eftersom bearbetningen sker utanför transaktionen, kan obearbetade objekt gå förlorade om ett fel uppstår under bearbetningen.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Best-Effort dränering

Det går inte att garantera att kön töms på grund av datastrukturens samtidiga karaktär. Det är möjligt att ett visst anrop till TryDequeueAsync, även om inga användaråtgärder i kön pågår, inte returnerar ett objekt som tidigare har placerats i kö och checkats in. Den köade posten kommer garanterat att så småningom bli synlig för borttagning, men utan en out-of-band-kommunikationsmekanism kan en oberoende konsument inte veta att kön har nått ett stabilt tillstånd även om alla producenter har stoppats och inga nya köåtgärder tillåts. Därför är dräneringsåtgärden det bästa som implementeras nedan.

Användaren bör stoppa alla ytterligare producent- och konsumentuppgifter och vänta tills alla pågående transaktioner checkas in eller avbryts innan de försöker tömma kön. Om användaren känner till det förväntade antalet objekt i kön kan de ställa in ett meddelande som signalerar att alla objekt har tagits bort från kön.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Granska

ReliableConcurrentQueue tillhandahåller inte Api:et TryPeekAsync . Användare kan hämta peek-semantiken med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. I det här exemplet bearbetas dequeues endast om objektets värde är större än 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Måste läsa