Синхронизация контекста и консольные приложения

Платформы пользовательского интерфейса, такие как Windows Forms, WPF и .NET MAUI, устанавливают SynchronizationContext в своем потоке пользовательского интерфейса. await При выполнении задачи в этих средах продолжение автоматически отправляется в поток пользовательского интерфейса. Консольные приложения не устанавливают SynchronizationContext, что означает, что продолжения await выполняются в пуле потоков. В этой статье описываются последствия и показано, как создать однопоточный цикл обработки сообщений, когда он вам понадобится.

Поведение по умолчанию в консольном приложении

В консольном приложении SynchronizationContext.Current возвращает null. Когда метод приостанавливается на await, продолжение выполняется в любом доступном потоке пула потоков.

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Репрезентативные выходные данные выполнения этой программы:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Поток 1 (основной поток) отображается только один раз во время первой синхронной итерации, перед приостановкой выполнения метода await Task.Yield(). Все последующие итерации выполняются в потоках пула потоков.

Современные асинхронные точки входа

Начиная с C# 7.1, можно объявить Main как async Task или async Task<int>. В C# 9 и более поздних версиях можно использовать верхнеуровневые инструкции с await напрямую:

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Эти точки входа не устанавливают SynchronizationContext. Среда выполнения создает вспомогательный код, который вызывает асинхронный метод и блокирует выполнение до получения результата Task, как при вызове .GetAwaiter().GetResult(). Продолжения по-прежнему выполняются в пуле потоков.

При необходимости привязки потоков

Для многих консольных приложений выполнение продолжений в пуле потоков допустимо. Однако для некоторых сценариев требуется, чтобы все продолжения выполнялись в одном потоке:

  • Сериализованное выполнение: несколько одновременных асинхронных операций совместно используют состояние без блокировок, выполняя свои продолжения в одном потоке.
  • Требования к библиотеке: некоторые библиотеки или COM-объекты требуют сходство с определенным потоком.
  • Модульное тестирование. Для платформ тестирования может потребоваться детерминированное однопоточное выполнение асинхронного кода.

Создание однопоточной синхронизацииContext

Чтобы запустить все продолжения в одном потоке, вам потребуется две вещи:

  1. Очередь SynchronizationContext методов которого Post работает в потокобезопасной коллекции.
  2. Цикл насоса сообщений, который обрабатывает очередь в целевом потоке.

Настраиваемый контекст

Контекст использует BlockingCollection<T> для координации производителей (асинхронных продолжений) и потребителя (помпового цикла):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Метод AsyncPump.Run

AsyncPump.Run устанавливает настраиваемый контекст, вызывает асинхронный метод и продолжает продолжать работу в вызываемом потоке до завершения метода:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Наглядные примеры

Замените вызов по умолчанию на AsyncPump.Run.

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Выходные данные:

[1, 10000]

Идентификатор конкретного потока может отличаться в зависимости от среды выполнения и платформы, но ключевой результат заключается в том, что все 10 000 итерации выполняются в одном потоке: основной поток.

Обработка асинхронных методов void

Перегрузка Func<Task> отслеживает завершение через возвращаемый Task объект. Асинхронные void методы не возвращают объект Task; вместо этого они уведомляют текущий SynchronizationContext через OperationStarted() и OperationCompleted(). Чтобы поддерживать асинхронные методы, расширьте контекст для отслеживания невыполненных void операций:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

При включенном отслеживании операций насос завершает работу только после завершения всех невыполненных асинхронных void методов, а не только задачи верхнего уровня.

Практические рекомендации

  • Риск взаимоблокировки: если код выполняется внутри AsyncPump.Run блоков синхронно (например, путем вызова .Result или .Wait() задачи, продолжение которой должно отправляться обратно в насос), поток насоса не может обработать это продолжение. Результатом является взаимоблокировка. Эта же проблема описана в синхронных оболочках для асинхронных методов.
  • Производительность: однопоточный насос ограничивает пропускную способность до уровня одного потока. Используйте этот подход только в том случае, если имеет значение привязка потоков.
  • Кроссплатформенная реализация, показанная AsyncPump здесь, использует только типы из пространств имен System.Collections.Concurrent и System.Threading. Он работает на всех платформах, которые поддерживает .NET.

См. также