使用基于任务的异步模式

使用基于任务的异步模式(TAP)来处理异步操作时,可以使用回调来实现等待而不阻塞。 对于任务,这可通过 Task.ContinueWith 等方法实现。 语言级异步支持通过在正常控制流中实现异步操作的等待机制来隐藏回调,而编译器生成的代码也提供这种API级的支持。

使用 Await 来暂停执行

可以使用 C# 中的 await 关键字和 Visual Basic 中的 Await 运算符 异步等待 TaskTask<TResult> 对象。 当你在等待 Task 时,await 表达式的类型是 void。 当你在等待 Task<TResult> 时,await 表达式的类型是 TResultawait表达式必须出现在异步方法的正文中。 (这些语言功能是在 .NET Framework 4.5 中引入的。

实际上,await 功能通过使用延续任务在任务上安装回叫。 此回叫在挂起点恢复异步方法。 当异步方法恢复时,如果等待的操作成功完成且为Task<TResult>类型的,则返回TResult。 如果等待的 TaskTask<TResult>Canceled 状态结束,就会抛出 OperationCanceledException 异常。 如果等待的 TaskTask<TResult>Faulted 状态结束,就会抛出导致它发生故障的异常。 一个 Task 可能由于多个异常而出错,但只会传播一个异常。 不过,Task.Exception 属性会返回包含所有错误的 AggregateException 异常。

如果同步上下文(SynchronizationContext 对象)与在挂起时执行异步方法的线程相关联(例如,如果该 SynchronizationContext.Current 属性不是 null),则异步方法通过使用上下文 Post 的方法在相同的同步上下文上恢复。 否则,它依赖暂停时的当前任务计划程序(TaskScheduler 对象)。 通常,这是面向线程池的默认任务计划程序(TaskScheduler.Default)。 此任务计划程序确定等待的异步操作是否应在该操作完成时恢复,或是否应计划该恢复。 默认计划程序通常允许在完成等待操作的线程上延续任务。

调用异步方法时,将同步执行函数的正文,直到遇见尚未完成的可等待实例上的第一个 await 表达式,此时调用返回到调用方。 如果异步方法未返回 void,则返回一个 TaskTask<TResult> 对象来表示正在进行的计算。 在非 void 异步方法中,如果遇到 return 语句或到达方法正文末尾,任务就以 RanToCompletion 最终状态完成。 如果未经处理的异常导致控制权离开异步方法的主体,任务将以 Faulted 状态结束。 如果该异常是一个 OperationCanceledException,任务将改为以 Canceled 状态结束。 通过这种方式,最终结果或异常会被公布。

此行为的几个重要变体。 出于性能原因,如果任务在等待时已完成,则不会交出控制权,并且函数将继续执行。 此外,返回到原始上下文并不总是所需的行为,可以更改;下一部分将更详细地介绍这一点。

使用 Yield 和 ConfigureAwait 配置挂起和恢复

多个方法可更好地控制异步方法的执行。 例如,可以使用 Task.Yield 方法,将暂停点引入异步方法:

public class Task : …
{
    public static YieldAwaitable Yield();
    …
}

这相当于以异步方式发布或计划返回当前上下文。

Task.Run(async delegate
{
    for(int i=0; i<1000000; i++)
    {
        await Task.Yield(); // fork the continuation into a separate work item
        ...
    }
});

还可以使用 Task.ConfigureAwait 方法,更好地控制异步方法中的暂停和恢复。 如前所述,默认情况下,在暂停异步方法时捕获当前上下文,捕获的上下文用于在恢复时调用异步方法的延续。 在许多情况下,这正是你想要的行为。 在其他情况下,你可能不关心延续上下文,则可以通过避免此类发布返回原始上下文来获得更好的性能。 若要启用此功能,请使用 Task.ConfigureAwait 方法,指示等待操作不要捕获和恢复上下文,而是继续执行正在等待完成的所有异步操作:

await someTask.ConfigureAwait(continueOnCapturedContext:false);

取消异步操作

从 .NET Framework 4 开始,支持取消操作的 TAP 方法提供至少一个接受取消令牌(CancellationToken 对象)的重载。

