SynchronizationContext 和控制台应用

UI 框架(如 Windows 窗体、WPF 和 .NET MAUI)在其 UI 线程上安装 SynchronizationContext。 当你在这些环境中 await 一个任务时,延续会自动发布回 UI 线程。 控制台应用程序不安装 SynchronizationContext,这意味着 await 延续在线程池上运行。 本文介绍后果,并说明在需要单线程消息泵时如何生成单线程消息泵。

控制台应用中的默认行为

在控制台应用中, SynchronizationContext.Current 返回 null。 当一个方法在 await 处产生结果时,延续将在任何可用的线程池线程上运行:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

运行此程序的代表输出:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

线程 1(主线程)仅在暂停方法前的第一次同步迭代期间 await Task.Yield() 出现一次。 所有后续迭代都在线程池线程上运行。

新式异步入口点

从 C# 7.1 开始,可以声明 Mainasync Taskasync Task<int>. 在 C# 9 及更高版本中,可以直接使用顶级语句 await

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

这些入口点不安装 SynchronizationContext。 运行时生成一个引导程序,调用你的异步方法,并在返回的 Task 上阻塞,类似于调用 .GetAwaiter().GetResult()。 延续仍在线程池上运行。

当需要线程亲和性时

对于许多控制台应用程序,在线程池上运行延续任务是可以接受的。 但是,某些方案要求所有延续在单个线程上运行:

  • 序列化执行:多个并发异步操作通过在同一线程上运行延续来共享无锁状态。
  • 库要求:某些库或 COM 对象需要与特定线程关联。
  • 单元测试:测试框架可能需要确定性的单线程执行异步代码。

构建单线程 SynchronizationContext

若要在一个线程上运行所有延续,需要两项操作:

  1. SynchronizationContext,其 Post 方法将工作队列存储到一个线程安全的集合中。
  2. 在目标线程上处理该队列的消息泵循环。

自定义上下文

上下文使用 a BlockingCollection<T> 来协调生成者(异步延续)和使用者(抽水循环):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

AsyncPump.Run 方法

AsyncPump.Run 安装自定义上下文,调用异步方法,并在调用线程上泵送延续,直到方法完成:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

查看实际操作

将默认调用替换为 AsyncPump.Run

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

输出:

[1, 10000]

特定线程 ID 可能因运行时和平台而异,但关键结果是所有 10,000 次迭代都在单个线程上运行:主线程。

处理异步 void 方法

Func<Task> 重载通过返回的 Task 跟踪完成情况。 异步 void 方法不会返回任务;而是通知当前 SynchronizationContext 通过 OperationStarted()OperationCompleted()。 若要支持异步 void 方法,请扩展上下文以跟踪未完成的操作:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

启用操作跟踪后,泵仅在所有未完成的 async void 方法完成时退出,而不仅仅是顶级任务。

实际注意事项

  • 死锁风险:如果代码在AsyncPump.Run块内同步运行(例如,通过调用.Result.Wait()来执行必须返回到泵线程的任务的后续操作),那么泵线程将无法处理该后续操作。 最终结果是陷入死锁。 异步方法的同步包装器中介绍了同样的问题。
  • 性能:单线程泵将吞吐量限制为一个线程。 仅当线程相关性很重要时,才使用此方法。
  • 跨平台:此处所示的实现仅使用来自 System.Collections.ConcurrentSystem.Threading 命名空间的类型。 它适用于.NET支持的所有平台。

另见