Использование асинхронного шаблона на основе задач

При использовании асинхронного шаблона на основе задач (TAP) для работы с асинхронными операциями можно использовать обратные вызовы для достижения ожидания без блокировки. Для задач этот шаблон использует такие методы, как Task.ContinueWith. Асинхронная поддержка на основе языка скрывает обратные вызовы, позволяя асинхронным операциям ожидаться в обычном потоке управления, а код, созданный компилятором, обеспечивает эту же поддержку на уровне API.

Приостановка выполнения с помощью Await

Ключевое слово await в C# и оператор Await в Visual Basic можно использовать для асинхронного ожидания Task и Task<TResult> объектов. При ожидании Task иawait выражение имеет тип void. При ожидании Task<TResult>, выражение await имеет тип TResult. Выражение await должно использоваться внутри тела асинхронного метода. (Эти языковые функции были представлены в .NET Framework 4.5.)

В этой области функция await устанавливает обратный вызов для задачи с помощью продолжения. Этот обратный вызов возобновляет асинхронный метод в точке приостановки. При возобновлении асинхронного метода, если ожидаемая операция завершилась успешно и была Task<TResult>, возвращается ее TResult. Если Task или Task<TResult>, которого ожидали, завершилось в состоянии Canceled, возникает исключение OperationCanceledException. Если ожидаемое Task или Task<TResult> завершилось в состоянии Faulted, вызывается исключение, которое стало причиной сбоя. Ошибка Task может возникнуть в результате нескольких исключений, но распространяется только одно из этих исключений. Однако Task.Exception свойство возвращает AggregateException исключение, содержащее все ошибки.

Если контекст синхронизации (SynchronizationContext объект) связан с потоком, выполнявшим асинхронный метод во время приостановки (например, если свойство SynchronizationContext.Current не является null), асинхронный метод возобновляется в том же контексте синхронизации с помощью метода контекста Post. В противном случае он зависит от планировщика задач (TaskScheduler объекта), текущего во время приостановки. Как правило, это планировщик задач по умолчанию (TaskScheduler.Default), предназначенный для пула потоков. Этот планировщик задач определяет, должна ли ожидаемая асинхронная операция возобновиться в месте, где она завершилась, или следует назначить время для возобновления. Планировщик по умолчанию обычно позволяет продолжению выполняться в потоке, завершаемом ожидаемой операцией.

При вызове асинхронного метода он синхронно выполняет текст функции до тех пор, пока не будет выполнено первое выражение ожидания в ожидаемом экземпляре, который еще не завершен, в то время как вызов возвращается вызывающему объекту. Если асинхронный метод не возвращает void, он возвращает объект Task или Task<TResult>, представляющий текущее вычисление. В асинхронном методе, не возвращающем void, если встречается оператор return или достигается конец тела метода, задача завершается в окончательном RanToCompletion состоянии. Если необработанное исключение приводит к тому, что управление покидает тело асинхронного метода, задача заканчивается в состоянии Faulted. Если это исключение является OperationCanceledException, задача вместо этого заканчивается в состоянии Canceled. Таким образом, результат или исключение в конечном итоге публикуется.

Существуют несколько важных вариантов этого поведения. По соображениям производительности, если к моменту, когда задача ожидается, она уже завершена, управление не передается, и функция продолжает выполняться. Кроме того, возврат к исходному контексту не всегда является требуемым поведением и может быть изменен; Это поведение подробно описано в следующем разделе.

Настройка приостановки и возобновления с помощью Yield и ConfigureAwait

Несколько методов обеспечивают более контроль над выполнением асинхронного метода. Например, можно использовать Task.Yield метод для внедрения точки доходности в асинхронный метод:

public class Task : …
{
    public static YieldAwaitable Yield();
    …
}

Этот метод эквивалентен асинхронной отправке или переносу обратно в текущий контекст.

public static async Task YieldLoopExample()
{
    await Task.Run(async delegate
    {
        for (int i = 0; i < 1000000; i++)
        {
            await Task.Yield(); // fork the continuation into a separate work item
        }
    });
}
Public Async Function YieldLoopExample() As Task
    Await Task.Run(Async Function()
                       For i As Integer = 0 To 999999
                           Await Task.Yield() ' fork the continuation into a separate work item
                       Next
                   End Function)
End Function

Вы также можете использовать Task.ConfigureAwait метод для более эффективного контроля над приостановкой и возобновлением в асинхронном методе. Как упоминалось ранее, по умолчанию текущий контекст фиксируется во время приостановки асинхронного метода, и для вызова продолжения асинхронного метода при возобновлении используется захваченный контекст. Во многих случаях это точное поведение, которое вы хотите. В других случаях можно не уделять внимания контексту продолжения, и можно добиться лучшей производительности, избегая возврата к исходному контексту. Чтобы включить это поведение, используйте метод Task.ConfigureAwait, чтобы указать операции await не захватывать и не возобновлять выполнение в контексте, а продолжать выполнение там, где завершилась асинхронная операция.

await someTask.ConfigureAwait(continueOnCapturedContext:false);

Awaitables, ConfigureAwait и SynchronizationContext — парадигмы и компоненты в асинхронном программировании.

await работает с любым типом, удовлетворяющим шаблону ожидаемого выражения, а не только Task. Тип можно ожидать, если он содержит совместимый метод GetAwaiter, который возвращает тип с элементами IsCompleted, OnCompleted, и GetResult. В большинстве общедоступных API возвращайте Task, Task<TResult>, ValueTask или ValueTask<TResult>. Используйте пользовательские ожидаемые объекты только для специализированных сценариев.

Используйте ConfigureAwait , если продолжение не требует контекста вызывающего объекта. В коде приложения, который обновляет пользовательский интерфейс, часто требуется запись контекста. В коде библиотеки для повторного использования обычно предпочтительно ConfigureAwait(false), так как он избегает ненужных прыжков контекста и снижает риск дедлока для вызывающих функций при блокировке.

ConfigureAwait(false) изменяет планирование продолжений, а не управляет ExecutionContext потоком. Более подробное описание поведения контекста см. в разделе ExecutionContext и SynchronizationContext.

Отмена асинхронной операции

Начиная с .NET Framework 4 методы TAP, поддерживающие отмену, предоставляют по крайней мере одну перегрузку, которая принимает маркер отмены (CancellationToken объект).

