Поделиться через


Использование асинхронного шаблона на основе задач

При использовании асинхронного шаблона на основе задач (TAP) для работы с асинхронными операциями можно использовать обратные вызовы для достижения ожидания без блокировки. Для задач это достигается с помощью таких методов, как Task.ContinueWith. Асинхронная поддержка на основе языка скрывает обратные вызовы, позволяя асинхронным операциям ожидаться в обычном потоке управления, а код, созданный компилятором, обеспечивает эту же поддержку на уровне API.

Приостановка выполнения с помощью Await

Ключевое слово await в C# и оператор Await в Visual Basic можно использовать для асинхронного ожидания Task и Task<TResult> объектов. Когда вы ожидаете Task, выражение await имеет тип void. Когда вы ожидаете Task<TResult>, выражение await имеет тип TResult. Выражение await должно использоваться внутри тела асинхронного метода. (Эти языковые функции были представлены в .NET Framework 4.5.)

В этой области функция await устанавливает обратный вызов для задачи с помощью продолжения. Этот обратный вызов возобновляет асинхронный метод в точке приостановки. При возобновлении асинхронного метода, если ожидаемая операция завершилась успешно и была Task<TResult>, возвращается ее TResult. Если Task или Task<TResult>, которого ожидали, завершилось в состоянии Canceled, возникает исключение OperationCanceledException. Если ожидаемое Task или Task<TResult> завершилось в состоянии Faulted, вызывается исключение, которое стало причиной сбоя. Ошибка Task может возникнуть в результате нескольких исключений, но распространяется только одно из этих исключений. Однако Task.Exception свойство возвращает AggregateException исключение, содержащее все ошибки.

Если контекст синхронизации (SynchronizationContext объект) связан с потоком, выполнявшим асинхронный метод во время приостановки (например, если свойство SynchronizationContext.Current не является null), асинхронный метод возобновляется в том же контексте синхронизации с помощью метода контекста Post. В противном случае он зависит от планировщика задач (TaskScheduler объекта), текущего во время приостановки. Как правило, это планировщик задач по умолчанию (TaskScheduler.Default), предназначенный для пула потоков. Этот планировщик задач определяет, должна ли ожидаемая асинхронная операция возобновиться в месте, где она завершилась, или следует назначить время для возобновления. Планировщик по умолчанию обычно позволяет продолжению выполняться в потоке, завершаемом ожидаемой операцией.

Когда вызывается асинхронный метод, он синхронно выполняет текст функции до первого выражения await в ожидаемом экземпляре, который еще не завершен, в то время как вызов возвращается вызывающему объекту. Если асинхронный метод не возвращает void, возвращается объект Task или Task<TResult>, представляющий текущее вычисление. В асинхронном методе, не возвращающем void, если встречается оператор return или достигается конец тела метода, задача завершается в окончательном RanToCompletion состоянии. Если необработанное исключение приводит к тому, что управление покидает тело асинхронного метода, задача заканчивается в состоянии Faulted. Если это исключение является OperationCanceledException, задача вместо этого заканчивается в состоянии Canceled. Таким образом, результат или исключение в конечном итоге публикуется.

Существует несколько важных вариантов этого поведения. По соображениям производительности, если задача уже завершена к моменту ожидания, управление не передается, и функция продолжает выполняться. Кроме того, возврат к исходному контексту не всегда является требуемым поведением и может быть изменен; Это подробно описано в следующем разделе.

Настройка приостановки и возобновления с помощью Yield и ConfigureAwait

Несколько методов обеспечивают более контроль над выполнением асинхронного метода. Например, можно использовать Task.Yield метод для внедрения точки доходности в асинхронный метод:

public class Task : …
{
    public static YieldAwaitable Yield();
    …
}

Это эквивалентно асинхронной отправке или запланированию обратного возвращения в текущий контекст.

Task.Run(async delegate
{
    for(int i=0; i<1000000; i++)
    {
        await Task.Yield(); // fork the continuation into a separate work item
        ...
    }
});

