同步上下文與控制台應用程式

像 Windows Forms、WPF 和 .NET MAUI 這類 UI 框架會在他們的 UI 執行緒上安裝一個 SynchronizationContext。 當你 await 在這些環境中執行任務時,延續會自動回傳到 UI 執行緒。 主控台應用程式不會安裝 SynchronizationContext,這代表 await 續集會在執行緒池上執行。 本文說明了其後果,並示範在需要時如何建立單線程訊息泵。

控制台應用程式的預設行為

在控制台應用程式中,SynchronizationContext.Current 會回傳 null。 當方法在 處 await輸出時,延續運算會在可用的執行緒池執行緒上執行:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

執行此程式的代表性輸出:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

執行緒 1(主執行緒)只在第一次同步迭代中出現一次,然後 await Task.Yield() 暫停該方法。 之後所有迭代都在執行緒池執行緒上執行。

現代非同步進入點

從 C# 7.1 開始,你可以宣告 Mainasync Taskasync Task<int>。 在 C# 9 及以後版本中,你可以直接使用頂層語句:await

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

這些切入點不會安裝 SynchronizationContext. 執行時會產生一個引導程序,呼叫你的非同步方法,並在回傳的 Task上進行阻塞,類似於呼叫 .GetAwaiter().GetResult()。 延續運算仍然在執行緒池中執行。

當你需要線程親和力時

對於許多控制台應用程式來說,在執行緒池上運行延續程式是可以的。 然而,有些情境要求所有續接必須在單一執行緒上執行:

  • 序列化執行:多個並行的非同步操作透過在同一執行緒上執行續行操作,共享狀態且不鎖。
  • 函式庫需求:部分函式庫或 COM 物件要求與特定執行緒有親和力。
  • 單元測試:測試框架可能需要非同步程式碼的確定性單執行緒執行。

建立單執行緒的同步上下文

要在同一執行緒上執行所有續集,你需要兩樣東西:

  1. SynchronizationContextPost方法佇列可運作至執行緒安全的集合。
  2. 一個訊息泵迴圈,負責處理目標執行緒上的該佇列。

自訂上下文

上下文中使用 a BlockingCollection<T> 來協調生產者(非同步延續)與消費者(泵送迴路):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

AsyncPump.Run 方法

AsyncPump.Run 安裝自訂上下文,呼叫非同步方法,並在呼叫執行緒上持續執行,直到方法完成:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

看看它的實際運作

將預設呼叫替換為 AsyncPump.Run

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

輸出:

[1, 10000]

具體執行緒 ID 可能會因執行環境和平台而異,但關鍵結果是所有 10,000 次迭代都在單一執行緒上執行:主執行緒。

處理非同步空法

過載會通過返回的 Task 來追蹤完成情況Func<Task>。 非同步void方法不會返回任務,而是透過OperationStarted()OperationCompleted()通知當前SynchronizationContext。 為了支援非同步 void 方法,請擴展上下文以追蹤未完成的操作:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

啟用作業追蹤時,泵浦僅在所有未完成的非同步 void 方法完成後才退出,而非僅頂層任務。

實務考量

  • 死鎖風險:若內部 AsyncPump.Run 執行的程式碼以同步方式阻塞(例如在必須回傳給泵浦的任務上呼叫 .Result.Wait()),訊息迴圈執行緒無法處理該續接。 結果是僵局。 同樣的問題也描述在 非同步方法的同步包裝器中。
  • 效能:單執行緒泵浦限制吞吐量為單一執行緒。 只有在線程綁定性很重要時才使用這種方法。
  • 跨平台:此處展示的AsyncPump實作僅使用與System.Collections.Concurrent命名空間的System.Threading型別。 它能在 .NET 支援的所有平台上運作。

另請參閱