Вы создаете маркер отмены через источник маркера отмены (объект CancellationTokenSource). Свойство источника Token возвращает маркер отмены, который сигнализирует при вызове метода источника Cancel .

var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();

Например, если вы хотите скачать одну веб-страницу и хотите отменить операцию, создать CancellationTokenSource объект, передать его маркер в метод TAP, а затем вызвать метод источника Cancel , когда вы будете готовы отменить операцию:

var cts = new CancellationTokenSource();
    IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));
    // at some point later, potentially on another thread
    …
    cts.Cancel();

Кроме того, можно передать один и тот же маркер в выборочное подмножество операций:

var cts = new CancellationTokenSource();
    byte [] data = await DownloadDataAsync(url, cts.Token);
    await SaveToDiskAsync(outputPath, data, CancellationToken.None);
    … // at some point later, potentially on another thread
    cts.Cancel();

Это важно

Любой поток может инициировать запросы на отмену.

Вы можете передать значение CancellationToken.None любому методу, который принимает маркер отмены, чтобы указать, что отмена никогда не запрашивается. Это значение приводит к тому, что свойство CancellationToken.CanBeCanceled возвращает false, и вызываемый метод может быть оптимизирован соответствующим образом. В целях тестирования можно также передать предварительно отмененный маркер отмены, созданный с помощью конструктора, который принимает логическое значение, чтобы указать, должен ли маркер начинаться в уже отмененном или неотменяемом состоянии.

Этот подход к отмене имеет несколько преимуществ:

  • Вы можете передать один и тот же маркер отмены любому количеству асинхронных и синхронных операций.

  • Тот же запрос на отмену может быть отправлен любому числу слушателей.

  • Разработчик асинхронного API полностью контролирует, можно ли запрашивать отмену и когда она вступает в силу.

  • Код, который использует API, может выборочно определить асинхронные вызовы, на которые отправляются запросы на отмену.

Мониторинг хода выполнения

Некоторые асинхронные методы предоставляют интерфейс прогресса для отслеживания хода выполнения, который передается в асинхронный метод. Например, рассмотрим функцию, которая асинхронно загружает строку текста и в процессе поступают обновления хода выполнения, включающие процент завершенной на данный момент загрузки. Такой метод можно использовать в приложении Windows Presentation Foundation (WPF) следующим образом:

private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
    btnDownload.IsEnabled = false;
    try
    {
        txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,
            new Progress<int>(p => pbDownloadProgress.Value = p));
    }
    finally { btnDownload.IsEnabled = true; }
}

Использование встроенных комбинаторов на основе задач

Пространство System.Threading.Tasks имен включает несколько методов создания и работы с задачами.

Замечание

В нескольких примерах кода в этом разделе используется Bitmap, для которого требуется пакет System.Drawing.Common и поддерживается только в Windows. Асинхронные шаблоны, которые они демонстрируют, применяются на всех платформах; замените кроссплатформенную библиотеку изображений для целевых объектов, отличных от Windows.

Task.Run

Класс Task включает несколько Run методов, которые позволяют легко выгрузить работу в виде Task или Task<TResult> в пул потоков. Рассмотрим пример.

public static async Task TaskRunBasicExample()
{
    int answer = 42;
    string result = await Task.Run(() =>
    {
        // … do compute-bound work here
        return answer.ToString();
    });
    Console.WriteLine(result);
}
Public Async Function TaskRunBasicExample() As Task
    Dim answer As Integer = 42
    Dim result As String = Await Task.Run(Function()
                                              ' … do compute-bound work here
                                              Return answer.ToString()
                                          End Function)
    Console.WriteLine(result)
End Function

Некоторые из этих Run методов, например перегрузка Task.Run(Func<Task>) , существуют как сокращенные для TaskFactory.StartNew метода. Этот метод перегрузки позволяет использовать await в переданной работе. Рассмотрим пример.

public static async Task TaskRunAsyncExample()
{
    Bitmap image = await Task.Run(async () =>
    {
        using Bitmap bmp1 = await Stubs.DownloadFirstImageAsync();
        using Bitmap bmp2 = await Stubs.DownloadSecondImageAsync();
        return Stubs.Mashup(bmp1, bmp2);
    });
}
Public Async Function TaskRunAsyncExample() As Task
    Dim image As Bitmap = Await Task.Run(Async Function()
                                             Using bmp1 As Bitmap = Await Stubs.DownloadFirstImageAsync()
                                                 Using bmp2 As Bitmap = Await Stubs.DownloadSecondImageAsync()
                                                     Return Stubs.Mashup(bmp1, bmp2)
                                                 End Using
                                             End Using
                                         End Function)
End Function

Такие перегрузки логически эквивалентны использованию TaskFactory.StartNew метода в сочетании с Unwrap методом расширения в библиотеке параллельных задач.

Task.FromResult

Используйте метод FromResult в сценариях, где данные уже могут быть доступны, и вам просто нужно вернуть их из метода, возвращающего задачи Task<TResult>:

public static Task<int> GetValueAsync(string key)
{
    int cachedValue;
    return Stubs.TryGetCachedValue(out cachedValue) ?
        Task.FromResult(cachedValue) :
        GetValueAsyncInternal(key);
}

static async Task<int> GetValueAsyncInternal(string key)
{
    await Task.Delay(1);
    return 0;
}
Public Function GetValueAsync(key As String) As Task(Of Integer)
    Dim cachedValue As Integer
    If Stubs.TryGetCachedValue(cachedValue) Then
        Return Task.FromResult(cachedValue)
    Else
        Return GetValueAsyncInternal(key)
    End If
End Function

Private Async Function GetValueAsyncInternal(key As String) As Task(Of Integer)
    Await Task.Delay(1)
    Return 0
End Function

Task.WhenAll

Используйте метод WhenAll для асинхронного ожидания нескольких операций, представленных в виде задач. Метод имеет несколько перегрузок, поддерживающих набор неуниверсальных задач или неравномерный набор универсальных задач (например, асинхронное ожидание нескольких операций, возвращающих void, или асинхронное ожидание нескольких методов, возвращающих значения, где каждое значение может иметь другой тип), и поддерживает унифицированный набор универсальных задач (например, асинхронное ожидание нескольких методов, возвращающих TResult).

Предположим, вы хотите отправить сообщения электронной почты нескольким клиентам. Вы можете перекрывать отправку сообщений, чтобы вы не ждали завершения одного сообщения перед отправкой следующего. Вы также можете узнать, когда операции отправки завершены и возникают ли ошибки:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);