Вы также можете использовать Task.ConfigureAwait метод для более эффективного контроля над приостановкой и возобновлением в асинхронном методе. Как упоминалось ранее, по умолчанию текущий контекст фиксируется во время приостановки асинхронного метода, и для вызова продолжения асинхронного метода при возобновлении используется захваченный контекст. Во многих случаях это точное поведение, которое вы хотите. В других случаях вы можете не заботиться о контексте продолжения, и вы можете добиться лучшей производительности, избегая таких записей обратно в исходный контекст. Чтобы включить это, используйте метод Task.ConfigureAwait, чтобы сообщить операции await не закреплять и не возобновлять выполнение в контексте, а продолжать выполнение там, где было завершено ожидание асинхронной операции.

await someTask.ConfigureAwait(continueOnCapturedContext:false);

Отмена асинхронной операции

Начиная с .NET Framework 4 методы TAP, поддерживающие отмену, предоставляют по крайней мере одну перегрузку, которая принимает маркер отмены (CancellationToken объект).

Маркер отмены создается с помощью источника маркера отмены (CancellationTokenSource объекта). Свойство источника Token возвращает маркер отмены, который будет активирован при вызове метода источника Cancel. Например, если вы хотите скачать одну веб-страницу и хотите отменить операцию, создайте CancellationTokenSource объект, передайте его маркер в метод TAP, а затем вызовите метод источника Cancel , когда вы будете готовы отменить операцию:

var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();

Чтобы отменить несколько асинхронных вызовов, можно передать один и тот же маркер всем вызовам:

var cts = new CancellationTokenSource();
    IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));
    // at some point later, potentially on another thread
    …
    cts.Cancel();

Кроме того, можно передать один и тот же маркер в выборочное подмножество операций:

var cts = new CancellationTokenSource();
    byte [] data = await DownloadDataAsync(url, cts.Token);
    await SaveToDiskAsync(outputPath, data, CancellationToken.None);
    … // at some point later, potentially on another thread
    cts.Cancel();

Это важно

Запросы на отмену могут быть инициированы из любого потока.

Вы можете передать значение CancellationToken.None любому методу, который принимает токен отмены, чтобы указать, что отмена никогда не будет запрашиваться. Это заставляет свойство CancellationToken.CanBeCanceled возвращать false, и вызываемый метод может оптимизировать соответственно. В целях тестирования можно также передать предварительно отмененный маркер отмены, созданный с помощью конструктора, который принимает логическое значение, чтобы указать, должен ли маркер начинаться в уже отмененном или неотменяемом состоянии.

Этот подход к отмене имеет несколько преимуществ:

  • Вы можете передать один и тот же маркер отмены любому количеству асинхронных и синхронных операций.

  • Тот же запрос на отмену может распространяться на любое количество прослушивателей.

  • Разработчик асинхронного API полностью контролирует, может ли потребоваться отмена и когда она может ввести в силу.

  • Код, использующий API, может избирательно определить, на какие асинхронные вызовы будут распространяться запросы на отмену.

Ход мониторинга

Некоторые асинхронные методы предоставляют информацию о ходе выполнения с помощью переданного в них интерфейса прогресса. Например, рассмотрим функцию, которая асинхронно загружает строку текста и в процессе поступают обновления хода выполнения, включающие процент завершенной на данный момент загрузки. Такой метод можно использовать в приложении Windows Presentation Foundation (WPF) следующим образом:

private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.IsEnabled = false;
    try
    {
        txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,
            new Progress<int>(p => pbDownloadProgress.Value = p));
    }
    finally { btnDownload.IsEnabled = true; }
}

Использование встроенных комбинаторов на основе задач

Пространство System.Threading.Tasks имен включает несколько методов создания и работы с задачами.

Task.Run

Класс Task включает несколько Run методов, которые позволяют легко выгрузить работу как Task или Task<TResult> в пул потоков, например:

public async void button1_Click(object sender, EventArgs e)
{
    textBox1.Text = await Task.Run(() =>
    {
        // … do compute-bound work here
        return answer;
    });
}

Некоторые из этих Run методов, например перегрузка Task.Run(Func<Task>) , существуют как сокращенные для TaskFactory.StartNew метода. Эта перегрузка позволяет использовать await в разгруженной работе, например:

