SynchronizationContext a konzolové aplikace

Architektury uživatelského rozhraní, jako jsou model Windows Forms, WPF (Windows Presentation Foundation) a .NET MAUI nainstalují na vlákno uživatelského rozhraní SynchronizationContext. Když v těchto prostředích provedete await úlohu, pokračování se automaticky vrátí do vlákna uživatelského rozhraní. Konzolové aplikace nenainstalují SynchronizationContext, což znamená, že pokračování await běží ve fondu vláken. Tento článek vysvětluje důsledky a ukazuje, jak vytvořit jednovláknové čerpadlo zpráv, když ho potřebujete.

Výchozí chování v aplikaci konzoly

V konzolové aplikaci SynchronizationContext.Current vrátí hodnotu null. Když metoda dosáhne výnosu await, pokračování se spustí na jakémkoli dostupném vlákně z fondu vláken:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Reprezentativní výstup ze spuštění tohoto programu:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Vlákno 1 (hlavní vlákno) se zobrazí pouze jednou během první synchronní iterace před await Task.Yield() pozastavením metody. Následné iterace běží na vláknech ve fondu vláken.

Moderní asynchronní vstupní body

Počínaje jazykem C# 7.1 můžete deklarovat Main jako async Task nebo async Task<int>. V jazyce C# 9 a novějších můžete použít příkazy nejvyšší úrovně přímo await :

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Tyto vstupní body nenainstalují SynchronizationContext. Modul runtime vygeneruje metodu bootstrap, která volá asynchronní metodu a blokuje vrácenou metodu Task, podobně jako volání .GetAwaiter().GetResult(). Pokračování stále běží na vlákenném poolu.

Když potřebujete spřažení vláken

U mnoha konzolových aplikací je spuštění pokračování ve fondu vláken v pořádku. Některé scénáře však vyžadují, aby se všechna pokračování spouštěla v jednom vlákně:

  • Serializované spuštění: Více souběžných asynchronních operací sdílí stav bez zámků tím, že jejich pokračování běží na stejném vlákně.
  • Požadavky na knihovnu: Některé knihovny nebo objekty MODELU COM vyžadují spřažení s konkrétním vláknem.
  • Testování jednotek: Testovací architektury můžou potřebovat deterministické provádění asynchronního kódu s jedním vláknem.

Sestavení synchronizačníhocontextu s jedním vláknem

Pokud chcete spustit všechna pokračování na jednom vlákně, potřebujete dvě věci:

  1. Jejíž SynchronizationContextPost metoda zařadí fronty do kolekce bezpečné pro přístup z více vláken.
  2. Smyčka pumpy zpráv, která zpracovává frontu v cílovém vlákně.

Vlastní kontext

Kontext používá BlockingCollection<T> ke koordinaci producentů (asynchronní pokračování) a spotřebitele (smyčka pumpování):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Metoda AsyncPump.Run

AsyncPump.Run nainstaluje vlastní kontext, činí asynchronní metodu a provádí pokračování ve volajícím vlákně, dokud metoda není dokončena:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Zobrazit ji v akci

Nahraďte výchozí hovor:AsyncPump.Run

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Výstup:

[1, 10000]

Konkrétní ID vlákna se může lišit v závislosti na modulu runtime a platformě, ale klíčovým výsledkem je, že všech 10 000 iterací běží na jednom vlákně: hlavním vlákně.

Zpracování asynchronních void metod

Přetížení Func<Task> sleduje se dokončení prostřednictvím vrácené Task. Asynchronní void metody nevrací úlohu. Místo toho upozorňují aktuální SynchronizationContext metodu prostřednictvím OperationStarted() a OperationCompleted(). Chcete-li podporovat asynchronní void metody, rozšiřte kontext pro sledování nevyřízených operací.

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Když je zapnuté sledování provozu, čerpadlo se ukončí pouze v případě, že jsou dokončeny všechny nevyřízené asynchronní void metody, nejen úkol na nejvyšší úrovni.

Praktické aspekty

  • Riziko zablokování: Pokud kód spuštěný uvnitř AsyncPump.Run blokuje synchronně (například voláním .Result nebo .Wait() na úlohu, jejíž pokračování se musí vrátit do pumpy), vlákno pumpy nemůže zpracovat toto pokračování. Výsledkem je vzájemné zablokování. Stejný problém je popsaný v synchronních obálkách pro asynchronní metody.
  • Výkon: Jednovláknové čerpadlo omezuje propustnost na jedno vlákno. Tento přístup použijte pouze v případě, že záleží na spřažení vláken.
  • Multiplatformní: Zde uvedená implementace AsyncPump používá pouze typy z oborů názvů System.Collections.Concurrent a System.Threading. Funguje na všech platformách, které .NET podporují.

Viz také