Wprowadzenie do elementu ReliableConcurrentQueue w usłudze Azure Service Fabric

Reliable Concurrent Queue to asynchroniczna, transakcyjna i replikowana kolejka, która oferuje wysoką współbieżność dla operacji kolejkowania i dequeue. Jest ona zaprojektowana w celu zapewnienia wysokiej przepływności i małych opóźnień przez złagodzenie ścisłego porządkowania FIFO dostarczonego przez usługę Reliable Queue i zamiast tego zapewnia najlepsze zamówienie.

Interfejsy API

Kolejka współbieżna Niezawodna kolejka współbieżna
void Enqueue(element T) Zadanie EnqueueAsync(ITransaction tx, element T)
bool TryDequeue(wynik T) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
liczba int() long Count()

Porównanie z niezawodną kolejką

Niezawodna kolejka współbieżna jest oferowana jako alternatywa dla niezawodnej kolejki. Należy go używać w przypadkach, gdy ścisłe porządkowanie FIFO nie jest wymagane, ponieważ zagwarantowanie FIFO wymaga kompromisu z współbieżnością. Usługa Reliable Queue używa blokad do wymuszania porządkowania FIFO, z co najwyżej jedną transakcją dozwoloną w kolejce i co najwyżej jedną transakcję dozwoloną do dequeue naraz. W porównaniu funkcja Reliable Concurrent Queue rozluźnia ograniczenie kolejności i umożliwia wykonywanie dowolnej liczby równoczesnych transakcji w kolejce i dequeue. Zapewniana jest kolejność starań, jednak względne porządkowanie dwóch wartości w niezawodnej kolejce współbieżnej nigdy nie może być gwarantowane.

Niezawodna kolejka współbieżna zapewnia większą przepływność i mniejsze opóźnienie niż niezawodna kolejka , gdy istnieje wiele współbieżnych transakcji wykonujących kolejki i/lub dequeues.

Przykładowy przypadek użycia dla elementu ReliableConcurrentQueue to scenariusz kolejki komunikatów . W tym scenariuszu co najmniej jeden producent komunikatów tworzy i dodaje elementy do kolejki, a co najmniej jeden użytkownik komunikatu ściąga komunikaty z kolejki i przetwarza je. Wielu producentów i konsumentów może działać niezależnie, używając współbieżnych transakcji w celu przetworzenia kolejki.

Zalecenia dotyczące użytkowania

  • Kolejka oczekuje, że elementy w kolejce mają niski okres przechowywania. Oznacza to, że elementy nie pozostaną w kolejce przez długi czas.
  • Kolejka nie gwarantuje ścisłego porządkowania FIFO.
  • Kolejka nie odczytuje własnych zapisów. Jeśli element jest w kolejce w ramach transakcji, nie będzie widoczny dla dequeuer w ramach tej samej transakcji.
  • Kolejki nie są odizolowane od siebie. Jeśli element A jest wyrejestrowany w transakcji txnA, mimo że txnA nie jest zatwierdzany, element A nie będzie widoczny dla współbieżnej transakcji txnB. Jeśli txnA przerywa, A stanie się widoczny dla txnB natychmiast.
  • Zachowanie tryPeekAsync można zaimplementować przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. Przykład tego zachowania można znaleźć w sekcji Wzorce programowania.
  • Liczba nie jest transakcyjna. Może służyć do uzyskania pojęcia liczby elementów w kolejce, ale reprezentuje punkt w czasie i nie można go polegać.
  • Kosztowne przetwarzanie elementów w kolejce nie powinno być wykonywane, gdy transakcja jest aktywna, aby uniknąć długotrwałych transakcji, które mogą mieć wpływ na wydajność systemu.

Wstawki kodu

Przyjrzyjmy się kilku fragmentom kodu i ich oczekiwanym wynikom. Obsługa wyjątków jest ignorowana w tej sekcji.

Tworzenie wystąpienia

Tworzenie wystąpienia niezawodnej kolejki współbieżnej jest podobne do każdej innej kolekcji Reliable Collection.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Poniżej przedstawiono kilka fragmentów kodu do korzystania z enqueueAsync, a następnie ich oczekiwane dane wyjściowe.

  • Przypadek 1. Jedno zadanie w kolejce
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Użytkownik może oczekiwać, że kolejka będzie zawierać elementy w dowolnej z następujących zamówień:

10, 20

20, 10

  • Przypadek 2. Równoległe zadanie kolejki
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadania zostały ukończone pomyślnie, zadania były uruchamiane równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Nie można wnioskować o kolejności elementów w kolejce. W przypadku tego fragmentu kodu elementy mogą być wyświetlane w dowolnym z 4! możliwe zamówienia. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.