public async void button1_Click(object sender, EventArgs e)
{
    pictureBox1.Image = await Task.Run(async() =>
    {
        using(Bitmap bmp1 = await DownloadFirstImageAsync())
        using(Bitmap bmp2 = await DownloadSecondImageAsync())
        return Mashup(bmp1, bmp2);
    });
}

Такие перегрузки логически эквивалентны использованию TaskFactory.StartNew метода в сочетании с Unwrap методом расширения в библиотеке параллельных задач.

Task.FromResult

FromResult Используйте метод в сценариях, где данные уже могут быть доступны и должны быть возвращены из асинхронного метода с задачами, помещённого в Task<TResult>:

public Task<int> GetValueAsync(string key)
{
    int cachedValue;
    return TryGetCachedValue(out cachedValue) ?
        Task.FromResult(cachedValue) :
        GetValueAsyncInternal();
}

private async Task<int> GetValueAsyncInternal(string key)
{
    …
}

Task.WhenAll

Используйте метод WhenAll для асинхронного ожидания нескольких операций, представленных в виде задач. Метод имеет несколько перегрузок, поддерживающих набор не универсальных задач или не однородный набор универсальных задач (например, асинхронное ожидание нескольких операций, возвращающих void, или асинхронное ожидание нескольких методов, возвращающих значения, где каждое значение может иметь различный тип), а также поддерживающих однородный набор универсальных задач (например, асинхронное ожидание нескольких методов, возвращающих TResult).

Предположим, что вы хотите отправлять сообщения электронной почты нескольким клиентам. Вы можете перекрывать отправку сообщений, чтобы вы не ждали завершения одного сообщения перед отправкой следующего. Вы также можете узнать, когда операции отправки завершились и произошли ли какие-либо ошибки:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);

Этот код не обрабатывает исключения, которые могут возникнуть, но позволяет исключениям выходить из await и распространяться на результирующую задачу от WhenAll. Для обработки исключений можно использовать следующий код:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{
    await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
    ...
}

В этом случае при сбое любой асинхронной операции все исключения будут консолидированы в AggregateException исключении, которое хранится в Task, возвращаемом методом WhenAll. Однако только одно из этих исключений распространяется ключевым словом await. Если вы хотите проверить все исключения, можно переписать предыдущий код следующим образом:

Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{
    await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
    foreach(Task faulted in asyncOps.Where(t => t.IsFaulted))
    {
        … // work with faulted and faulted.Exception
    }
}

Рассмотрим пример скачивания нескольких файлов из Интернета асинхронно. В этом случае все асинхронные операции имеют однородные типы результатов и легко получить доступ к результатам:

string [] pages = await Task.WhenAll(
    from url in urls select DownloadStringTaskAsync(url));

Вы можете использовать те же методы обработки исключений, которые мы обсуждали в предыдущем сценарии возврата void:

Task<string> [] asyncOps =
    (from url in urls select DownloadStringTaskAsync(url)).ToArray();
try
{
    string [] pages = await Task.WhenAll(asyncOps);
    ...
}
catch(Exception exc)
{
    foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
    {
        … // work with faulted and faulted.Exception
    }
}

Task.WhenAny

Вы можете использовать метод WhenAny для асинхронного ожидания завершения только одной из нескольких асинхронных операций, которые представлены в виде задач. Этот метод служит четырьмя основными вариантами использования:

  • Избыточность: выполнение операции несколько раз и выбор той, которая завершится первой (например, обращение к нескольким веб-службам котировок акций, чтобы получить единый результат и выбрать тот, который завершится быстрее всех).

  • Переключение: запуск нескольких операций и ожидание завершения всех этих операций, но обработка их по завершении.

  • Регулирование. Разрешение дополнительных операций начинаться после завершения других операций. Это расширение сценария взаимодействия.

  • Раннее освобождение: например, операция, представленная задачей WhenAny t1, может быть сгруппирована в задачу WhenAny, содержащую другую задачу t2, и вы можете ждать задачу . Задача t2 может представлять собой время ожидания, отмену или другой сигнал, который заставляет задачу WhenAny завершиться раньше, чем завершится t1.

Избыточность