取消令牌是通过取消令牌源(CancellationTokenSource 对象)创建的。 源的 Token 属性返回取消令牌,它在源的 Cancel 方法获得调用时收到信号。 例如,如果要下载单个网页并想要能够取消操作,请创建一个 CancellationTokenSource 对象,将其令牌传递给 TAP 方法,然后在准备好取消操作时调用源的 Cancel 方法。

var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();

若要取消多个异步调用,可以将相同的令牌传递给所有调用:

var cts = new CancellationTokenSource();
    IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));
    // at some point later, potentially on another thread
    …
    cts.Cancel();

或者,将相同令牌传递给操作的选择性子集:

var cts = new CancellationTokenSource();
    byte [] data = await DownloadDataAsync(url, cts.Token);
    await SaveToDiskAsync(outputPath, data, CancellationToken.None);
    … // at some point later, potentially on another thread
    cts.Cancel();

重要

可以从任何线程启动取消请求。

您可以将 CancellationToken.None 值传递给任何接受取消令牌的方法,以表明永远不会请求取消。 这会导致 CancellationToken.CanBeCanceled 属性返回 false,并且调用的方法可以相应地进行优化。 出于测试目的,还可以通过传入预取消标记(该标记使用接受布尔值的构造函数进行实例化)来指示是否应以已取消或未取消状态启动标记。

取消方法具有几个优点:

  • 可以将相同的取消令牌传递给任意数量的异步和同步操作。

  • 相同的取消请求可能会扩展到任意数量的侦听器。

  • 异步 API 的开发人员完全控制是否可请求取消以及何时生效。

  • 使用 API 的代码可以选择性地确定将对其传播取消请求的异步调用。

监视进度

某些异步方法通过传递到异步方法的进度接口公开进度。 例如,请考虑异步下载一个文本字符串的函数,在此过程中会引发进度更新,其中包括到目前为止已完成的下载百分比。 在 Windows Presentation Foundation (WPF) 应用程序中可以采用此类方法,具体方法如下:

private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.IsEnabled = false;
    try
    {
        txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,
            new Progress<int>(p => pbDownloadProgress.Value = p));
    }
    finally { btnDownload.IsEnabled = true; }
}

使用内置的基于任务的连结符

命名空间 System.Threading.Tasks 包含多种用于撰写和使用任务的方法。

Task.Run

Task 包含多种 Run 方法,可让你轻松将工作作为 TaskTask<TResult> 任务卸载到线程池,例如:

public async void button1_Click(object sender, EventArgs e)
{
    textBox1.Text = await Task.Run(() =>
    {
        // … do compute-bound work here
        return answer;
    });
}

其中 Run 一些方法(如 Task.Run(Func<Task>) 重载)作为方法 TaskFactory.StartNew 的简写形式存在。 借助此重载,可以在卸载的工作内使用 await,例如:

public async void button1_Click(object sender, EventArgs e)
{
    pictureBox1.Image = await Task.Run(async() =>
    {
        using(Bitmap bmp1 = await DownloadFirstImageAsync())
        using(Bitmap bmp2 = await DownloadSecondImageAsync())
        return Mashup(bmp1, bmp2);
    });
}

此类重载在逻辑上等效于将 TaskFactory.StartNew 方法与任务并行库中的 Unwrap 扩展方法结合使用。

Task.FromResult

FromResult 方法的适用情景为,数据可能已存在,且只需通过提升到 Task<TResult> 的任务返回方法返回:

public Task<int> GetValueAsync(string key)
{
    int cachedValue;
    return TryGetCachedValue(out cachedValue) ?
        Task.FromResult(cachedValue) :
        GetValueAsyncInternal();
}

private async Task<int> GetValueAsyncInternal(string key)
{
    …
}

Task.WhenAll

WhenAll 方法可用于异步等待多个表示为任务的异步操作。 该方法有多个重载,这些重载支持一组非泛型任务或一组非统一的泛型任务(例如,异步等待多个返回void的操作,或异步等待多个返回值的方法,其中每个值可能具有不同的类型),并支持一组统一的泛型任务(例如异步等待多个TResult返回的方法)。