Этот код не обрабатывает исключения, которые могут возникнуть, но позволяет исключениям распространяться за пределы await из результирующей задачи WhenAll. Для обработки исключений используйте следующий код:

public static async Task WhenAllWithCatch()
{
    IEnumerable<Task> asyncOps = from addr in Stubs.addrs select Stubs.SendMailAsync(addr);
    try
    {
        await Task.WhenAll(asyncOps);
    }
    catch (Exception exc)
    {
        Console.WriteLine(exc);
    }
}
Public Async Function WhenAllWithCatch() As Task
    Dim asyncOps As IEnumerable(Of Task) = From addr In Stubs.addrs Select Stubs.SendMailAsync(addr)
    Try
        Await Task.WhenAll(asyncOps)
    Catch exc As Exception
        Console.WriteLine(exc)
    End Try
End Function

В этом случае, если какая-либо асинхронная операция завершится сбоем, все исключения объединяются в одно исключение AggregateException, которое хранится в Task, возвращаемого методом WhenAll. Однако только одно из этих исключений распространяется ключевым словом await. Если вы хотите проверить все исключения, можно переписать предыдущий код следующим образом:

public static async Task WhenAllExamineExceptions()
{
    Task[] asyncOps = (from addr in Stubs.addrs select Stubs.SendMailAsync(addr)).ToArray();
    try
    {
        await Task.WhenAll(asyncOps);
    }
    catch (Exception exc)
    {
        foreach (Task faulted in asyncOps.Where(t => t.IsFaulted))
        {
            Console.WriteLine($"Faulted: {faulted.Exception}");
        }
    }
}
Public Async Function WhenAllExamineExceptions() As Task
    Dim asyncOps As Task() = (From addr In Stubs.addrs Select Stubs.SendMailAsync(addr)).ToArray()
    Try
        Await Task.WhenAll(asyncOps)
    Catch exc As Exception
        For Each faulted As Task In asyncOps.Where(Function(t) t.IsFaulted)
            Console.WriteLine($"Faulted: {faulted.Exception}")
        Next
    End Try
End Function

Рассмотрим пример загрузки нескольких файлов из Интернета асинхронно. В этом случае все асинхронные операции имеют однородные типы результатов и легко получить доступ к результатам:

string [] pages = await Task.WhenAll(
    from url in urls select DownloadStringTaskAsync(url));

Вы можете использовать те же методы обработки исключений, которые рассматриваются в предыдущем сценарии возврата void:

public static async Task WhenAllDownloadPagesExceptions()
{
    Task<string>[] asyncOps =
        (from url in Stubs.urls select Stubs.DownloadStringTaskAsync(url)).ToArray();
    try
    {
        string[] pages = await Task.WhenAll(asyncOps);
        Console.WriteLine(pages.Length);
    }
    catch (Exception exc)
    {
        foreach (Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
        {
            Console.WriteLine($"Faulted: {faulted.Exception}");
        }
    }
}
Public Async Function WhenAllDownloadPagesExceptions() As Task
    Dim asyncOps As Task(Of String)() =
        (From url In Stubs.urls Select Stubs.DownloadStringTaskAsync(url)).ToArray()
    Try
        Dim pages As String() = Await Task.WhenAll(asyncOps)
        Console.WriteLine(pages.Length)
    Catch exc As Exception
        For Each faulted As Task(Of String) In asyncOps.Where(Function(t) t.IsFaulted)
            Console.WriteLine($"Faulted: {faulted.Exception}")
        Next
    End Try
End Function

Task.WhenAny

WhenAny Используйте метод для асинхронного ожидания выполнения только одного из нескольких асинхронных операций, представленных как задачи. Этот метод служит четырьмя основными вариантами использования:

  • Редундантность: выполнение операции несколько раз и выбор первой завершившейся (например, обращение к нескольким веб-службам по биржевым котировкам, возвращающих результат; выбор самой быстрой).

  • Переключение: запуск нескольких операций и ожидание завершения всех этих операций, но обработка их по завершении.

  • Регулирование. Разрешение дополнительных операций начинаться после завершения других операций. Этот сценарий является расширением сценария взаимодействия.

  • Раннее освобождение: например, операция, представленная задачей WhenAny t1, может быть сгруппирована в задачу WhenAny, содержащую другую задачу t2, и вы можете ждать задачу . Задача t2 может представлять собой время ожидания, отмену или другой сигнал, который заставляет задачу WhenAny завершиться раньше, чем завершится t1.

Избыточность

Рассмотрим случай, когда вы хотите принять решение о том, следует ли покупать акции. Существуют несколько веб-сервисов по рекомендациям акций, которым вы доверяете, но в зависимости от ежедневной нагрузки каждый из них может быть медленным в разные периоды. WhenAny Используйте метод для получения уведомления при завершении любой операции:

public static async Task WhenAnyRedundancy(string symbol)
{
    var recommendations = new List<Task<bool>>()
    {
        Stubs.GetBuyRecommendation1Async(symbol),
        Stubs.GetBuyRecommendation2Async(symbol),
        Stubs.GetBuyRecommendation3Async(symbol)
    };
    Task<bool> recommendation = await Task.WhenAny(recommendations);
    if (await recommendation) Stubs.BuyStock(symbol);
}
Public Async Function WhenAnyRedundancy(symbol As String) As Task
    Dim recommendations As New List(Of Task(Of Boolean)) From {
        Stubs.GetBuyRecommendation1Async(symbol),
        Stubs.GetBuyRecommendation2Async(symbol),
        Stubs.GetBuyRecommendation3Async(symbol)
    }
    Dim recommendation As Task(Of Boolean) = Await Task.WhenAny(recommendations)
    If Await recommendation Then Stubs.BuyStock(symbol)
End Function

В отличие от WhenAll, который возвращает незавернутые результаты всех задач, успешно выполненных, WhenAny возвращает выполненную задачу. Если задача завершается ошибкой, важно знать, что она завершилась ошибкой, и если задача завершается успешно, важно знать, с какой задачей связано возвращаемое значение. Поэтому необходимо получить доступ к результату возвращенной задачи или ожидать её завершения, как показано в этом примере.

Как и в случае с WhenAll, вы должны иметь возможность учитывать исключения. Так как вы получите завершенную задачу обратно, вы можете ожидать последствий распространения ошибок в возвращаемой задаче и try/catch их соответствующей корректировки. Например:

public static async Task WhenAnyRetryOnException(string symbol)
{
    Task<bool>[] allRecommendations = new Task<bool>[]
    {
        Stubs.GetBuyRecommendation1Async(symbol),
        Stubs.GetBuyRecommendation2Async(symbol),
        Stubs.GetBuyRecommendation3Async(symbol)
    };
    var remaining = allRecommendations.ToList();
    while (remaining.Count > 0)
    {
        Task<bool> recommendation = await Task.WhenAny(remaining);
        try
        {
            if (await recommendation) Stubs.BuyStock(symbol);
            break;
        }
        catch (WebException)
        {
            remaining.Remove(recommendation);
        }
    }
}
Public Async Function WhenAnyRetryOnException(symbol As String) As Task
    Dim allRecommendations As Task(Of Boolean)() = {
        Stubs.GetBuyRecommendation1Async(symbol),
        Stubs.GetBuyRecommendation2Async(symbol),
        Stubs.GetBuyRecommendation3Async(symbol)
    }
    Dim remaining As List(Of Task(Of Boolean)) = allRecommendations.ToList()
    While remaining.Count > 0
        Dim recommendation As Task(Of Boolean) = Await Task.WhenAny(remaining)
        Try
            If Await recommendation Then Stubs.BuyStock(symbol)
            Exit While
        Catch ex As WebException
            remaining.Remove(recommendation)
        End Try
    End While
End Function

Кроме того, даже если первая задача успешно завершена, последующие задачи могут завершиться ошибкой. На этом этапе у вас есть несколько вариантов для решения исключений: вы можете ждать завершения всех запущенных задач, в этом случае можно использовать WhenAll метод или решить, что все исключения важны и должны быть зарегистрированы. В этом сценарии можно использовать продолжения для получения уведомления при выполнении задач асинхронно:

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => { if (t.IsFaulted) Log(t.Exception); });
}