Рассмотрим случай, когда вы хотите принять решение о том, следует ли покупать акции. Существует несколько веб-служб рекомендаций акций, которые вы доверяете, но в зависимости от ежедневной нагрузки каждая служба может быть медленной в разные времена. Вы можете использовать метод WhenAny, чтобы получать уведомление при завершении любой операции.

var recommendations = new List<Task<bool>>()
{
    GetBuyRecommendation1Async(symbol),
    GetBuyRecommendation2Async(symbol),
    GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);

В отличие от WhenAll, который возвращает незавернутые результаты всех задач, успешно выполненных, WhenAny возвращает выполненную задачу. Если задача завершается ошибкой, важно знать, что она завершилась ошибкой, и если задача завершается успешно, важно знать, с какой задачей связано возвращаемое значение. Поэтому необходимо получить доступ к результату возвращенной задачи или ожидать её завершения, как показано в этом примере.

Как и в случае с WhenAll, вы должны иметь возможность учитывать исключения. Так как вы получите завершенную задачу обратно, вы можете ожидать последствий распространения ошибок в возвращаемой задаче и try/catch их соответствующей корректировки. Например:

Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{
    Task<bool> recommendation = await Task.WhenAny(recommendations);
    try
    {
        if (await recommendation) BuyStock(symbol);
        break;
    }
    catch(WebException exc)
    {
        recommendations.Remove(recommendation);
    }
}

Кроме того, даже если первая задача успешно завершена, последующие задачи могут завершиться ошибкой. На этом этапе у вас есть несколько вариантов для решения исключений: вы можете ждать завершения всех запущенных задач, в этом случае можно использовать WhenAll метод или решить, что все исключения важны и должны быть зарегистрированы. Для этого можно использовать продолжения для получения уведомления при выполнении задач асинхронно:

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => { if (t.IsFaulted) Log(t.Exception); });
}

или:

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

или даже:

private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
    foreach(var task in tasks)
    {
        try { await task; }
        catch(Exception exc) { Log(exc); }
    }
}
…
LogCompletionIfFailed(recommendations);

Наконец, можно отменить все оставшиеся операции:

var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{
    GetBuyRecommendation1Async(symbol, cts.Token),
    GetBuyRecommendation2Async(symbol, cts.Token),
    GetBuyRecommendation3Async(symbol, cts.Token)
};

Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);

Чередование

Рассмотрим случай, когда вы загружаете изображения из Интернета и обрабатываете каждый образ (например, добавление изображения в элемент управления пользовательским интерфейсом). Вы обрабатываете изображения последовательно в потоке пользовательского интерфейса, но хотите скачивать их как можно более параллельно. Кроме того, вы не хотите добавлять изображения в пользовательский интерфейс, пока они не будут загружены. Вместо этого необходимо добавлять их, как только они будут завершены.

List<Task<Bitmap>> imageTasks =
    (from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch{}
}

Можно также применить чередование к сценарию, который включает вычислительно интенсивную обработку на загруженных изображениях, например:

List<Task<Bitmap>> imageTasks =
    (from imageUrl in urls select GetBitmapAsync(imageUrl)
         .ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch{}
}

Ограничение скорости

Рассмотрим пример чередования, за исключением того, что пользователь скачивает так много изображений, что количество загрузок должно регулироваться; например, вы хотите, чтобы только определенное количество загрузок выполнялось одновременно. Для этого можно запустить подмножество асинхронных операций. По завершении операций можно запустить другие операции, чтобы заменить их.

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

Раннее финансовое спасение

Предположим, что вы ожидаете асинхронно завершения операции при одновременном реагировании на запрос на отмену пользователя (например, пользователь нажимал кнопку отмены). Следующий код иллюстрирует этот сценарий:

private CancellationTokenSource m_cts;

public void btnCancel_Click(object sender, EventArgs e)
{
    if (m_cts != null) m_cts.Cancel();
}

public async void btnRun_Click(object sender, EventArgs e)
{
    m_cts = new CancellationTokenSource();
    btnRun.Enabled = false;
    try
    {
        Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);
        await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
        if (imageDownload.IsCompleted)
        {
            Bitmap image = await imageDownload;
            panel.AddImage(image);
        }
        else imageDownload.ContinueWith(t => Log(t));
    }
    finally { btnRun.Enabled = true; }
}

