SynchronizationContext e aplicativos de console

As estruturas de interface do usuário como Windows Forms, WPF e .NET MAUI instalam um SynchronizationContext no thread da interface do usuário. Quando você executa await em uma tarefa nesses ambientes, a continuação é automaticamente retornada para o thread da interface do usuário. Os aplicativos de console não instalam SynchronizationContext, o que significa que as continuações await são executadas no pool de threads. Este artigo explica as consequências e mostra como criar uma bomba de mensagens de um único thread caso precise.

Comportamento padrão em um aplicativo de console

Em um aplicativo de console, SynchronizationContext.Current retorna null. Quando um método produzir um await, a continuação é executada em qualquer thread disponível do pool de threads:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Saída representativa da execução deste programa:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

O thread 1 (o thread principal) aparece apenas uma vez, durante a primeira iteração síncrona antes await Task.Yield() de suspender o método. Todas as iterações subsequentes são executadas em threads do pool de threads.

Pontos de entrada assíncronos modernos

A partir do C# 7.1, você pode declarar Main como async Task ou async Task<int>. No C# 9 e posterior, você pode usar declarações de nível superior diretamente:await

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Esses pontos de entrada não instalam um SynchronizationContext. O runtime gera uma inicialização que chama seu método assíncrono e bloqueia no Task retornado, semelhante à chamada .GetAwaiter().GetResult(). As continuações ainda são executadas no pool de threads.

Quando você precisa de afinidade de thread

Para muitos aplicativos de console, é bom executar as continuações no pool de threads. No entanto, alguns cenários exigem que todas as continuações sejam executadas em um único thread:

  • Execução serializada: Várias operações assíncronas simultâneas compartilham o estado sem bloqueios executando suas continuações no mesmo thread.
  • Requisitos de biblioteca: algumas bibliotecas ou objetos COM exigem afinidade com um thread específico.
  • Teste de unidade: frameworks de teste podem precisar de execução determinística e de thread único para código assíncrono.

Criar um SynchronizationContext de thread único

Para executar todas as continuações em um thread, você precisa de duas coisas:

  1. Um SynchronizationContext cujas filas de método Post funcionam para uma coleção thread-safe.
  2. Um loop de bomba de mensagens que processa essa fila no thread de destino.

O contexto personalizado

O contexto usa um BlockingCollection<T> para coordenar produtores (as continuações assíncronas) e um consumidor (o loop de bombeamento):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

O método AsyncPump.Run

AsyncPump.Run instala o contexto personalizado, invoca o método assíncrono e bombeia continuações no thread de chamada até que o método seja concluído:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Veja isso em ação

Substitua a chamada padrão por AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Saída:

[1, 10000]

A ID de thread específica pode ser diferente dependendo do runtime e da plataforma, mas o resultado principal é que todas as 10.000 iterações são executadas em um único thread: o thread principal.

Tratar os métodos assíncronos nulos

A sobrecarga Func<Task> rastreia a conclusão por meio de Task retornado. Os métodos assíncronos void não retornam uma tarefa; em vez disso, eles notificam o atual SynchronizationContext por meio de OperationStarted() e OperationCompleted(). Para dar suporte a métodos assíncronos void , estenda o contexto para acompanhar operações pendentes:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Com o rastreamento de operação habilitado, a bomba sai somente quando todos os métodos void assíncronos pendentes são concluídos, e não apenas a tarefa de nível superior.

Considerações práticas

  • Risco de deadlock: se o código em execução dentro de AsyncPump.Run fizer um bloqueio de forma síncrona (por exemplo, chamando .Result ou .Wait() em uma tarefa cuja continuação deve ser postada novamente na bomba), o thread da bomba não poderá processar essa continuação. O resultado é um deadlock. O mesmo problema é descrito em Synchronous wrappers para métodos assíncronos.
  • Desempenho: uma bomba de único thread limita a vazão a um thread. Use essa abordagem somente quando a afinidade de thread for importante.
  • Multiplataforma: A AsyncPump implementação mostrada aqui usa apenas tipos dos namespaces System.Collections.Concurrent e System.Threading. Ele funciona em todas as plataformas compatíveis com .NET.

Consulte também