DequeueAsync

Poniżej przedstawiono kilka fragmentów kodu do używania narzędzia TryDequeueAsync, a następnie oczekiwanych danych wyjściowych. Załóżmy, że kolejka jest już wypełniona następującymi elementami w kolejce:

10, 20, 30, 40, 50, 60

  • Przypadek 1. Jedno zadanie w kolejce
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, wszystkie trzy elementy mogą być w kolejce, w dowolnej kolejności. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.

  • Przypadek 2. Równoległe zadanie w kolejce
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Załóżmy, że zadania zostały ukończone pomyślnie, zadania były uruchamiane równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, listy dequeue1 i dequeue2 będą zawierać wszystkie dwa elementy w dowolnej kolejności.

Ten sam element nie będzie wyświetlany na obu listach. W związku z tym, jeśli dequeue1 ma 10, 30, to dequeue2 będzie miał 20, 40.

  • Przypadek 3: Anulowanie zamawiania za pomocą transakcji przerwania

Przerwanie transakcji z dequeues w locie umieszcza elementy z powrotem na głowie kolejki. Kolejność, w której elementy są umieszczane z powrotem na głowie kolejki, nie jest gwarantowana. Przyjrzyjmy się następującej kodzie:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Załóżmy, że elementy zostały wyczekiwane w następującej kolejności:

10, 20

Po przerwaniu transakcji elementy zostaną dodane z powrotem do głowy kolejki w dowolnej z następujących zamówień:

10, 20

20, 10

To samo dotyczy wszystkich przypadków, w których transakcja nie została pomyślnie zatwierdzona.

Wzorce programowania

W tej sekcji przyjrzyjmy się kilku wzorom programowania, które mogą być przydatne w korzystaniu z elementu ReliableConcurrentQueue.

Kolejki wsadowe

Zalecanym wzorcem programowania jest wykonanie zadania odbiorcy wsadowego jego dequeues zamiast wykonywania jednego dequeue naraz. Użytkownik może zdecydować się na ograniczenie opóźnień między każdą partią lub rozmiarem partii. Poniższy fragment kodu przedstawia ten model programowania. Należy pamiętać, że w tym przykładzie przetwarzanie odbywa się po zatwierdzeniu transakcji, więc jeśli podczas przetwarzania wystąpi błąd, nieprzetworzone elementy zostaną utracone bez przetworzenia. Alternatywnie przetwarzanie można wykonać w zakresie transakcji, jednak może mieć negatywny wpływ na wydajność i wymaga obsługi już przetworzonych elementów.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

przetwarzanie Best-Effort Notification-Based

Inny interesujący wzorzec programowania używa interfejsu API count. W tym miejscu możemy zaimplementować przetwarzanie oparte na powiadomieniach dla kolejki. Liczba kolejek może służyć do ograniczania kolejki lub dequeue zadania. Należy pamiętać, że tak jak w poprzednim przykładzie, ponieważ przetwarzanie odbywa się poza transakcją, nieprzetworzone elementy mogą zostać utracone w przypadku wystąpienia błędu podczas przetwarzania.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Best-Effort opróżnianie

Nie można zagwarantować opróżnienia kolejki ze względu na współbieżny charakter struktury danych. Istnieje możliwość, że nawet jeśli żadne operacje użytkownika w kolejce nie są w locie, określone wywołanie tryDequeueAsync może nie zwrócić elementu, który został wcześniej w kolejce i zatwierdzony. Element w kolejce ma gwarancję, że w końcu stanie się widoczny dla dequeue, jednak bez mechanizmu komunikacji poza pasmem, niezależny konsument nie może wiedzieć, że kolejka osiągnęła stały stan, nawet jeśli wszyscy producenci zostali zatrzymani i nie są dozwolone żadne nowe operacje kolejkowania. W związku z tym operacja opróżniania jest najlepszym wysiłkiem, jak zaimplementowano poniżej.

Użytkownik powinien zatrzymać wszystkie dalsze zadania producenta i konsumenta i poczekać na zatwierdzenie lub przerwanie transakcji w locie przed podjęciem próby opróżnienia kolejki. Jeśli użytkownik zna oczekiwaną liczbę elementów w kolejce, może skonfigurować powiadomienie, które sygnalizuje, że wszystkie elementy zostały wyczekiwane.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Peek

Element ReliableConcurrentQueue nie udostępnia interfejsu API TryPeekAsync . Użytkownicy mogą uzyskać semantyka wglądu przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. W tym przykładzie dequeues są przetwarzane tylko wtedy, gdy wartość elementu jest większa niż 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Musi zostać odczytany