private static async Task UntilCompletionOrCancellation(
    Task asyncOp, CancellationToken ct)
{
    var tcs = new TaskCompletionSource<bool>();
    using(ct.Register(() => tcs.TrySetResult(true)))
        await Task.WhenAny(asyncOp, tcs.Task);
    return asyncOp;
}

Эта реализация снова активирует пользовательский интерфейс, как только вы решите прервать операцию, но не отменяет фоновую асинхронную операцию. Другой альтернативой будет отмена ожидающих операций, когда вы решите прервать процесс, но не восстанавливать пользовательский интерфейс, пока операции не завершатся, возможно, из-за их преждевременного завершения вследствие запроса на отмену.

private CancellationTokenSource m_cts;

public async void btnRun_Click(object sender, EventArgs e)
{
    m_cts = new CancellationTokenSource();

    btnRun.Enabled = false;
    try
    {
        Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);
        await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
        Bitmap image = await imageDownload;
        panel.AddImage(image);
    }
    catch(OperationCanceledException) {}
    finally { btnRun.Enabled = true; }
}

Еще один пример раннего спасения включает использование WhenAny метода в сочетании с Delay методом, как описано в следующем разделе.

Task.Delay

Метод Task.Delay можно использовать для введения пауз в выполнение асинхронного метода. Это полезно для многих видов функциональных возможностей, включая создание циклов опроса и задержку обработки входных данных пользователей в течение предопределенного периода времени. Этот метод Task.Delay также может быть полезен для реализации тайм-аутов в ожиданиях, если использовать его в сочетании с Task.WhenAny.

Если задача, которая является частью более крупной асинхронной операции (например, веб-службы ASP.NET) занимает слишком много времени, общая операция может пострадать, особенно если она не завершится. По этой причине важно иметь возможность прерывать асинхронную операцию по истечении времени ожидания. Синхронные Task.Wait, Task.WaitAll и Task.WaitAny методы принимают значения времени ожидания, но соответствующие TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny и ранее упомянутые Task.WhenAll/Task.WhenAny методы не принимают значения времени ожидания. Вместо этого можно использовать Task.Delay и Task.WhenAny в сочетании для реализации времени ожидания.

Например, в приложении пользовательского интерфейса предположим, что вы хотите скачать изображение и отключить пользовательский интерфейс во время скачивания изображения. Однако если скачивание занимает слишком много времени, необходимо повторно включить пользовательский интерфейс и отменить скачивание:

public async void btnDownload_Click(object sender, EventArgs e)
{
    btnDownload.Enabled = false;
    try
    {
        Task<Bitmap> download = GetBitmapAsync(url);
        if (download == await Task.WhenAny(download, Task.Delay(3000)))
        {
            Bitmap bmp = await download;
            pictureBox.Image = bmp;
            status.Text = "Downloaded";
        }
        else
        {
            pictureBox.Image = null;
            status.Text = "Timed out";
            var ignored = download.ContinueWith(
                t => Trace("Task finally completed"));
        }
    }
    finally { btnDownload.Enabled = true; }
}

Одно и то же относится к нескольким скачиваниям, так как WhenAll возвращает задачу:

public async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.Enabled = false;
    try
    {
        Task<Bitmap[]> downloads =
            Task.WhenAll(from url in urls select GetBitmapAsync(url));
        if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
        {
            foreach(var bmp in downloads.Result) panel.AddImage(bmp);
            status.Text = "Downloaded";
        }
        else
        {
            status.Text = "Timed out";
            downloads.ContinueWith(t => Log(t));
        }
    }
    finally { btnDownload.Enabled = true; }
}

Создание комбинаторов на основе задач

Так как задача может полностью представлять асинхронную операцию и предоставлять синхронные и асинхронные возможности для объединения с операцией, получения результатов и т. д., можно создавать полезные библиотеки комбинаторов, которые создают задачи для создания более крупных шаблонов. Как описано в предыдущем разделе, .NET включает несколько встроенных комбинаторов, но вы также можете создавать собственные. В следующих разделах приведены несколько примеров потенциальных методов и типов комбинатора.

Повторная попытка при ошибке

