タスク ベースの非同期パターン (TAP) を使用して非同期操作を操作する場合は、コールバックを使用して、ブロックせずに待機を実現できます。 タスクの場合、これは Task.ContinueWithなどの方法によって実現されます。 言語ベースの非同期サポートでは、通常の制御フロー内で非同期操作を待機できるようにすることでコールバックを非表示にし、コンパイラによって生成されたコードでも同じ API レベルのサポートが提供されます。
Await を使用した実行の中断
C# の await キーワードと Visual Basic の Await 演算子 を使用して、 Task オブジェクトと Task<TResult> オブジェクトを非同期的に待機できます。
Taskを待機している場合、await
式はvoid
型です。
Task<TResult>を待機している場合、await
式はTResult
型です。
await
式は、非同期メソッドの本体内で行う必要があります。 (これらの言語機能は.NET Framework 4.5 で導入されました)。
内部では、await 機能は継続を使用してタスクにコールバックをインストールします。 このコールバックは、中断の時点で非同期メソッドを再開します。 非同期メソッドが再開されると、待機中の操作が正常に完了し、 Task<TResult>であった場合は、その TResult
が返されます。 待機していた Task または Task<TResult> が Canceled 状態で終了すると、 OperationCanceledException 例外がスローされます。 待機していた Task または Task<TResult> が Faulted 状態で終了した場合、エラーの原因となった例外がスローされます。
Task
は複数の例外の結果としてエラーが発生する可能性がありますが、これらの例外の 1 つだけが伝達されます。 ただし、 Task.Exception プロパティは、すべてのエラーを含む AggregateException 例外を返します。
同期コンテキスト (SynchronizationContext オブジェクト) が、中断時に非同期メソッドを実行していたスレッドに関連付けられている場合 (たとえば、 SynchronizationContext.Current プロパティが null
されていない場合)、非同期メソッドはコンテキストの Post メソッドを使用して、同じ同期コンテキストで再開します。 それ以外の場合は、中断時に現在のタスク スケジューラ (TaskScheduler オブジェクト) に依存します。 通常、これはスレッド プールを対象とする既定のタスク スケジューラ (TaskScheduler.Default) です。 このタスク スケジューラは、待機中の非同期操作が完了した場所で再開するか、再開をスケジュールするかを決定します。 通常、既定のスケジューラでは、待機中の操作が完了したスレッドで継続を実行できます。
非同期メソッドが呼び出されると、まだ完了していない待機可能なインスタンスで最初の await 式が呼び出し元に戻るまで、関数の本体が同期的に実行されます。 非同期メソッドが void
を返さない場合は、進行中の計算を表す Task または Task<TResult> オブジェクトが返されます。 void 以外の非同期メソッドでは、return ステートメントが検出された場合、またはメソッド本体の末尾に達した場合、タスクは RanToCompletion 最終状態で完了します。 ハンドルされない例外によって制御が非同期メソッドの本体から離れる場合、タスクは Faulted 状態で終了します。 その例外が OperationCanceledExceptionの場合、タスクは代わりに Canceled 状態で終了します。 この方法では、結果または例外が最終的に公開されます。
この動作には、いくつかの重要なバリエーションがあります。 パフォーマンス上の理由から、タスクが待機するまでにタスクが既に完了している場合、制御は生成されず、関数は引き続き実行されます。 さらに、元のコンテキストに戻すことは常に目的の動作ではなく、変更できます。これについては、次のセクションで詳しく説明します。
Yield と ConfigureAwait を使用した中断と再開の構成
いくつかのメソッドは、非同期メソッドの実行をより詳細に制御します。 たとえば、 Task.Yield メソッドを使用して、非同期メソッドに yield ポイントを導入できます。
public class Task : …
{
public static YieldAwaitable Yield();
…
}
これは、現在のコンテキストに非同期的にポストまたはスケジュールを戻すのと同じです。
Task.Run(async delegate
{
for(int i=0; i<1000000; i++)
{
await Task.Yield(); // fork the continuation into a separate work item
...
}
});
また、 Task.ConfigureAwait メソッドを使用して、非同期メソッドの中断と再開をより適切に制御することもできます。 前述のように、既定では、現在のコンテキストは非同期メソッドが中断された時点でキャプチャされ、キャプチャされたコンテキストは再開時に非同期メソッドの継続を呼び出すために使用されます。 多くの場合、これは必要な正確な動作です。 それ以外の場合は、継続コンテキストを気にせず、元のコンテキストへのポストバックを回避することでパフォーマンスを向上させることができます。 これを有効にするには、 Task.ConfigureAwait メソッドを使用して、コンテキストでキャプチャおよび再開するのではなく、待機中の非同期操作が完了した場所で実行を続行するように await 操作に通知します。
await someTask.ConfigureAwait(continueOnCapturedContext:false);
非同期操作の取り消し
.NET Framework 4 以降では、取り消しをサポートする TAP メソッドは、キャンセル トークン (CancellationToken オブジェクト) を受け入れるオーバーロードを少なくとも 1 つ提供します。
キャンセル トークンは、キャンセル トークン ソース (CancellationTokenSource オブジェクト) を介して作成されます。 ソースの Token プロパティは、ソースの Cancel メソッドが呼び出されたときに通知されるキャンセル トークンを返します。 たとえば、1 つの Web ページをダウンロードして操作を取り消すことができるようにするには、 CancellationTokenSource オブジェクトを作成し、そのトークンを TAP メソッドに渡し、操作を取り消す準備ができたら、ソースの Cancel メソッドを呼び出します。
var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();
複数の非同期呼び出しをキャンセルするには、すべての呼び出しに同じトークンを渡します。
var cts = new CancellationTokenSource();
IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));
// at some point later, potentially on another thread
…
cts.Cancel();
または、同じトークンを操作の選択的なサブセットに渡すことができます。
var cts = new CancellationTokenSource();
byte [] data = await DownloadDataAsync(url, cts.Token);
await SaveToDiskAsync(outputPath, data, CancellationToken.None);
… // at some point later, potentially on another thread
cts.Cancel();
Von Bedeutung
取り消し要求は、任意のスレッドから開始できます。
取り消しトークンを受け取る任意のメソッドに CancellationToken.None 値を渡して、取り消しが要求されないようにすることができます。 これにより、 CancellationToken.CanBeCanceled プロパティは false
を返し、呼び出されたメソッドはそれに応じて最適化できます。 テスト目的で、トークンが既に取り消された状態で開始する必要があるかどうかを示すブール値を受け取るコンストラクターを使用して、インスタンス化されたキャンセル前のキャンセル トークンを渡すこともできます。
キャンセルに対するこのアプローチには、いくつかの利点があります。
同じキャンセル トークンを任意の数の非同期操作と同期操作に渡すことができます。
同じキャンセル要求は、任意の数のリスナーに拡散される可能性があります。
非同期 API の開発者は、取り消しを要求できるかどうかと、いつ有効になるかを完全に制御できます。
API を使用するコードは、キャンセル要求が伝達される非同期呼び出しを選択的に決定できます。
進行状況の監視
一部の非同期メソッドは、非同期メソッドに渡される進行状況インターフェイスを介して進行状況を公開します。 たとえば、テキストの文字列を非同期的にダウンロードし、その過程で、これまでに完了したダウンロードの割合を含む進行状況の更新を発生させる関数を考えてみましょう。 このようなメソッドは、次のように Windows Presentation Foundation (WPF) アプリケーションで使用できます。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,
new Progress<int>(p => pbDownloadProgress.Value = p));
}
finally { btnDownload.IsEnabled = true; }
}
組み込みのタスク ベースの組み合わせ子の使用
System.Threading.Tasks名前空間には、タスクを作成および操作するためのいくつかのメソッドが含まれています。
Task.Run
Task クラスには、TaskまたはTask<TResult>として作業をスレッド プールに簡単にオフロードできる、いくつかのRunメソッドが含まれています。次に例を示します。
public async void button1_Click(object sender, EventArgs e)
{
textBox1.Text = await Task.Run(() =>
{
// … do compute-bound work here
return answer;
});
}
これらの Run メソッドの一部 ( Task.Run(Func<Task>) オーバーロードなど) は、 TaskFactory.StartNew メソッドの短縮形として存在します。 このオーバーロードを使用すると、オフロードされた作業内で await を使用できます。次に例を示します。
public async void button1_Click(object sender, EventArgs e)
{
pictureBox1.Image = await Task.Run(async() =>
{
using(Bitmap bmp1 = await DownloadFirstImageAsync())
using(Bitmap bmp2 = await DownloadSecondImageAsync())
return Mashup(bmp1, bmp2);
});
}
このようなオーバーロードは、タスク並列ライブラリのUnwrap拡張メソッドと組み合わせてTaskFactory.StartNew メソッドを使用することと論理的に同等です。
Task.FromResult
データが既に使用可能であり、Task<TResult>にリフトされたタスクを返すメソッドから返す必要があるシナリオでは、FromResult メソッドを使用します。
public Task<int> GetValueAsync(string key)
{
int cachedValue;
return TryGetCachedValue(out cachedValue) ?
Task.FromResult(cachedValue) :
GetValueAsyncInternal();
}
private async Task<int> GetValueAsyncInternal(string key)
{
…
}
Task.WhenAll
WhenAll メソッドを使用して、タスクとして表される複数の非同期操作を非同期的に待機します。 このメソッドには、一連の非ジェネリック タスクまたは一様でない一連の汎用タスク (たとえば、複数の void を返す操作を非同期的に待機する、各値が異なる型を持つ複数の値を返すメソッドを非同期的に待機する) をサポートし、一様な一連のジェネリック タスク (複数の TResult
戻りメソッドを非同期的に待機するなど) をサポートする複数のオーバーロードがあります。
複数の顧客にメール メッセージを送信するとします。 1 つのメッセージが完了するのを待ってから次のメッセージを送信しないように、メッセージの送信を重複させることができます。 また、送信操作がいつ完了したか、エラーが発生したかどうかを確認することもできます。
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);
このコードは発生する可能性のある例外を明示的に処理しませんが、結果のタスクの await
から例外を WhenAllから伝達できます。 例外を処理するには、次のようなコードを使用します。
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
...
}
この場合、非同期操作が失敗した場合、すべての例外がAggregateException例外に統合されます。この例外は、WhenAll メソッドから返されるTaskに格納されます。 ただし、 await
キーワードによって反映されるのは、これらの例外の 1 つだけです。 すべての例外を調べる場合は、前のコードを次のように書き直すことができます。
Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
foreach(Task faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
Web から複数のファイルを非同期にダウンロードする例を考えてみましょう。 この場合、すべての非同期操作には同種の結果の種類があり、結果に簡単にアクセスできます。
string [] pages = await Task.WhenAll(
from url in urls select DownloadStringTaskAsync(url));
前の void を返すシナリオで説明したのと同じ例外処理手法を使用できます。
Task<string> [] asyncOps =
(from url in urls select DownloadStringTaskAsync(url)).ToArray();
try
{
string [] pages = await Task.WhenAll(asyncOps);
...
}
catch(Exception exc)
{
foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
Task.WhenAny
WhenAny メソッドを使用すると、タスクとして表される複数の非同期操作のいずれかが完了するまで非同期的に待機できます。 このメソッドは、次の 4 つの主要なユース ケースに対応します。
冗長性: 操作を複数回実行し、最初に完了した操作を選択します (たとえば、1 つの結果を生成する複数の株価情報 Web サービスに連絡し、最も速く完了するものを選択するなど)。
インターリーブ: 複数の操作を起動し、すべての操作が完了するまで待機しますが、完了したら処理します。
調整: 他の操作が完了すると、追加の操作を開始できます。 これは、インターリーブ シナリオの拡張機能です。
早期の救済: たとえば、タスク t1 で表される操作は、別のタスク t2 を持つ WhenAny タスクにグループ化でき、 WhenAny タスクを待機できます。 タスク t2 は、タイムアウト、取り消し、または t1 が完了する前に WhenAny タスクが完了する原因となるその他のシグナルを表している可能性があります。
冗長性
株式を買うかどうかを決定する場合を考えてみましょう。 信頼できるストックレコメンデーション Web サービスがいくつかありますが、毎日の負荷によっては、各サービスが異なる時間に遅くなる可能性があります。 WhenAnyメソッドを使用すると、操作が完了したときに通知を受け取ることができます。
var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol),
GetBuyRecommendation2Async(symbol),
GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);
正常に完了したすべてのタスクのラップされていない結果を返す WhenAllとは異なり、 WhenAny は完了したタスクを返します。 タスクが失敗した場合は、失敗したことを把握することが重要であり、タスクが成功した場合は、戻り値が関連付けられているタスクを把握することが重要です。 そのため、この例に示すように、返されたタスクの結果にアクセスするか、さらに待機する必要があります。
WhenAllと同様に、例外に対応できる必要があります。 完了したタスクが返されるため、返されたタスクにエラーが反映されるのを待ち、適切に try/catch
できます。次に例を示します。
Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{
Task<bool> recommendation = await Task.WhenAny(recommendations);
try
{
if (await recommendation) BuyStock(symbol);
break;
}
catch(WebException exc)
{
recommendations.Remove(recommendation);
}
}
さらに、最初のタスクが正常に完了した場合でも、後続のタスクが失敗する可能性があります。 この時点で、例外を処理するためのいくつかのオプションがあります。起動されたすべてのタスクが完了するまで待機できます。その場合は、 WhenAll メソッドを使用することも、すべての例外が重要であり、ログに記録する必要があると判断することもできます。 このためには、継続を使用して、タスクが非同期的に完了したときに通知を受け取ることができます。
foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => { if (t.IsFaulted) Log(t.Exception); });
}
又は:
foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}
または次の場合も含まれます。
private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
foreach(var task in tasks)
{
try { await task; }
catch(Exception exc) { Log(exc); }
}
}
…
LogCompletionIfFailed(recommendations);
最後に、残りのすべての操作を取り消したい場合があります。
var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol, cts.Token),
GetBuyRecommendation2Async(symbol, cts.Token),
GetBuyRecommendation3Async(symbol, cts.Token)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);
インターリーブ
Web からイメージをダウンロードし、各イメージを処理する場合を考えてみましょう (たとえば、UI コントロールに画像を追加する)。 UI スレッドでイメージを順番に処理しますが、可能な限り同時にイメージをダウンロードする必要があります。 また、イメージがすべてダウンロードされるまで、UI へのイメージの追加を保留する必要はありません。 代わりに、完了時に追加します。
List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
ダウンロードしたイメージの ThreadPool に対して計算負荷の高い処理を伴うシナリオにインターリーブを適用することもできます。次に例を示します。
List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)
.ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
スロットリング
インターリーブの例を考えてみましょう。ただし、ユーザーがダウンロードする画像が非常に多くダウンロードされるため、ダウンロードを調整する必要がある点を除きます。たとえば、特定の数のダウンロードのみを同時に実行する必要があります。 これを実現するには、非同期操作のサブセットを開始します。 操作が完了したら、追加の操作を開始して代わりに実行できます。
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
早期の救済
ユーザーの取り消し要求 (たとえば、ユーザーがキャンセル ボタンをクリックした場合など) に同時に応答しながら、操作が完了するまで非同期的に待機しているとします。 次のコードは、このシナリオを示しています。
private CancellationTokenSource m_cts;
public void btnCancel_Click(object sender, EventArgs e)
{
if (m_cts != null) m_cts.Cancel();
}
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
if (imageDownload.IsCompleted)
{
Bitmap image = await imageDownload;
panel.AddImage(image);
}
else imageDownload.ContinueWith(t => Log(t));
}
finally { btnRun.Enabled = true; }
}
private static async Task UntilCompletionOrCancellation(
Task asyncOp, CancellationToken ct)
{
var tcs = new TaskCompletionSource<bool>();
using(ct.Register(() => tcs.TrySetResult(true)))
await Task.WhenAny(asyncOp, tcs.Task);
return asyncOp;
}
この実装では、ユーザー インターフェイスが再び有効になります。ただし、基になる非同期操作は取り消されません。 もう 1 つの方法は、取り消し要求が原因で早期に終了する可能性があるため、保留中の操作を取り消し、操作が完了するまでユーザー インターフェイスを再確立しない場合です。
private CancellationTokenSource m_cts;
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
Bitmap image = await imageDownload;
panel.AddImage(image);
}
catch(OperationCanceledException) {}
finally { btnRun.Enabled = true; }
}
早期救済のもう 1 つの例として、次のセクションで説明するように、 WhenAny メソッドを Delay メソッドと組み合わせて使用する必要があります。
Task.Delay (タスクの遅延)
Task.Delay メソッドを使用して、非同期メソッドの実行に一時停止を導入できます。 これは、ポーリング ループの構築や、ユーザー入力の処理の遅延など、さまざまな種類の機能に役立ちます。 Task.Delay メソッドは、awaits でタイムアウトを実装するためのTask.WhenAnyと組み合わせて使用することもできます。
大規模な非同期操作 (たとえば、ASP.NET Web サービス) の一部であるタスクの完了に時間がかかりすぎる場合は、操作全体が低下する可能性があります(特に、完了に失敗した場合)。 このため、非同期操作を待機しているときにタイムアウトできることが重要です。 同期 Task.Wait、 Task.WaitAll、および Task.WaitAny メソッドはタイムアウト値を受け取りますが、対応する TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny と前述の Task.WhenAll/Task.WhenAny メソッドでは受け入れません。 代わりに、 Task.Delay と Task.WhenAny を組み合わせて使用してタイムアウトを実装できます。
たとえば、UI アプリケーションで、イメージをダウンロードし、イメージのダウンロード中に UI を無効にするとします。 ただし、ダウンロードに時間がかかりすぎる場合は、UI を再度有効にしてダウンロードを破棄する必要があります。
public async void btnDownload_Click(object sender, EventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap> download = GetBitmapAsync(url);
if (download == await Task.WhenAny(download, Task.Delay(3000)))
{
Bitmap bmp = await download;
pictureBox.Image = bmp;
status.Text = "Downloaded";
}
else
{
pictureBox.Image = null;
status.Text = "Timed out";
var ignored = download.ContinueWith(
t => Trace("Task finally completed"));
}
}
finally { btnDownload.Enabled = true; }
}
WhenAllはタスクを返すので、複数のダウンロードにも同じことが当てはまります。
public async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap[]> downloads =
Task.WhenAll(from url in urls select GetBitmapAsync(url));
if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
{
foreach(var bmp in downloads.Result) panel.AddImage(bmp);
status.Text = "Downloaded";
}
else
{
status.Text = "Timed out";
downloads.ContinueWith(t => Log(t));
}
}
finally { btnDownload.Enabled = true; }
}
タスク ベースの組み合わせ子の構築
タスクは完全に非同期操作を表し、操作との結合、結果の取得などの同期機能と非同期機能を提供できるため、タスクを構成する組み合わせ子の便利なライブラリを構築して、より大きなパターンを構築できます。 前のセクションで説明したように、.NET には複数の組み込みコンバイネータが含まれていますが、独自にビルドすることもできます。 次のセクションでは、組み合わせ子のメソッドと型の例をいくつか示します。
RetryOnFault
多くの場合、以前の試行が失敗した場合に操作を再試行することが必要になる場合があります。 同期コードの場合は、次の例の RetryOnFault
などのヘルパー メソッドをビルドしてこれを実現できます。
public static T RetryOnFault<T>(
Func<T> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return function(); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
TAP を使用して実装され、タスクを返す非同期操作用に、ほぼ同じヘルパー メソッドを構築できます。
public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
その後、この組み合わせ器を使用して、再試行をアプリケーションのロジックにエンコードできます。例えば:
// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
() => DownloadStringTaskAsync(url), 3);
RetryOnFault
関数をさらに拡張することもできます。 たとえば、この関数は再試行の間に呼び出される別の Func<Task>
を受け入れて、操作を再試行するタイミングを決定できます。次に例を示します。
public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
await retryWhen().ConfigureAwait(false);
}
return default(T);
}
その後、次のように関数を使用して、操作を再試行する前に 1 秒間待機できます。
// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
() => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));
NeedOnlyOne
場合によっては、冗長性を利用して、操作の待機時間と成功の可能性を向上させることができます。 株価情報を提供する複数の Web サービスを検討しますが、1 日のさまざまな時間帯に、各サービスによって異なるレベルの品質と応答時間が提供される場合があります。 これらの変動に対処するには、すべての Web サービスに要求を発行し、応答が返されたら、残りの要求を取り消します。 ヘルパー関数を実装すると、複数の操作を起動し、任意の操作を待機してから、残りの操作を取り消すという一般的なパターンを簡単に実装できます。 次の例の NeedOnlyOne
関数は、このシナリオを示しています。
public static async Task<T> NeedOnlyOne(
params Func<CancellationToken,Task<T>> [] functions)
{
var cts = new CancellationTokenSource();
var tasks = (from function in functions
select function(cts.Token)).ToArray();
var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
cts.Cancel();
foreach(var task in tasks)
{
var ignored = task.ContinueWith(
t => Log(t), TaskContinuationOptions.OnlyOnFaulted);
}
return completed;
}
その後、次のようにこの関数を使用できます。
double currentPrice = await NeedOnlyOne(
ct => GetCurrentPriceFromServer1Async("msft", ct),
ct => GetCurrentPriceFromServer2Async("msft", ct),
ct => GetCurrentPriceFromServer3Async("msft", ct));
インターリーブ操作
WhenAny メソッドを使用して、大規模なタスク セットを操作する場合のインターリーブ シナリオをサポートする場合、パフォーマンスの問題が発生する可能性があります。
WhenAnyを呼び出すたびに、継続が各タスクに登録されます。 N 個のタスクの場合、インターリーブ操作の有効期間中に O(N2) 継続が作成されます。 多数のタスクセットを使用している場合は、コンバイネータ (次の例のInterleaved
) を使用してパフォーマンスの問題に対処できます。
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
select new TaskCompletionSource<T>()).ToList();
int nextTaskIndex = -1;
foreach (var inputTask in inputTasks)
{
inputTask.ContinueWith(completed =>
{
var source = sources[Interlocked.Increment(ref nextTaskIndex)];
if (completed.IsFaulted)
source.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled)
source.TrySetCanceled();
else
source.TrySetResult(completed.Result);
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return from source in sources
select source.Task;
}
その後、組み合わせ子を使用して、完了したタスクの結果を処理できます。例えば:
IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
int result = await task;
…
}
WhenAllOrFirstException
特定の散布図/収集シナリオでは、いずれかのタスクがエラーを発生しない限り、セット内のすべてのタスクを待機することが必要な場合があります。その場合は、例外が発生したらすぐに待機を停止する必要があります。 次の例では、 WhenAllOrFirstException
などのコンバイネータ メソッドを使用してこれを実現できます。
public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
var inputs = tasks.ToList();
var ce = new CountdownEvent(inputs.Count);
var tcs = new TaskCompletionSource<T[]>();
Action<Task> onCompleted = (Task completed) =>
{
if (completed.IsFaulted)
tcs.TrySetException(completed.Exception.InnerExceptions);
if (ce.Signal() && !tcs.Task.IsCompleted)
tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
};
foreach (var t in inputs) t.ContinueWith(onCompleted);
return tcs.Task;
}
タスクベースのデータ構造の構築
カスタム タスク ベースの組み合わせ子を構築する機能に加えて、非同期操作の結果と、それに結合するために必要な同期の両方を表すデータ構造を Task と Task<TResult> することで、非同期シナリオで使用するカスタム データ構造を構築するための強力な型になります。
AsyncCache
1 つのタスクの重要な側面の 1 つは、複数のコンシューマーに渡される可能性があり、全員がそれを待つ、継続を登録する、その結果または例外を取得する ( Task<TResult>の場合)などです。 これにより、 Task と Task<TResult> 非同期キャッシュ インフラストラクチャでの使用に最適です。 Task<TResult>に基づいて構築された、小さくて強力な非同期キャッシュの例を次に示します。
public class AsyncCache<TKey, TValue>
{
private readonly Func<TKey, Task<TValue>> _valueFactory;
private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;
public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
{
if (valueFactory == null) throw new ArgumentNullException("valueFactory");
_valueFactory = valueFactory;
_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
}
public Task<TValue> this[TKey key]
{
get
{
if (key == null) throw new ArgumentNullException("key");
return _map.GetOrAdd(key, toAdd =>
new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
}
}
}
AsyncCache<TKey,TValue> クラスは、TKey
を受け取り、Task<TResult>を返す関数をコンストラクターへのデリゲートとして受け取ります。 キャッシュから以前にアクセスした値は内部ディクショナリに格納され、キャッシュが同時にアクセスされる場合でも、 AsyncCache
はキーごとに 1 つのタスクのみが生成されるようにします。
たとえば、ダウンロードした Web ページのキャッシュを作成できます。
private AsyncCache<string,string> m_webPages =
new AsyncCache<string,string>(DownloadStringTaskAsync);
その後、Web ページの内容が必要な場合は常に、非同期メソッドでこのキャッシュを使用できます。
AsyncCache
クラスを使用すると、できるだけ少ないページをダウンロードし、結果をキャッシュできます。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtContents.Text = await m_webPages["https://www.microsoft.com"];
}
finally { btnDownload.IsEnabled = true; }
}
AsyncProducerConsumerCollection
また、タスクを使用して、非同期アクティビティを調整するためのデータ構造を構築することもできます。 従来の並列設計パターンの 1 つであるプロデューサー/コンシューマーについて考えてみましょう。 このパターンでは、プロデューサーはコンシューマーによって使用されるデータを生成し、プロデューサーとコンシューマーは並列で実行できます。 たとえば、コンシューマーはアイテム 1 を処理します。これは、アイテム 2 を現在生産しているプロデューサーによって以前に生成されました。 プロデューサー/コンシューマー パターンでは、プロデューサーによって作成された作業を格納して、コンシューマーに新しいデータの通知を受け取り、使用可能な場合にそれを見つけられるように、データ構造が必ず必要になります。
非同期メソッドをプロデューサーおよびコンシューマーとして使用できるようにする、タスクに基づいて構築された単純なデータ構造を次に示します。
public class AsyncProducerConsumerCollection<T>
{
private readonly Queue<T> m_collection = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_waiting =
new Queue<TaskCompletionSource<T>>();
public void Add(T item)
{
TaskCompletionSource<T> tcs = null;
lock (m_collection)
{
if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
else m_collection.Enqueue(item);
}
if (tcs != null) tcs.TrySetResult(item);
}
public Task<T> Take()
{
lock (m_collection)
{
if (m_collection.Count > 0)
{
return Task.FromResult(m_collection.Dequeue());
}
else
{
var tcs = new TaskCompletionSource<T>();
m_waiting.Enqueue(tcs);
return tcs.Task;
}
}
}
}
そのデータ構造を配置すると、次のようなコードを記述できます。
private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.Take();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Add(data);
}
System.Threading.Tasks.Dataflow名前空間にはBufferBlock<T>型が含まれています。これは同様の方法で使用できますが、カスタム コレクション型を作成する必要はありません。
private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.ReceiveAsync();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Post(data);
}
注
System.Threading.Tasks.Dataflow名前空間は、NuGet パッケージとして使用できます。
System.Threading.Tasks.Dataflow名前空間を含むアセンブリをインストールするには、Visual Studio でプロジェクトを開き、[プロジェクト] メニューから [NuGet パッケージの管理] を選択し、System.Threading.Tasks.Dataflow
パッケージをオンラインで検索します。
こちらも参照ください
.NET