假设你想要向多个客户发送电子邮件。 可以重叠发送消息,这样就不会在发送下一条消息之前等待一条消息完成。 还可以了解发送作何时完成以及是否发生了任何错误:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);

此代码不显式处理可能发生的异常,而是通过对 await 生成的任务执行 WhenAll 传播异常。 若要处理异常,可以使用如下代码:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{
    await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
    ...
}

在这种情况下,如果任何异步操作失败,所有异常将被合并到AggregateException异常中,该异常存储在从Task 方法返回的WhenAll中。 但是,只有一个异常是通过关键字await传播的。 如果要检查所有异常,可以按如下所示重写前面的代码:

Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{
    await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
    foreach(Task faulted in asyncOps.Where(t => t.IsFaulted))
    {
        … // work with faulted and faulted.Exception
    }
}

让我们考虑一个从 Web 异步下载多个文件的示例。 在这种情况下,所有异步作都具有同质结果类型,并且可以轻松访问结果:

string [] pages = await Task.WhenAll(
    from url in urls select DownloadStringTaskAsync(url));

可以使用我们在前面的 void 返回方案中讨论的相同异常处理技术:

Task<string> [] asyncOps =
    (from url in urls select DownloadStringTaskAsync(url)).ToArray();
try
{
    string [] pages = await Task.WhenAll(asyncOps);
    ...
}
catch(Exception exc)
{
    foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
    {
        … // work with faulted and faulted.Exception
    }
}

Task.WhenAny

WhenAny 方法可用于异步等待多个表示为要完成的任务的异步操作之一。 此方法提供四个主要用例:

  • 冗余:多次执行一个操作并选择最先完成的一次(例如,联系能够生成一个结果的多个股市行情 Web 服务并选择完成最快的一个)。

  • 交错:启动多个操作并等待所有这些操作完成,但是在完成这些操作时对其进行处理。

  • 限制:允许其他操作完成时开始附加操作。 这是交错方案的扩展。

  • 早期释放:例如,用任务 t1 表示的操作可以与任务 t2 组成 WhenAny 任务,您可以等待 WhenAny 任务。 任务 t2 可以表示超时或取消,或者导致 WhenAny 任务在 t1 完成之前完成的一些其他信号。

冗余

请考虑一个你想要决定是否购买股票的情况。 几个你信任的股票推荐网络服务会在不同时间因每天的负载情况而出现速度减慢的现象。 您可以使用 WhenAny 方法在任何操作完成时接收通知。

var recommendations = new List<Task<bool>>()
{
    GetBuyRecommendation1Async(symbol),
    GetBuyRecommendation2Async(symbol),
    GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);

与返回成功完成的所有任务的未包装结果不同 WhenAllWhenAny 返回已完成的任务。 如果任务失败,请务必知道失败,如果任务成功,请务必知道返回值与哪个任务相关联。 因此,你需要访问返回任务的结果,或进一步等待,如本示例中所示。

WhenAll 一样,必须能够容纳异常。 因为接收到完成的任务后,可以等待返回的任务传播错误,并适当地进行 try/catch,例如:

Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{
    Task<bool> recommendation = await Task.WhenAny(recommendations);
    try
    {
        if (await recommendation) BuyStock(symbol);
        break;
    }
    catch(WebException exc)
    {
        recommendations.Remove(recommendation);
    }
}

此外,即使第一个任务成功完成,后续任务也可能失败。 此时,可以使用多个选项来处理异常:可以等待所有启动的任务完成,在这种情况下,可以使用 WhenAll 该方法,也可以确定所有异常都很重要,必须记录。 为此,您可以使用续集来在任务异步完成时接收通知。

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => { if (t.IsFaulted) Log(t.Exception); });
}

或:

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

或者甚至:

private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
    foreach(var task in tasks)
    {
        try { await task; }
        catch(Exception exc) { Log(exc); }
    }
}
…
LogCompletionIfFailed(recommendations);

最后,可能需要取消所有剩余操作。

var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{
    GetBuyRecommendation1Async(symbol, cts.Token),
    GetBuyRecommendation2Async(symbol, cts.Token),
    GetBuyRecommendation3Async(symbol, cts.Token)
};

Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);

交错