Во многих ситуациях может потребоваться повторить операцию, если предыдущая попытка завершится ошибкой. Для синхронного кода можно создать вспомогательный метод, например RetryOnFault в следующем примере, чтобы выполнить следующее:

public static T RetryOnFault<T>(
    Func<T> function, int maxTries)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return function(); }
        catch { if (i == maxTries-1) throw; }
    }
    return default(T);
}

Вы можете создать практически идентичный вспомогательный метод для асинхронных операций, реализованных с помощью TAP, и таким образом возвращать задачи:

public static async Task<T> RetryOnFault<T>(
    Func<Task<T>> function, int maxTries)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries-1) throw; }
    }
    return default(T);
}

Затем этот комбинатор можно использовать для кодирования повторных попыток в логику приложения; Например:

// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3);

Можно расширить функцию RetryOnFault дальше. Например, функция может принять другую Func<Task>, которая будет вызываться между попытками, чтобы определить, когда снова попытаться выполнить операцию; например:

public static async Task<T> RetryOnFault<T>(
    Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
    for(int i=0; i<maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries-1) throw; }
        await retryWhen().ConfigureAwait(false);
    }
    return default(T);
}

Затем можно использовать функцию следующим образом, чтобы дождаться секунды, прежде чем повторить операцию:

// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));

NeedOnlyOne

Иногда вы можете воспользоваться преимуществами избыточности, чтобы улучшить задержку операции и шансы на успех. Рассмотрим несколько веб-служб, которые предоставляют акции, но в разное время суток каждая служба может предоставлять различные уровни качества и времени отклика. Чтобы справиться с этими колебаниями, вы можете выдавать запросы ко всем веб-службам, и как только вы получите ответ от одного, отмените остальные запросы. Вы можете реализовать вспомогающую функцию, чтобы упростить реализацию этого общего шаблона запуска нескольких операций, ожидая любых операций, а затем отменить остальные. Функция NeedOnlyOne в следующем примере иллюстрирует этот сценарий:

public static async Task<T> NeedOnlyOne(
    params Func<CancellationToken,Task<T>> [] functions)
{
    var cts = new CancellationTokenSource();
    var tasks = (from function in functions
                 select function(cts.Token)).ToArray();
    var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
    cts.Cancel();
    foreach(var task in tasks)
    {
        var ignored = task.ContinueWith(
            t => Log(t), TaskContinuationOptions.OnlyOnFaulted);
    }
    return completed;
}

Затем эту функцию можно использовать следующим образом:

double currentPrice = await NeedOnlyOne(
    ct => GetCurrentPriceFromServer1Async("msft", ct),
    ct => GetCurrentPriceFromServer2Async("msft", ct),
    ct => GetCurrentPriceFromServer3Async("msft", ct));

Интерлированные операции

Существует потенциальная проблема с производительностью при использовании WhenAny метода для поддержки сценария переключения при работе с большими наборами задач. Каждый вызов WhenAny приводит к регистрации продолжения в каждой задаче. Для N задач это приводит к созданию O(N2) континуумов на протяжении всего времени выполнения операции интерливинга. Если вы работаете с большим набором задач, можно использовать комбинатор (Interleaved в следующем примере) для решения проблемы с производительностью:

static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();
    var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
                   select new TaskCompletionSource<T>()).ToList();
    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            var source = sources[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
                source.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled)
                source.TrySetCanceled();
            else
                source.TrySetResult(completed.Result);
        }, CancellationToken.None,
           TaskContinuationOptions.ExecuteSynchronously,
           TaskScheduler.Default);
    }
    return from source in sources
           select source.Task;
}

Затем можно использовать комбинатор для обработки результатов задач по мере их выполнения; Например:

IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
    int result = await task;
    …
}

WhenAllOrFirstException

В некоторых сценариях разброса или сбора данных может потребоваться ждать всех задач в наборе, если только одно из них не будет ошибкой, в этом случае вы хотите прекратить ожидание сразу после возникновения исключения. Это можно сделать с помощью метода комбинатора, например WhenAllOrFirstException в следующем примере:

