次の方法で共有


IReliableConcurrentQueue<T>.TryDequeueAsync メソッド

定義

キューから値を一時的にデキューします。 キューが空の場合、デキュー操作は項目が使用可能になるまで待機します。

public System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<T>> TryDequeueAsync (Microsoft.ServiceFabric.Data.ITransaction tx, System.Threading.CancellationToken cancellationToken = default, TimeSpan? timeout = default);
abstract member TryDequeueAsync : Microsoft.ServiceFabric.Data.ITransaction * System.Threading.CancellationToken * Nullable<TimeSpan> -> System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<'T>>
Public Function TryDequeueAsync (tx As ITransaction, Optional cancellationToken As CancellationToken = Nothing, Optional timeout As Nullable(Of TimeSpan) = Nothing) As Task(Of ConditionalValue(Of T))

パラメーター

tx
ITransaction

この操作を関連付けるトランザクション。

cancellationToken
CancellationToken

キャンセル要求を監視するためのトークン。 既定では、 Noneです。

timeout
Nullable<TimeSpan>

操作が完了するまで待機する時間。 既定値は null です。 null が渡された場合は、既定のタイムアウトが使用されます。

戻り値

非同期デキュー操作を表すタスク。 タスクの結果は、T 型の ConditionalValue です。指定された時間内に値がデキューされた場合は、HasValue が false の ConditionalValue を返します。それ以外の場合は、HasValue が true、Value が T 型のデキューされた項目として ConditionalValue を返します

例外

レプリカが に 存在しなくなりました。

現在、レプリカは読み取り可能ではありません。

IReliableConcurrentQueue<T> ランタイムによって閉じられました。

レプリカで一時的な障害が発生しました。 新しいトランザクションで操作を再試行する

レプリカで、上記で定義した型以外の再トリブルではないエラーが発生しました。 例外をクリーンアップして再スローする

指定されたタイムアウト内に操作を完了できませんでした。 トランザクションを中止し、再試行するために新しいトランザクションを作成する必要があります。

tx が null です。 この例外は処理しないでください。

操作は を介して cancellationToken取り消されました。

トランザクションは、システムによって内部的に障害が発生しました。 新しいトランザクションで操作を再試行する

オブジェクトの現在の状態に対してメソッド呼び出しが無効な場合にスローされます。 たとえば、使用されているトランザクションは既に終了しています。コミットまたは中止されます。 この例外がスローされた場合、トランザクションの使用に関するサービス コードにバグがある可能性が高くなります。

この例では、取り消しトークンが取り消されるまで、再試行でデキューとログを無限に記録する方法を示します。

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var concurrentQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>(new Uri("fabric:/concurrentQueue"));

    // Assumption: values are being enqueued by another source (e.g. the communication listener).
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            using (var tx = this.StateManager.CreateTransaction())
            {
                var dequeueOutput = await concurrentQueue.TryDequeueAsync(tx, cancellationToken, TimeSpan.FromMilliseconds(100));
                await tx.CommitAsync();

                if (dequeueOutput.HasValue)
                {
                    Console.WriteLine("Dequeue # " + dequeueOutput);
                }
                else
                {
                    Console.WriteLine("Could not dequeue in the given time");
                }
            }
        }
        catch (TransactionFaultedException e)
        {
            // This indicates that the transaction was internally faulted by the system. One possible cause for this is that the transaction was long running
            // and blocked a checkpoint. Increasing the "ReliableStateManagerReplicatorSettings.CheckpointThresholdInMB" will help reduce the chances of running into this exception
            Console.WriteLine("Transaction was internally faulted, retrying the transaction: " + e);
        }
        catch (FabricNotPrimaryException e)
        {
            // Gracefully exit RunAsync as the new primary should have RunAsync invoked on it and continue work.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is not primary, exiting RunAsync: " + e);
            return;
        }
        catch (FabricNotReadableException e)
        {
            // Retry until the queue is readable or a different exception is thrown.
            Console.WriteLine("Queue is not readable, retrying the transaction: " + e);
        }
        catch (FabricObjectClosedException e)
        {
            // Gracefully exit RunAsync as this is happening due to replica close.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is closing, exiting RunAsync: " + e);
            return;
        }
        catch (TimeoutException e)
        {
            Console.WriteLine("Encountered TimeoutException during DequeueAsync, retrying the transaction: " + e);
        }
        catch (FabricTransientException e)
        {
            // Retry until the queue is writable or a different exception is thrown.
            Console.WriteLine("Queue is currently not writable, retrying the transaction: " + e);
        }

        // Delay and retry.
        await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
    }
}

注釈

TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>)対応する EnqueueAsync(ITransaction, T, CancellationToken, Nullable<TimeSpan>) がコミットされた値のみを返すことができますが、TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>)操作は互いに分離されません。 あるトランザクションが値をデキューすると、他のトランザクションはその値をデキューすることはできませんが、他の値のデキューはブロックされません。

1 つ以上 TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) の操作を含むトランザクションまたはトランザクションが中止されると、キューの先頭に任意の順序でデキューされた値が追加されます。 これにより、これらの値が間もなく再びデキューされ、データ構造の公平性が向上しますが、厳密な順序を適用する必要はありません (これは、 のように IReliableQueue<T>、許可されたコンカレンシーを減らす必要があります)。

適用対象