или:

foreach(Task recommendation in recommendations)
{
    var ignored = recommendation.ContinueWith(
        t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

или даже:

private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
    foreach (var task in tasks)
    {
        try { await task; }
        catch (Exception exc) { Stubs.Log(exc); }
    }
}
Private Async Sub LogCompletionIfFailed(tasks As IEnumerable(Of Task))
    For Each task In tasks
        Try
            Await task
        Catch exc As Exception
            Stubs.Log(exc)
        End Try
    Next
End Sub

Наконец, может потребоваться отменить все оставшиеся операции:

public static async Task WhenAnyCancelRemainder(string symbol)
{
    var cts = new CancellationTokenSource();
    var recommendations = new List<Task<bool>>()
    {
        Stubs.GetBuyRecommendation1Async(symbol, cts.Token),
        Stubs.GetBuyRecommendation2Async(symbol, cts.Token),
        Stubs.GetBuyRecommendation3Async(symbol, cts.Token)
    };

    Task<bool> recommendation = await Task.WhenAny(recommendations);
    cts.Cancel();
    if (await recommendation) Stubs.BuyStock(symbol);
}
Public Async Function WhenAnyCancelRemainder(symbol As String) As Task
    Dim cts As New CancellationTokenSource()
    Dim recommendations As New List(Of Task(Of Boolean)) From {
        Stubs.GetBuyRecommendation1Async(symbol, cts.Token),
        Stubs.GetBuyRecommendation2Async(symbol, cts.Token),
        Stubs.GetBuyRecommendation3Async(symbol, cts.Token)
    }

    Dim recommendation As Task(Of Boolean) = Await Task.WhenAny(recommendations)
    cts.Cancel()
    If Await recommendation Then Stubs.BuyStock(symbol)
End Function

Чередование

Рассмотрим случай, когда вы загружаете изображения из Интернета и обрабатываете каждый образ (например, добавление изображения в элемент управления пользовательским интерфейсом). Вы обрабатываете изображения последовательно в потоке пользовательского интерфейса, но хотите скачивать их как можно более параллельно. Кроме того, вы не хотите добавлять изображения в пользовательский интерфейс, пока они не будут загружены. Вместо этого необходимо добавлять их, как только они будут завершены.

public static async Task WhenAnyInterleaving(string[] imageUrls)
{
    List<Task<Bitmap>> imageTasks =
        (from imageUrl in imageUrls select Stubs.GetBitmapAsync(imageUrl)).ToList();
    while (imageTasks.Count > 0)
    {
        try
        {
            Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
            imageTasks.Remove(imageTask);

            Bitmap image = await imageTask;
            Console.WriteLine($"Got image: {image.Width}x{image.Height}");
        }
        catch { }
    }
}
Public Async Function WhenAnyInterleaving(imageUrls As String()) As Task
    Dim imageTasks As List(Of Task(Of Bitmap)) =
        (From imageUrl In imageUrls Select Stubs.GetBitmapAsync(imageUrl)).ToList()
    While imageTasks.Count > 0
        Try
            Dim imageTask As Task(Of Bitmap) = Await Task.WhenAny(imageTasks)
            imageTasks.Remove(imageTask)

            Dim image As Bitmap = Await imageTask
            Console.WriteLine($"Got image: {image.Width}x{image.Height}")
        Catch
        End Try
    End While
End Function

Можно также применить чередование к сценарию, который включает вычислительно интенсивную обработку на загруженных изображениях, например:

public static async Task WhenAnyInterleavingWithProcessing(string[] imageUrls)
{
    List<Task<Bitmap>> imageTasks =
        (from imageUrl in imageUrls
         select Stubs.GetBitmapAsync(imageUrl)
             .ContinueWith(t => Stubs.ConvertImage(t.Result))).ToList();
    while (imageTasks.Count > 0)
    {
        try
        {
            Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
            imageTasks.Remove(imageTask);

            Bitmap image = await imageTask;
            Console.WriteLine($"Got image: {image.Width}x{image.Height}");
        }
        catch { }
    }
}
Public Async Function WhenAnyInterleavingWithProcessing(imageUrls As String()) As Task
    Dim imageTasks As List(Of Task(Of Bitmap)) =
        (From imageUrl In imageUrls
         Select Stubs.GetBitmapAsync(imageUrl).ContinueWith(Function(t) Stubs.ConvertImage(t.Result))).ToList()
    While imageTasks.Count > 0
        Try
            Dim imageTask As Task(Of Bitmap) = Await Task.WhenAny(imageTasks)
            imageTasks.Remove(imageTask)

            Dim image As Bitmap = Await imageTask
            Console.WriteLine($"Got image: {image.Width}x{image.Height}")
        Catch
        End Try
    End While
End Function

Ограничение скорости

Рассмотрим пример чередования, за исключением того, что пользователь скачивает такое количество изображений, что загрузки должны быть ограничены. Например, вы хотите, чтобы только определенное количество загрузок происходило одновременно. Чтобы достичь этой цели, запустите подмножество асинхронных операций. По завершении операций можно запустить другие операции, чтобы заменить их.

public static async Task WhenAnyThrottling(Uri[] uriList)
{
    const int CONCURRENCY_LEVEL = 15;
    int nextIndex = 0;
    var imageTasks = new List<Task<Bitmap>>();
    while (nextIndex < CONCURRENCY_LEVEL && nextIndex < uriList.Length)
    {
        imageTasks.Add(Stubs.GetBitmapAsync(uriList[nextIndex].ToString()));
        nextIndex++;
    }

    while (imageTasks.Count > 0)
    {
        try
        {
            Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
            imageTasks.Remove(imageTask);

            Bitmap image = await imageTask;
            Console.WriteLine($"Got image: {image.Width}x{image.Height}");
        }
        catch (Exception exc) { Stubs.Log(exc); }

        if (nextIndex < uriList.Length)
        {
            imageTasks.Add(Stubs.GetBitmapAsync(uriList[nextIndex].ToString()));
            nextIndex++;
        }
    }
}
Public Async Function WhenAnyThrottling(uriList As Uri()) As Task
    Const CONCURRENCY_LEVEL As Integer = 15
    Dim nextIndex As Integer = 0
    Dim imageTasks As New List(Of Task(Of Bitmap))
    While nextIndex < CONCURRENCY_LEVEL AndAlso nextIndex < uriList.Length
        imageTasks.Add(Stubs.GetBitmapAsync(uriList(nextIndex).ToString()))
        nextIndex += 1
    End While

    While imageTasks.Count > 0
        Try
            Dim imageTask As Task(Of Bitmap) = Await Task.WhenAny(imageTasks)
            imageTasks.Remove(imageTask)

            Dim image As Bitmap = Await imageTask
            Console.WriteLine($"Got image: {image.Width}x{image.Height}")
        Catch exc As Exception
            Stubs.Log(exc)
        End Try

        If nextIndex < uriList.Length Then
            imageTasks.Add(Stubs.GetBitmapAsync(uriList(nextIndex).ToString()))
            nextIndex += 1
        End If
    End While
End Function

Раннее финансовое спасение

Предположим, что вы ожидаете асинхронно завершения операции при одновременном реагировании на запрос на отмену пользователя (например, пользователь нажимал кнопку отмены). Следующий код иллюстрирует этот сценарий:

class EarlyBailoutUI
{
    private CancellationTokenSource? m_cts;

    public void btnCancel_Click(object sender, EventArgs e)
    {
        if (m_cts != null) m_cts.Cancel();
    }

    public async void btnRun_Click(object sender, EventArgs e)
    {
        m_cts = new CancellationTokenSource();
        try
        {
            Task<Bitmap> imageDownload = Stubs.GetBitmapAsync("url");
            await Examples.UntilCompletionOrCancellation(imageDownload, m_cts.Token);
            if (imageDownload.IsCompleted)
            {
                Bitmap image = await imageDownload;
                Stubs.Log(image);
            }
            else imageDownload.ContinueWith(t => Stubs.Log(t));
        }
        finally { }
    }
}
Class EarlyBailoutUI
    Private m_cts As CancellationTokenSource

    Public Sub btnCancel_Click(sender As Object, e As EventArgs)
        If m_cts IsNot Nothing Then m_cts.Cancel()
    End Sub

    Public Async Sub btnRun_Click(sender As Object, e As EventArgs)
        m_cts = New CancellationTokenSource()
        Try
            Dim imageDownload As Task(Of Bitmap) = Stubs.GetBitmapAsync("url")
            Await Examples.UntilCompletionOrCancellation(imageDownload, m_cts.Token)
            If imageDownload.IsCompleted Then
                Dim image As Bitmap = Await imageDownload
                Stubs.Log(image)
            Else
                imageDownload.ContinueWith(Sub(t) Stubs.Log(t))
            End If
        Finally
        End Try
    End Sub
End Class

Эта реализация снова активирует пользовательский интерфейс, как только вы решите прервать операцию, но не отменяет фоновую асинхронную операцию. Другой альтернативой будет отмена ожидающих операций, когда вы решите прервать процесс, но не восстанавливать пользовательский интерфейс, пока операции не завершатся, возможно, из-за их преждевременного завершения вследствие запроса на отмену.

class EarlyBailoutWithTokenUI
{
    private CancellationTokenSource? m_cts;

    public async void btnRun_Click(object sender, EventArgs e)
    {
        m_cts = new CancellationTokenSource();
        try
        {
            Task<Bitmap> imageDownload = Stubs.GetBitmapAsync("url", m_cts.Token);
            await Examples.UntilCompletionOrCancellation(imageDownload, m_cts.Token);
            Bitmap image = await imageDownload;
            Stubs.Log(image);
        }
        catch (OperationCanceledException) { }
        finally { }
    }
}
Class EarlyBailoutWithTokenUI
    Private m_cts As CancellationTokenSource

    Public Async Sub btnRun_Click(sender As Object, e As EventArgs)
        m_cts = New CancellationTokenSource()
        Try
            Dim imageDownload As Task(Of Bitmap) = Stubs.GetBitmapAsync("url", m_cts.Token)
            Await Examples.UntilCompletionOrCancellation(imageDownload, m_cts.Token)
            Dim image As Bitmap = Await imageDownload
            Stubs.Log(image)
        Catch ex As OperationCanceledException
        Finally
        End Try
    End Sub
End Class

Еще один пример раннего спасения включает использование WhenAny метода в сочетании с Delay методом, как описано в следующем разделе.

Task.Delay

Task.Delay Используйте метод, чтобы добавить паузы в выполнение асинхронного метода. Эта пауза полезна для многих видов функций, включая создание циклов опроса и задержку обработки входных данных пользователей в течение предопределенного периода времени. Этот метод можно использовать вместе с Task.WhenAny для реализации времени ожидания на ожидания.

Если задача, которая является частью более крупной асинхронной операции (например, веб-службы ASP.NET) занимает слишком много времени, общая операция может пострадать, особенно если она не завершится. По этой причине важно иметь возможность прерывать асинхронную операцию по истечении времени ожидания. Синхронные методы Task.Wait, Task.WaitAll и Task.WaitAny принимают значения времени ожидания, но соответствующие TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny и ранее упомянутые Task.WhenAll/Task.WhenAny методы не принимают. Вместо этого используйте Task.Delay и Task.WhenAny вместе для реализации времени ожидания.

Например, в приложении пользовательского интерфейса предположим, что вы хотите скачать изображение и отключить пользовательский интерфейс во время скачивания изображения. Однако если скачивание занимает слишком много времени, необходимо повторно включить пользовательский интерфейс и отменить скачивание:

public static async Task<Bitmap?> DownloadWithTimeout(string url)
{
    Task<Bitmap> download = Stubs.GetBitmapAsync(url);
    if (download == await Task.WhenAny(download, Task.Delay(3000)))
    {
        return await download;
    }
    else
    {
        var ignored = download.ContinueWith(
            t => Trace($"Task finally completed: {t.Status}"));
        return null;
    }
}

static void Trace(string message) => Console.WriteLine(message);
Public Async Function DownloadWithTimeout(url As String) As Task(Of Bitmap)
    Dim download As Task(Of Bitmap) = Stubs.GetBitmapAsync(url)
    If download Is Await Task.WhenAny(download, Task.Delay(3000)) Then
        Return Await download
    Else
        Dim ignored = download.ContinueWith(Sub(t) TraceMsg($"Task finally completed: {t.Status}"))
        Return Nothing
    End If
End Function

Тот же принцип применяется к нескольким скачиваниям, так как WhenAll возвращает задачу:

public static async Task<Bitmap[]?> DownloadMultipleWithTimeout(string[] imageUrls)
{
    Task<Bitmap[]> downloads =
        Task.WhenAll(from url in imageUrls select Stubs.GetBitmapAsync(url));
    if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
    {
        return await downloads;
    }
    else
    {
        downloads.ContinueWith(t => Stubs.Log(t));
        return null;
    }
}
Public Async Function DownloadMultipleWithTimeout(imageUrls As String()) As Task(Of Bitmap())
    Dim downloads As Task(Of Bitmap()) =
        Task.WhenAll(From url In imageUrls Select Stubs.GetBitmapAsync(url))
    If downloads Is Await Task.WhenAny(downloads, Task.Delay(3000)) Then
        Return Await downloads
    Else
        downloads.ContinueWith(Sub(t) Stubs.Log(t))
        Return Nothing
    End If
End Function

Создание комбинаторов на основе задач

Так как задача может полностью представлять асинхронную операцию и предоставлять синхронные и асинхронные возможности для объединения с операцией, получения результатов и т. д., можно создавать полезные библиотеки комбинаторов, которые создают задачи для создания более крупных шаблонов. Как описано в предыдущем разделе, .NET включает несколько встроенных комбинаторов, но вы также можете создавать собственные. В следующих разделах приведены несколько примеров потенциальных методов и типов комбинатора.

Повторная попытка при ошибке

Во многих ситуациях необходимо повторить операцию, если предыдущая попытка завершится ошибкой. Для синхронного кода можно создать вспомогательный метод, например RetryOnFault в следующем примере для выполнения этой задачи:

public static T RetryOnFault<T>(Func<T> function, int maxTries)
{
    for (int i = 0; i < maxTries; i++)
    {
        try { return function(); }
        catch { if (i == maxTries - 1) throw; }
    }
    return default(T)!;
}
Public Function RetryOnFaultSync(Of T)(func As Func(Of T), maxTries As Integer) As T
    For i As Integer = 0 To maxTries - 1
        Try
            Return func()
        Catch
            If i = maxTries - 1 Then Throw
        End Try
    Next
    Return Nothing
End Function

Вы можете создать практически идентичный вспомогательный метод для асинхронных операций, реализованных с помощью TAP, и таким образом возвращать задачи:

public static async Task<T> RetryOnFault<T>(Func<Task<T>> function, int maxTries)
{
    for (int i = 0; i < maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries - 1) throw; }
    }
    return default(T)!;
}
Public Async Function RetryOnFault(Of T)(func As Func(Of Task(Of T)), maxTries As Integer) As Task(Of T)
    For i As Integer = 0 To maxTries - 1
        Try
            Return Await func().ConfigureAwait(False)
        Catch
            If i = maxTries - 1 Then Throw
        End Try
    Next
    Return Nothing
