

IReliableConcurrentQueue<T>.TryDequeueAsync Method

Definition

Tentatively dequeue a value from the queue. If the queue is empty, the dequeue operation will wait for an item to become available.

public System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<T>> TryDequeueAsync (Microsoft.ServiceFabric.Data.ITransaction tx, System.Threading.CancellationToken cancellationToken = default, TimeSpan? timeout = default);
abstract member TryDequeueAsync : Microsoft.ServiceFabric.Data.ITransaction * System.Threading.CancellationToken * Nullable<TimeSpan> -> System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<'T>>
Public Function TryDequeueAsync (tx As ITransaction, Optional cancellationToken As CancellationToken = Nothing, Optional timeout As Nullable(Of TimeSpan) = Nothing) As Task(Of ConditionalValue(Of T))

Parameters

tx
ITransaction

Transaction to associate this operation with.

cancellationToken
CancellationToken

The token to monitor for cancellation requests. The default is CancellationToken.None.

timeout
Nullable<TimeSpan>

The amount of time to wait for the operation to complete. The default is null. If null is passed, a default timeout will be used.

Returns

A task that represents the asynchronous dequeue operation. The task's result is a ConditionalValue of type T. If a value cannot be dequeued within the given time, the result is a ConditionalValue with HasValue set to false; otherwise, it is a ConditionalValue with HasValue set to true and Value set to the dequeued item of type T.

Exceptions

FabricNotPrimaryException

The replica is no longer in the Primary role.

FabricNotReadableException

The replica is currently not readable.

FabricTransientException

The replica saw a transient failure. Retry the operation on a new transaction.

FabricException

The replica saw a non-retriable failure other than the types defined above. Clean up and rethrow the exception.

TimeoutException

The operation was unable to be completed within the given timeout. The transaction should be aborted and a new transaction should be created to retry.

ArgumentNullException

tx is null. Do not handle this exception.

OperationCanceledException

The operation was canceled via cancellationToken.

TransactionFaultedException

The transaction has been internally faulted by the system. Retry the operation on a new transaction.

InvalidOperationException

Thrown when a method call is invalid for the object's current state. For example, the transaction used has already been terminated (committed or aborted by the user). If this exception is thrown, it is highly likely that there is a bug in how the service code uses transactions.

Examples

This example shows how to dequeue and log values in a loop, retrying on failure, until cancellation is requested through the cancellation token.

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var concurrentQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>(new Uri("fabric:/concurrentQueue"));

    // Assumption: values are being enqueued by another source (e.g. the communication listener).
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            using (var tx = this.StateManager.CreateTransaction())
            {
                var dequeueOutput = await concurrentQueue.TryDequeueAsync(tx, cancellationToken, TimeSpan.FromMilliseconds(100));
                await tx.CommitAsync();

                if (dequeueOutput.HasValue)
                {
                    Console.WriteLine("Dequeue # " + dequeueOutput.Value);
                }
                else
                {
                    Console.WriteLine("Could not dequeue in the given time");
                }
            }
        }
        catch (TransactionFaultedException e)
        {
            // This indicates that the transaction was internally faulted by the system. One possible cause for this is that the transaction was long running
            // and blocked a checkpoint. Increasing the "ReliableStateManagerReplicatorSettings.CheckpointThresholdInMB" will help reduce the chances of running into this exception
            Console.WriteLine("Transaction was internally faulted, retrying the transaction: " + e);
        }
        catch (FabricNotPrimaryException e)
        {
            // Gracefully exit RunAsync as the new primary should have RunAsync invoked on it and continue work.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is not primary, exiting RunAsync: " + e);
            return;
        }
        catch (FabricNotReadableException e)
        {
            // Retry until the queue is readable or a different exception is thrown.
            Console.WriteLine("Queue is not readable, retrying the transaction: " + e);
        }
        catch (FabricObjectClosedException e)
        {
            // Gracefully exit RunAsync as this is happening due to replica close.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is closing, exiting RunAsync: " + e);
            return;
        }
        catch (TimeoutException e)
        {
            Console.WriteLine("Encountered TimeoutException during TryDequeueAsync, retrying the transaction: " + e);
        }
        catch (FabricTransientException e)
        {
            // Retry until the queue is writable or a different exception is thrown.
            Console.WriteLine("Queue is currently not writable, retrying the transaction: " + e);
        }

        // Delay and retry.
        await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
    }
}

Remarks

While TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) can only return values for which the corresponding EnqueueAsync(ITransaction, T, CancellationToken, Nullable<TimeSpan>) was committed, TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) operations are not isolated from one another. Once a transaction has dequeued a value, other transactions cannot dequeue it, but are not blocked from dequeuing other values.
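
The lack of isolation between dequeues can be illustrated with several consumers working on the same queue, each in its own transaction. The following is a minimal sketch, not a definitive implementation: the class name ConcurrentDequeueSketch, the helper names DrainConcurrentlyAsync and ConsumeAsync, the one-second timeout, and the choice to treat an empty result as "drained" are all assumptions made for illustration. Exception handling is omitted for brevity; a real service would handle the exceptions listed above.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

public static class ConcurrentDequeueSketch
{
    // Hypothetical helper: starts consumerCount independent consumers on the same queue.
    public static Task DrainConcurrentlyAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<long> queue,
        int consumerCount,
        CancellationToken cancellationToken)
    {
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(id => ConsumeAsync(stateManager, queue, id, cancellationToken));
        return Task.WhenAll(consumers);
    }

    // Hypothetical helper: dequeues one value per transaction until no value arrives in time.
    private static async Task ConsumeAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<long> queue,
        int consumerId,
        CancellationToken cancellationToken)
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            using (ITransaction tx = stateManager.CreateTransaction())
            {
                // Assumption: a one-second timeout is acceptable for this sketch.
                ConditionalValue<long> item =
                    await queue.TryDequeueAsync(tx, cancellationToken, TimeSpan.FromSeconds(1));

                if (!item.HasValue)
                {
                    // Nothing arrived in time; this sketch treats that as "drained".
                    return;
                }

                await tx.CommitAsync();
                Console.WriteLine($"Consumer {consumerId} dequeued {item.Value}");
            }
        }
    }
}
```

Each consumer receives different values, and no consumer waits for another consumer's transaction to finish. The order in which values appear across consumers is not guaranteed.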

When one or more transactions that include TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) operations abort, the dequeued values are added back at the head of the queue in an arbitrary order. This ensures that these values will be dequeued again soon, improving the fairness of the data structure, but without enforcing strict ordering (which would require reducing the allowed concurrency, as in IReliableQueue<T>).
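
One way to rely on this abort behavior is to commit the dequeue only after the value has been processed successfully. The following is a sketch under stated assumptions: the class name DequeueAndProcessSketch, the helper TryProcessOneAsync, the caller-supplied handler delegate, and the four-second timeout are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

public static class DequeueAndProcessSketch
{
    // Hypothetical helper: dequeues one value and commits only if the handler succeeds.
    // Returns false when no value could be dequeued within the timeout.
    public static async Task<bool> TryProcessOneAsync(
        IReliableStateManager stateManager,
        IReliableConcurrentQueue<long> queue,
        Func<long, Task> handler,
        CancellationToken cancellationToken)
    {
        using (ITransaction tx = stateManager.CreateTransaction())
        {
            // Assumption: a four-second timeout is acceptable for this sketch.
            ConditionalValue<long> item =
                await queue.TryDequeueAsync(tx, cancellationToken, TimeSpan.FromSeconds(4));

            if (!item.HasValue)
            {
                // Nothing was dequeued, so there is nothing to commit.
                return false;
            }

            try
            {
                await handler(item.Value);
            }
            catch (Exception)
            {
                // Aborting returns the value to the head of the queue,
                // so another transaction can dequeue it again.
                tx.Abort();
                throw;
            }

            // Processing succeeded: make the dequeue permanent.
            await tx.CommitAsync();
            return true;
        }
    }
}
```

Because aborted values return in an arbitrary order, a caller that retries after a failed handler should not assume it will receive the same value on the next dequeue.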
