Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Reliable Concurrent Queue är en asynkron, transaktionell och replikerad kö som erbjuder hög samtidig kapacitet för enqueue- och dequeue-åtgärder. Den är utformad för att leverera högt dataflöde och låg svarstid genom att lätta på den strikta FIFO-beställningen som tillhandahålls av Reliable Queue och ger istället en beställning med bästa möjliga arbete.
API:er
Samtidig kö | Tillförlitlig samtidig kö |
---|---|
void Enqueue(T item) | Task EnqueueAsync(ITransaction tx, T-objekt) |
bool TryDequeue(out T resultat) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
int Count() | long Count() |
Jämförelse med Reliable Queue
Reliable Concurrent Queue erbjuds som ett alternativ till Reliable Queue. Det bör användas i fall där strikt FIFO-beställning inte krävs, eftersom garanti av FIFO kräver en kompromiss med samtidighet. Reliable Queue använder lås för att framtvinga FIFO-ordning, med högst en transaktion tillåts att köa och högst en transaktion tillåts att ta bort från kön åt gången. Som jämförelse kopplar Reliable Concurrent Queue av ordningsbegränsningen och gör det möjligt för valfritt antal samtidiga transaktioner att fläta samman sina enqueue- och dequeue-operationer. Beställning med bästa förmåga tillhandahålls, men den relativa ordningen på två värden i en tillförlitlig samtidig kö kan aldrig garanteras.
Reliable Concurrent Queue ger högre dataflöde och lägre svarstid än Reliable Queue när det finns flera samtidiga transaktioner som utför enqueue-operationer och/eller dequeue-operationer.
Ett exempel på användningsfall för ReliableConcurrentQueue är scenariot meddelandekö . I det här scenariot skapar och lägger en eller flera meddelandeproducenter till i kön, och en eller flera meddelandekonsumenter hämtar meddelanden från kön och bearbetar dem. Flera producenter och konsumenter kan arbeta oberoende av varandra och använda samtidiga transaktioner för att bearbeta kön.
Användningsriktlinjer
- Kön förväntar sig att objekten i kön har en låg kvarhållningsperiod. Det vill säga att objekten inte skulle stanna i kön under en längre tid.
- Kön garanterar inte strikt FIFO-beställning.
- Kön läser inte sina egna skrivningar. Om ett objekt lagras i en transaktion visas det inte för en dequeuer i samma transaktion.
- Dequeues är inte isolerade från varandra. Om objekt A tas bort i transaktionen txnA, även om txnA inte har bekräftats, skulle objekt A inte vara synligt för en samtidig transaktion txnB. Om txnA avbryts blir A synligt för txnB omedelbart.
- TryPeekAsync-beteende kan implementeras med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. Ett exempel på det här beteendet finns i avsnittet Programmeringsmönster.
- Antalet är icke-transaktionellt. Det kan användas för att få en uppfattning om antalet element i kön, men representerar en tidpunkt och kan inte lita på.
- Dyr bearbetning av de borttagna objekten bör inte utföras medan transaktionen är aktiv, för att undvika långvariga transaktioner som kan ha en prestandapåverkan på systemet.
Kodstycken
Låt oss titta på några kodfragment och deras förväntade utdata. Undantagshantering ignoreras i det här avsnittet.
Instansiering
Att skapa en instans av en tillförlitlig samtidig kö liknar alla andra tillförlitliga samlingar.
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
Här följer några kodfragment för att använda EnqueueAsync följt av deras förväntade utdata.
- Fall 1: Enkel enqueue-uppgift
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Användaren kan förvänta sig att kön innehåller objekten i någon av följande beställningar:
10, 20
20, 10
- Fall 2: Parallell köaktivitet
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Det går inte att dra någon slutsats om ordningen på objekten i kön. För det här kodfragmentet kan objekten visas i någon av 4! möjliga ordningar. Kön försöker behålla objekten i den ursprungliga ordningen (enqueued), men kan tvingas ändra ordning på dem på grund av samtidiga åtgärder eller fel.
DequeueAsync
Här följer några kodfragment för att använda TryDequeueAsync följt av förväntade utdata. Anta att kön redan är ifylld med följande objekt i kön:
10, 20, 30, 40, 50, 60
- Fall 1: Enskild dequeue-uppgift
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Eftersom ingen slutsats kan dras om ordningen på elementen i kön kan tre av elementen tas bort från kön, i valfri ordning. Kön kommer att försöka behålla objekten i den ursprungliga köade ordningen, men kan tvingas omorganisera dem på grund av samtidiga operationer eller fel.
- Fall 2: Parallell dequeue-uppgift
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Eftersom ingen slutsatsdragning kan göras om ordningen på objekten i kön, innehåller listorna dequeue1 och dequeue2 var och en två objekt, i valfri ordning.
Samma objekt visas inte i båda listorna. Därför, om dequeue1 har 10, 30, skulle dequeue2 ha 20, 40.
- Fall 3: Avqueue-beställning med transaktions abort
Om du avbryter en transaktion med dequeues under flygning hamnar objekten på köns huvud igen. Ordningen i vilken objekten sätts tillbaka på köns huvud garanteras inte. Låt oss titta på följande kod:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
Anta att objekten har dequeued i följande ordning:
10, 20
När vi avbryter transaktionen läggs objekten tillbaka till köns huvud i någon av följande beställningar:
10, 20
20, 10
Detsamma gäller för alla fall där transaktionen inte har checkats in.
Programmeringsmönster
I det här avsnittet ska vi titta på några programmeringsmönster som kan vara till hjälp vid användning av ReliableConcurrentQueue.
Batch-dequeues
Ett rekommenderat programmeringsmönster är att konsumentuppgiften batchar sina dequeues i stället för att utföra en dequeue i taget. Användaren kan välja att begränsa fördröjningar mellan varje batch eller batchstorlek. Följande kodfragment visar den här programmeringsmodellen. Tänk på att bearbetningen i det här exemplet utförs efter att transaktionen har checkats in, så om ett fel skulle inträffa under bearbetningen går de obearbetade objekten förlorade utan att ha bearbetats. Alternativt kan bearbetningen utföras inom transaktionens omfång, men det kan ha en negativ inverkan på prestanda och kräver hantering av de objekt som redan har bearbetats.
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
Best-Effort Notification-Based bearbetning
Ett annat intressant programmeringsmönster använder count-API:et. Här kan vi implementera meddelandebaserad bearbetning med bästa möjliga arbete för kön. Köantalet kan användas för att begränsa en kö eller en dequeue-uppgift. Observera att som i föregående exempel, eftersom bearbetningen sker utanför transaktionen, kan obearbetade objekt gå förlorade om ett fel inträffar under bearbetningen.
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
Best-Effort avlopp
Det går inte att garantera att kön töms på grund av datastrukturens samtidiga karaktär. Det är möjligt att ett visst anrop till TryDequeueAsync kanske inte returnerar ett objekt som tidigare har lagts till i kön och bekräftats, även om inga användaråtgärder pågår i kön. Den köade posten är garanterad att så småningom bli synlig för borttagning, men utan en out-of-band-kommunikationsmekanism kan en oberoende konsument inte veta att kön har nått ett stabilt tillstånd även om alla producenter har stoppats och inga nya köåtgärder tillåts. Därför är dräneringsoperationen en bästa möjliga insats när den implementeras enligt nedan.
Användaren bör stoppa alla ytterligare producent- och konsumentaktiviteter och vänta tills alla pågående transaktioner har slutförts eller avbrutits innan man försöker tömma kön. Om användaren känner till det förväntade antalet objekt i kön kan de ställa in ett meddelande som signalerar att alla objekt har tagits bort.
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
Kika
ReliableConcurrentQueue tillhandahåller inte Api:et TryPeekAsync . Användare kan få en titt på semantiken med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. I det här exemplet bearbetas dequeues endast om objektets värde är större än 10.
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}
Måste läsas
- Snabbstart för Reliable Services
- Arbeta med Reliable Collections
- Reliable Services-meddelanden
- Pålitliga tjänster säkerhetskopiering och återställning (katastrofåterställning)
- Konfiguration för Reliable State Manager
- Komma igång med Service Fabric Web API Services
- Avancerad användning av reliable services-programmeringsmodellen
- Utvecklarreferens för tillförlitliga samlingar