End Function

Затем этот комбинатор можно использовать для кодирования повторных попыток в логику приложения. Рассмотрим пример.

// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3);

Вы можете расширить функцию RetryOnFault дальше. Например, функция может принимать другой Func<Task>, который вызывается между попытками, чтобы определить, когда снова попробовать выполнить операцию. Рассмотрим пример.

public static async Task<T> RetryOnFaultWithDelay<T>(
    Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
    for (int i = 0; i < maxTries; i++)
    {
        try { return await function().ConfigureAwait(false); }
        catch { if (i == maxTries - 1) throw; }
        await retryWhen().ConfigureAwait(false);
    }
    return default(T)!;
}
Public Async Function RetryOnFaultWithDelay(Of T)(
    func As Func(Of Task(Of T)), maxTries As Integer, retryWhen As Func(Of Task)) As Task(Of T)
    For i As Integer = 0 To maxTries - 1
        Try
            Return Await func().ConfigureAwait(False)
        Catch
            If i = maxTries - 1 Then Throw
        End Try
        Await retryWhen().ConfigureAwait(False)
    Next
    Return Nothing
End Function

Затем можно использовать функцию следующим образом, чтобы дождаться секунды перед повтором операции:

// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
    () => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));

NeedOnlyOne

Иногда вы можете воспользоваться преимуществами избыточности, чтобы улучшить задержку операции и шансы на успех. Рассмотрим несколько веб-служб, которые предоставляют акции, но в разное время суток каждая служба может обеспечить различные уровни качества и времени отклика. Чтобы справиться с этими колебаниями, вы можете выдавать запросы ко всем веб-службам, и как только вы получите ответ от одного, отмените остальные запросы. Вы можете реализовать вспомогающую функцию, чтобы упростить реализацию этого общего шаблона запуска нескольких операций, ожидая любых операций, а затем отменить остальные. Функция NeedOnlyOne в следующем примере иллюстрирует этот сценарий:

public static async Task<T> NeedOnlyOne<T>(
    params Func<CancellationToken, Task<T>>[] functions)
{
    var cts = new CancellationTokenSource();
    var tasks = (from function in functions
                 select function(cts.Token)).ToArray();
    var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
    cts.Cancel();
    foreach (var task in tasks)
    {
        var ignored = task.ContinueWith(
            t => Stubs.Log(t), TaskContinuationOptions.OnlyOnFaulted);
    }
    return await completed;
}
Public Async Function NeedOnlyOne(Of T)(
    ParamArray functions As Func(Of CancellationToken, Task(Of T))()) As Task(Of T)
    Dim cts As New CancellationTokenSource()
    Dim tasks As Task(Of T)() = (From func In functions Select func(cts.Token)).ToArray()
    Dim completed As Task(Of T) = Await Task.WhenAny(tasks).ConfigureAwait(False)
    cts.Cancel()
    For Each task In tasks
        Dim ignored = task.ContinueWith(
            Sub(tsk) Stubs.Log(tsk), TaskContinuationOptions.OnlyOnFaulted)
    Next
    Return Await completed
End Function

Затем эту функцию можно использовать следующим образом:

double currentPrice = await NeedOnlyOne(
    ct => GetCurrentPriceFromServer1Async("msft", ct),
    ct => GetCurrentPriceFromServer2Async("msft", ct),
    ct => GetCurrentPriceFromServer3Async("msft", ct));