请考虑从 Web 下载图像并处理每个图像(例如,将图像添加到 UI 控件)的情况。 在 UI 线程上按顺序处理图像,但希望尽可能同时下载映像。 此外,建议不要直到所有图像都下载完成才将图像添加到 UI。 建议在完成下载时添加它们。

List<Task<Bitmap>> imageTasks =
    (from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch{}
}

还可以将交错应用于涉及下载图像 ThreadPool 的计算密集型处理的方案;例如:

List<Task<Bitmap>> imageTasks =
    (from imageUrl in urls select GetBitmapAsync(imageUrl)
         .ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch{}
}

限制

请考虑交错示例,因用户大量下载图像而导致下载必须受到遏制除外;例如,你希望仅能同时下载特定数目的内容。 为此,可以启动异步操作的子集。 当操作完成后,可以启动其他操作来取代它们的位置。

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

早期释放

假设你正在异步等待一个操作完成,同时响应用户的取消请求(例如用户单击了取消按钮)。 以下代码演示了此方案:

private CancellationTokenSource m_cts;

public void btnCancel_Click(object sender, EventArgs e)
{
    if (m_cts != null) m_cts.Cancel();
}

public async void btnRun_Click(object sender, EventArgs e)
{
    m_cts = new CancellationTokenSource();
    btnRun.Enabled = false;
    try
    {
        Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);
        await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
        if (imageDownload.IsCompleted)
        {
            Bitmap image = await imageDownload;
            panel.AddImage(image);
        }
        else imageDownload.ContinueWith(t => Log(t));
    }
    finally { btnRun.Enabled = true; }
}

private static async Task UntilCompletionOrCancellation(
    Task asyncOp, CancellationToken ct)
{
    var tcs = new TaskCompletionSource<bool>();
    using(ct.Register(() => tcs.TrySetResult(true)))
        await Task.WhenAny(asyncOp, tcs.Task);
    return asyncOp;
}

一旦决定退出,此实现将重新启用用户界面,但不会取消基础异步操作。 另一种选择是决定退出时,取消挂起的操作,但在操作完成之前不重新建立用户界面,可能会由于取消请求而提前结束:

private CancellationTokenSource m_cts;

public async void btnRun_Click(object sender, EventArgs e)
{
    m_cts = new CancellationTokenSource();

    btnRun.Enabled = false;
    try
    {
        Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);
        await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
        Bitmap image = await imageDownload;
        panel.AddImage(image);
    }
    catch(OperationCanceledException) {}
    finally { btnRun.Enabled = true; }
}

早期救助的另一个示例涉及使用 WhenAny 方法与 Delay 方法结合,如下一部分所述。

Task.Delay

您可以使用Task.Delay方法将暂停引入到异步方法的执行中。 这对于多种功能非常有用,包括生成轮询循环,以及延迟用户输入的处理在预先确定的时间段。 Task.Delay 方法还可以与 Task.WhenAny 结合使用,以对 await 实现超时。

如果作为较大异步操作的一部分的任务(例如,ASP.NET Web 服务)需要很长时间才能完成,则整个操作可能会受到影响,尤其是在操作无法完成时。 因此,等待异步操作时可以超时非常重要。 同步Task.WaitTask.WaitAllTask.WaitAny方法接受超时值,但相应的TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny方法和前面提到的Task.WhenAll/Task.WhenAny方法则不接受。 相反,可以使用 Task.DelayTask.WhenAny 组合来实现超时。

例如,在 UI 应用程序中,假设你想要下载映像并在映像下载时禁用 UI。 如果下载时间过长,可以重新启用 UI 并放弃下载。

public async void btnDownload_Click(object sender, EventArgs e)
{
    btnDownload.Enabled = false;
    try
    {
        Task<Bitmap> download = GetBitmapAsync(url);
        if (download == await Task.WhenAny(download, Task.Delay(3000)))
        {
            Bitmap bmp = await download;
            pictureBox.Image = bmp;
            status.Text = "Downloaded";
        }
        else
        {
            pictureBox.Image = null;
            status.Text = "Timed out";
            var ignored = download.ContinueWith(
                t => Trace("Task finally completed"));
        }
    }
    finally { btnDownload.Enabled = true; }
}