public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
    var inputs = tasks.ToList();
    var ce = new CountdownEvent(inputs.Count);
    var tcs = new TaskCompletionSource<T[]>();

    Action<Task> onCompleted = (Task completed) =>
    {
        if (completed.IsFaulted)
            tcs.TrySetException(completed.Exception.InnerExceptions);
        if (ce.Signal() && !tcs.Task.IsCompleted)
            tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
    };

    foreach (var t in inputs) t.ContinueWith(onCompleted);
    return tcs.Task;
}

Создание структур данных на основе задач

Помимо возможности создания пользовательских комбинаторов на основе задач, наличие структуры данных в Task и Task<TResult>, которая представляет результаты асинхронной операции и обеспечивает необходимую синхронизацию для объединения с ней, делает этот тип мощным инструментом для построения пользовательских структур данных, которые будут использоваться в асинхронных сценариях.

Асинхронный Кэш

Одним из важных аспектов задачи является то, что она может быть передана нескольким потребителям, каждый из которых может ожидать её, зарегистрировать продолжения для неё, получить её результат или исключения (в случае Task<TResult>) и т. д. Это делает Task и Task<TResult> идеально подходящими для использования в асинхронной инфраструктуре кэширования. Ниже приведен пример небольшого, но мощного асинхронного кэша, созданного на основе Task<TResult>:

public class AsyncCache<TKey, TValue>
{
    private readonly Func<TKey, Task<TValue>> _valueFactory;
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
    {
        if (valueFactory == null) throw new ArgumentNullException("valueFactory");
        _valueFactory = valueFactory;
        _map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
    }

    public Task<TValue> this[TKey key]
    {
        get
        {
            if (key == null) throw new ArgumentNullException("key");
            return _map.GetOrAdd(key, toAdd =>
                new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
        }
    }
}

Класс AsyncCache<TKey,TValue> принимает в качестве делегата для своего конструктора функцию, которая принимает TKey и возвращает Task<TResult>. Все предыдущие значения из кэша хранятся во внутреннем словаре, и AsyncCache гарантирует, что только одна задача создается на ключ, даже при одновременном обращении к кэшу.

Например, можно создать кэш для скачанных веб-страниц:

private AsyncCache<string,string> m_webPages =
    new AsyncCache<string,string>(DownloadStringTaskAsync);

Затем этот кэш можно использовать в асинхронных методах, когда требуется содержимое веб-страницы. Класс AsyncCache гарантирует, что вы загружаете как можно меньше страниц и кэшируете результаты.

private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.IsEnabled = false;
    try
    {
        txtContents.Text = await m_webPages["https://www.microsoft.com"];
    }
    finally { btnDownload.IsEnabled = true; }
}

AsyncProducerConsumerCollection

Вы также можете использовать задачи для создания структур данных для координации асинхронных действий. Рассмотрим один из классических шаблонов параллельного проектирования: производитель или потребитель. В этой модели производители генерируют данные, которые затем потребляются потребителями, и они могут работать параллельно. Например, потребитель обрабатывает элемент 1, который ранее был создан производителем, который в настоящее время производит элемент 2. Для шаблона производителя или потребителя вам всегда требуется некоторая структура данных для хранения работы, созданной производителями, чтобы потребители могли получать уведомления о новых данных и находить их при наличии.

Ниже приведена простая структура данных, построенная на основе задач, которая позволяет асинхронным методам использоваться в качестве производителей и потребителей:

public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0)
            {
                return Task.FromResult(m_collection.Dequeue());
            }
            else
            {
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}

Используя эту структуру данных, можно написать код, например следующий:

private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.Take();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Add(data);
}

Пространство System.Threading.Tasks.Dataflow имен включает BufferBlock<T> тип, который можно использовать подобным образом, но без необходимости создавать собственный тип коллекции.

private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{
    while(true)
    {
        int nextItem = await m_data.ReceiveAsync();
        ProcessNextItem(nextItem);
    }
}
…
private static void Produce(int data)
{
    m_data.Post(data);
}

Замечание

Пространство System.Threading.Tasks.Dataflow имен доступно в виде пакета NuGet. Чтобы установить сборку, содержащую System.Threading.Tasks.Dataflow пространство имен, откройте проект в Visual Studio, выберите "Управление пакетами NuGet " в меню "Проект" и найдите пакет в Интернете System.Threading.Tasks.Dataflow .

См. также