Чересстрочные операции

Использование метода WhenAny для поддержки сценария взаимопереплетения может вызвать проблему с производительностью при работе с большими наборами задач. Каждый вызов WhenAny регистрирует продолжение для каждой задачи. Для N числа задач этот процесс создает продолжения O(N2) за время существования операции переключения. Если вы работаете с большим набором задач, используйте комбинатор (Interleaved в следующем примере) для решения проблемы с производительностью:

public static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();
    var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
                   select new TaskCompletionSource<T>()).ToList();
    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            var source = sources[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
                source.TrySetException(completed.Exception!.InnerExceptions);
            else if (completed.IsCanceled)
                source.TrySetCanceled();
            else
                source.TrySetResult(completed.Result);
        }, CancellationToken.None,
           TaskContinuationOptions.ExecuteSynchronously,
           TaskScheduler.Default);
    }
    return from source in sources
           select source.Task;
}
Public Function Interleaved(Of T)(tasks As IEnumerable(Of Task(Of T))) As IEnumerable(Of Task(Of T))
    Dim inputTasks As List(Of Task(Of T)) = tasks.ToList()
    Dim sources As List(Of TaskCompletionSource(Of T)) =
        (From _i In Enumerable.Range(0, inputTasks.Count) Select New TaskCompletionSource(Of T)()).ToList()
    Dim indexRef As Integer() = {-1}
    For Each inputTask In inputTasks
        inputTask.ContinueWith(Sub(completed)
                                   Dim idx = Interlocked.Increment(indexRef(0))
                                   Dim source = sources(idx)
                                   If completed.IsFaulted Then
                                       source.TrySetException(completed.Exception.InnerExceptions)
                                   ElseIf completed.IsCanceled Then
                                       source.TrySetCanceled()
                                   Else
                                       source.TrySetResult(completed.Result)
                                   End If
                               End Sub,
                               CancellationToken.None,
                               TaskContinuationOptions.ExecuteSynchronously,
                               TaskScheduler.Default)
    Next
    Return From source In sources Select source.Task
End Function

Используйте комбинатор для обработки результатов задач по мере их выполнения. Рассмотрим пример.

IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
    int result = await task;
    …
}

WhenAllOrFirstException

В некоторых сценариях разбиения/сбора данных может потребоваться ждать выполнения всех задач в наборе, за исключением случая, если одна из них заканчивается сбоем. В этом случае необходимо прекратить ожидание сразу после возникновения исключения. Это поведение можно выполнить с помощью метода комбинатора, например WhenAllOrFirstException в следующем примере:

public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
    var inputs = tasks.ToList();
    var ce = new CountdownEvent(inputs.Count);
    var tcs = new TaskCompletionSource<T[]>();

    Action<Task> onCompleted = (Task completed) =>
    {
        if (completed.IsFaulted)
            tcs.TrySetException(completed.Exception!.InnerExceptions);
        if (ce.Signal() && !tcs.Task.IsCompleted)
            tcs.TrySetResult(inputs.Select(t => ((Task<T>)t).Result).ToArray());
    };

    foreach (var t in inputs) t.ContinueWith(onCompleted);
    return tcs.Task;
}
Public Function WhenAllOrFirstException(Of T)(tasks As IEnumerable(Of Task(Of T))) As Task(Of T())
    Dim inputs As List(Of Task(Of T)) = tasks.ToList()
    Dim ce As New CountdownEvent(inputs.Count)
    Dim tcs As New TaskCompletionSource(Of T())()

    Dim onCompleted As Action(Of Task) = Sub(completed As Task)
                                             If completed.IsFaulted Then
                                                 tcs.TrySetException(completed.Exception.InnerExceptions)
                                             End If
                                             If ce.Signal() AndAlso Not tcs.Task.IsCompleted Then
                                                 tcs.TrySetResult(inputs.Select(Function(taskItem) DirectCast(taskItem, Task(Of T)).Result).ToArray())
                                             End If
                                         End Sub

    For Each t In inputs
        t.ContinueWith(onCompleted)
    Next
    Return tcs.Task
End Function

Создание структур данных на основе задач

Помимо возможности создания пользовательских комбинаторов на основе задач, наличие структуры данных в Task и Task<TResult>, которая представляет результаты асинхронной операции и обеспечивает необходимую синхронизацию для объединения с ней, делает этот тип мощным инструментом для построения пользовательских структур данных, которые будут использоваться в асинхронных сценариях.

Асинхронный Кэш

Одним из важных аспектов задачи является то, что вы можете передать его нескольким потребителям. Все потребители могут ожидать его, регистрировать продолжения с ним, получать его результат или исключения (в случае Task<TResult>) и т. д. Этот аспект делает Task и Task<TResult> идеально подходящими для использования в асинхронной инфраструктуре кэширования. Ниже приведен пример небольшого, но мощного асинхронного кэша, созданного на основе Task<TResult>:

public class AsyncCache<TKey, TValue> where TKey : notnull
{
    private readonly Func<TKey, Task<TValue>> _valueFactory;
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
    {
        if (valueFactory == null) throw new ArgumentNullException(nameof(valueFactory));
        _valueFactory = valueFactory;
        _map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
    }

    public Task<TValue> this[TKey key]
    {
        get
        {
            if (key == null) throw new ArgumentNullException(nameof(key));
            return _map.GetOrAdd(key, toAdd =>
                new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
        }
    }
}
Public Class AsyncCache(Of TKey, TValue)
    Private ReadOnly _valueFactory As Func(Of TKey, Task(Of TValue))
    Private ReadOnly _map As New ConcurrentDictionary(Of TKey, Lazy(Of Task(Of TValue)))()

    Public Sub New(valueFactory As Func(Of TKey, Task(Of TValue)))
        If valueFactory Is Nothing Then Throw New ArgumentNullException(NameOf(valueFactory))
        _valueFactory = valueFactory
    End Sub

    Default Public ReadOnly Property Item(key As TKey) As Task(Of TValue)
        Get
            If key Is Nothing Then Throw New ArgumentNullException(NameOf(key))
            Return _map.GetOrAdd(key, Function(toAdd) New Lazy(Of Task(Of TValue))(Function() _valueFactory(toAdd))).Value
        End Get
    End Property
End Class

Класс AsyncCache<TKey,TValue> принимает в качестве делегата для своего конструктора функцию, которая принимает TKey и возвращает Task<TResult>. Внутренний словарь сохраняет любые ранее доступные значения из кэша, и AsyncCache гарантирует, что он создает только одну задачу на ключ, даже если кэш используется одновременно.

Например, можно создать кэш для скачанных веб-страниц:

private AsyncCache<string,string> m_webPages =
    new AsyncCache<string,string>(DownloadStringTaskAsync);

Затем этот кэш можно использовать в асинхронных методах, когда требуется содержимое веб-страницы. Класс AsyncCache гарантирует, что вы загружаете как можно меньше страниц и кэшируете результаты.

static AsyncCache<string, string> m_webPages =
    new AsyncCache<string, string>(url => Stubs.DownloadStringTaskAsync(url));

public static async Task UseWebPageCache(string url)
{
    string contents = await m_webPages[url];
    Console.WriteLine(contents.Length);
}
Private m_webPages As New AsyncCache(Of String, String)(Function(url) Stubs.DownloadStringTaskAsync(url))

Public Async Function UseWebPageCache(url As String) As Task
    Dim contents As String = Await m_webPages(url)
    Console.WriteLine(contents.Length)
End Function

AsyncProducerConsumerCollection

Вы также можете использовать задачи для создания структур данных для координации асинхронных действий. Рассмотрим один из классических шаблонов параллельного проектирования: производитель или потребитель. В этом шаблоне производители создают данные, которые используют потребители, и производители и потребители могут выполняться параллельно. Например, потребитель обрабатывает элемент 1, который ранее был создан производителем, который в настоящее время производит элемент 2. Для шаблона производителя или потребителя всегда требуется некоторая структура данных для хранения работы, созданной производителями, чтобы потребители могли получать уведомления о новых данных и находить их при наличии.

Ниже приведена простая структура данных, построенная на основе задач, которая позволяет асинхронным методам использоваться в качестве производителей и потребителей:

public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T>? tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0)
            {
                return Task.FromResult(m_collection.Dequeue());
            }
            else
            {
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}
Public Class AsyncProducerConsumerCollection(Of T)
    Private ReadOnly m_collection As New Queue(Of T)()
    Private ReadOnly m_waiting As New Queue(Of TaskCompletionSource(Of T))()

    Public Sub Add(item As T)
        Dim tcs As TaskCompletionSource(Of T) = Nothing
        SyncLock m_collection
            If m_waiting.Count > 0 Then
                tcs = m_waiting.Dequeue()
            Else
                m_collection.Enqueue(item)
            End If
        End SyncLock
        If tcs IsNot Nothing Then tcs.TrySetResult(item)
    End Sub

    Public Function Take() As Task(Of T)
        SyncLock m_collection
            If m_collection.Count > 0 Then
                Return Task.FromResult(m_collection.Dequeue())
            Else
                Dim tcs As New TaskCompletionSource(Of T)()
                m_waiting.Enqueue(tcs)
                Return tcs.Task
            End If
        End SyncLock
    End Function
End Class

Используя эту структуру данных, можно написать код, например следующий:

static AsyncProducerConsumerCollection<int> m_data = new();

public static async Task ConsumerAsync()
{
    while (true)
    {
        int nextItem = await m_data.Take();
        Stubs.ProcessNextItem(nextItem);
    }
}

public static void Produce(int data)
{
    m_data.Add(data);
}
Private m_data As New AsyncProducerConsumerCollection(Of Integer)()

Public Async Function ConsumerAsync() As Task
    While True
        Dim nextItem As Integer = Await m_data.Take()
        Stubs.ProcessNextItem(nextItem)
    End While
End Function

Public Sub Produce(data As Integer)
    m_data.Add(data)
End Sub

Пространство System.Threading.Tasks.Dataflow имен включает BufferBlock<T> тип, который можно использовать подобным образом, но без необходимости создавать собственный тип коллекции.

static BufferBlock<int> m_dataBlock = new();

public static async Task ConsumerAsyncBlock()
{
    while (true)
    {
        int nextItem = await m_dataBlock.ReceiveAsync();
        Stubs.ProcessNextItem(nextItem);
    }
}

public static void ProduceBlock(int data)
{
    m_dataBlock.Post(data);
}
Private m_dataBlock As New BufferBlock(Of Integer)()

Public Async Function ConsumerAsyncBlock() As Task
    While True
        Dim nextItem As Integer = Await m_dataBlock.ReceiveAsync()
        Stubs.ProcessNextItem(nextItem)
    End While
End Function

Public Sub ProduceBlock(data As Integer)
    m_dataBlock.Post(data)
End Sub

Замечание

Пространство System.Threading.Tasks.Dataflow имен доступно в виде пакета NuGet. Чтобы установить сборку, содержащую System.Threading.Tasks.Dataflow пространство имен, откройте проект в Visual Studio, выберите "Управление пакетами NuGet " в меню "Проект" и найдите пакет в Интернете System.Threading.Tasks.Dataflow .

См. также