这同样适用于多个下载,因为 WhenAll 返回任务:

public async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.Enabled = false;
    try
    {
        Task<Bitmap[]> downloads =
            Task.WhenAll(from url in urls select GetBitmapAsync(url));
        if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
        {
            foreach(var bmp in downloads.Result) panel.AddImage(bmp);
            status.Text = "Downloaded";
        }
        else
        {
            status.Text = "Timed out";
            downloads.ContinueWith(t => Log(t));
        }
    }
    finally { btnDownload.Enabled = true; }
}

构建基于任务的连结符

由于任务能够完全表示异步操作,并提供同步和异步功能以便与操作结合、检索其结果等,因此你可以构建有用的组合器库,这些组合器将任务组合以构建更复杂的模式。 如上一部分所述,.NET 包含多个内置组合器,但你也可以自行生成。 以下部分提供了几种可能的组合器方法和类型示例。

RetryOnFault

在许多情况下,如果以前的尝试失败,可能需要重试作。 对于同步代码,可以生成帮助程序方法,如 RetryOnFault 以下示例中实现此目的:

public static T RetryOnFault<T>(
    Func<T> function, int maxTries)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return function(); }
        catch { if (i == maxTries-1) throw; }
    }
    return default(T);
}

你可以为异步操作(使用 TAP 实现,因此返回任务)构建几乎相同的帮助器方法:

public static async Task<T> RetryOnFault<T>(
    Func<Task<T>> function, int maxTries)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries-1) throw; }
    }
    return default(T);
}

然后,可以使用此组合器将重试机制编码到应用程序的逻辑中。例如:

// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3);

可以进一步扩展 RetryOnFault 函数。 例如,该函数可以接受将在重试之间调用的另一个 Func<Task> 函数,以确定何时再次尝试该作;例如:

public static async Task<T> RetryOnFault<T>(
    Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries-1) throw; }
        await retryWhen().ConfigureAwait(false);
    }
    return default(T);
}

然后,可以使用如下所示的函数等待第二秒钟,然后重试作:

// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));

NeedOnlyOne

有时,可以利用冗余来改善操作的延迟和成功的几率。 考虑提供股票报价的多个 Web 服务,但在一天的各个时间,每个服务可能提供不同的质量和响应时间。 若要处理这些波动,可能会向所有 Web 服务发出请求,并在收到来自其中一个服务的响应后立即取消剩余的请求。 你可以实现一个辅助函数,以便更轻松地实现这种常见模式,即启动多个操作,等待其中一个完成,然后取消其余操作。 NeedOnlyOne以下示例中的函数演示了此方案:

public static async Task<T> NeedOnlyOne(
    params Func<CancellationToken,Task<T>> [] functions)
{
    var cts = new CancellationTokenSource();
    var tasks = (from function in functions
                 select function(cts.Token)).ToArray();
    var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
    cts.Cancel();
    foreach(var task in tasks)
    {
        var ignored = task.ContinueWith(
            t => Log(t), TaskContinuationOptions.OnlyOnFaulted);
    }
    return completed;
}

然后,可按如下所示使用此函数:

double currentPrice = await NeedOnlyOne(
    ct => GetCurrentPriceFromServer1Async("msft", ct),
    ct => GetCurrentPriceFromServer2Async("msft", ct),
    ct => GetCurrentPriceFromServer3Async("msft", ct));

交错操作

处理大型任务集时,如果使用 WhenAny 方法支持交错方案,可能存在潜在性能问题。 每次调用 WhenAny 都会向每个任务注册延续。 对于 N 个任务,这将导致在交错操作的操作期间创建 O(N2) 次延续。 如果使用的是一组大型任务,可以使用组合器(Interleaved 在以下示例中)解决性能问题:

static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();
    var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
                   select new TaskCompletionSource<T>()).ToList();
    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            var source = sources[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
                source.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled)
                source.TrySetCanceled();
            else
                source.TrySetResult(completed.Result);
        }, CancellationToken.None,
           TaskContinuationOptions.ExecuteSynchronously,
           TaskScheduler.Default);
    }
    return from source in sources
           select source.Task;
}

然后,可以使用组合器在任务完成时处理任务的结果;例如:

IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
    int result = await task;
    …
}

WhenAllOrFirstException

在特定的分散/集中情况下,你可能想要等待集中的所有任务,除非某个任务发生错误。在这种情况下,你希望在异常发生时停止等待。 您可以在以下示例中使用组合器方法,例如 WhenAllOrFirstException 来实现这一目标。

public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
    var inputs = tasks.ToList();
    var ce = new CountdownEvent(inputs.Count);
    var tcs = new TaskCompletionSource<T[]>();

    Action<Task> onCompleted = (Task completed) =>
    {
        if (completed.IsFaulted)
            tcs.TrySetException(completed.Exception.InnerExceptions);
        if (ce.Signal() && !tcs.Task.IsCompleted)
            tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
    };

    foreach (var t in inputs) t.ContinueWith(onCompleted);
    return tcs.Task;
}

构建基于任务的数据结构

除了能够生成基于任务的自定义组合器,在 TaskTask<TResult>(表示异步操作结果和联接所必需的同步操作结果)中包含数据结构,还可以使其成为功能非常强大的类型,基于该类型可生成在异步方案中使用的自定义数据结构。

AsyncCache

任务的重要方面之一是,它可能会分发到多个使用者,所有使用者都可以等待任务、向任务注册延续、获取任务结果或异常(如果是 Task<TResult> 的话)等。 这使得TaskTask<TResult>完美适用于异步缓存基础设施。 下面是一个基于 Task<TResult>以下位置构建的小型但功能强大的异步缓存的示例:

public class AsyncCache<TKey, TValue>
{
    private readonly Func<TKey, Task<TValue>> _valueFactory;
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
    {
        if (valueFactory == null) throw new ArgumentNullException("valueFactory");
        _valueFactory = valueFactory;
        _map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
    }

    public Task<TValue> this[TKey key]
    {
        get
        {
            if (key == null) throw new ArgumentNullException("key");
            return _map.GetOrAdd(key, toAdd =>
                new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
        }
    }
}

AsyncCache<TKey,TValue> 类接受需要使用 TKey 且返回 Task<TResult> 的函数作为构造函数的委托。 缓存中以前访问的任何值都存储在内部字典中,并确保 AsyncCache 每个键仅生成一个任务,即使同时访问缓存也是如此。

例如,可以为下载的网页生成缓存:

private AsyncCache<string,string> m_webPages =
    new AsyncCache<string,string>(DownloadStringTaskAsync);

然后,只要需要网页的内容,就可以在异步方法中使用此缓存。 该 AsyncCache 类可确保下载尽可能少的页面,并缓存结果。

private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.IsEnabled = false;
    try
    {
        txtContents.Text = await m_webPages["https://www.microsoft.com"];
    }
    finally { btnDownload.IsEnabled = true; }
}

AsyncProducerConsumerCollection

还可以使用任务来生成用于协调异步活动的数据结构。 请考虑经典并行设计模式之一:生成者/使用者。 在此模式中,生成者生成使用者使用的数据,生成者和使用者可以并行运行。 例如,消费者正在处理之前由生产者生成的项 1,而该生产者现在正在生成项 2。 对于生成者/使用者模式,你始终需要一些数据结构来存储由生成者创建的工作,以便使用者可以收到新数据的通知,并在可用时找到它。

以下是基于任务构建的简单数据结构,可以将异步方法用作生成方和使用方:

public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0)
            {
                return Task.FromResult(m_collection.Dequeue());
            }
            else
            {
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}

在该数据结构到位后,可以编写如下代码:

private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.Take();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Add(data);
}

命名空间 System.Threading.Tasks.Dataflow 包括 BufferBlock<T> 类型,可以以类似方式使用,但不需要构建自定义集合类型。

private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.ReceiveAsync();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Post(data);
}

注释

命名空间 System.Threading.Tasks.Dataflow 以 NuGet 包的形式提供。 若要安装包含命名空间的 System.Threading.Tasks.Dataflow 程序集,请在 Visual Studio 中打开项目,从“项目”菜单中选择“ 管理 NuGet 包 ”,然后联机搜索包 System.Threading.Tasks.Dataflow

另请参阅