SynchronizationContext ve konsol uygulamaları

Windows Forms, WPF ve .NET MAUI gibi ui çerçeveleri, kullanıcı arabirimi iş parçacığına bir SynchronizationContext yükler. Bu ortamlarda bir görev yaptığınızda await, devamlılık otomatik olarak UI iş parçacığına geri iletilir. Konsol uygulamaları bir SynchronizationContext yüklemez, bu yüzden devamlılıklar iş parçacığı havuzunda çalıştırılır await. Bu makale, sonuçları açıklar ve ihtiyaç duyduğunuzda tek iş parçacıklı bir ileti pompasının nasıl oluşturulacağını gösterir.

Konsol uygulamasında varsayılan davranış

Konsol uygulamasında SynchronizationContext.Current bu null olarak geri döndürür. Bir yöntem bir await olduğunda, devam ettirme işlemi kullanılabilir iş parçacığı havuzu iş parçacığı üzerinde yürütülür.

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Bu programı çalıştırmanın temsili çıktısı:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

İş parçacığı 1 (ana iş parçacığı), yöntemi askıya almadan önce await Task.Yield() ilk zaman uyumlu yineleme sırasında yalnızca bir kez görünür. Sonraki tüm yinelemeler iş parçacığı havuzunda çalışır.

Modern asenkron giriş noktaları

C# 7.1'den itibaren, Main, async Task veya async Task<int> olarak bildirebilirsiniz. C# 9 ve sonraki sürümlerde, en üst düzey ifadeleri şunlarla doğrudan kullanabilirsiniz: await.

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Bu giriş noktaları bir SynchronizationContextyüklemez. Çalışma zamanı, zaman uyumsuz yönteminizi çağıran ve döndürülen Task üzerinde bloklar oluşturan bir bootstrap oluşturur, tıpkı .GetAwaiter().GetResult() çağrısında olduğu gibi. Devamlılıklar iş parçacığı havuzunda işlemeye devam eder.

İş parçacığı bağlılığı gerektiğinde

Birçok konsol uygulamasında, iş parçacığı havuzunda devamlılıkları çalıştırmak iyi olur. Ancak bazı senaryolarda tüm devamlılıkların tek bir iş parçacığında çalıştırılması gerekir:

  • Serileştirilmiş yürütme: Birden çok eşzamanlı zaman uyumsuz operasyon, devamlarını aynı iş parçacığında çalıştırarak kilit kullanmadan durumu paylaşır.
  • Kitaplık gereksinimleri: Bazı kitaplıklar veya COM nesneleri belirli bir iş parçacığına bağlılık gerektirebilir.
  • Birim testi: Test framework'leri, zaman uyumsuz kodun belirlenimci ve tek iş parçacıklı çalışmasını gerektirebilir.

Tek iş parçacıklı bir SynchronizationContext oluşturun

Tüm devamlılıkları tek bir iş parçacığında çalıştırmak için iki şeye ihtiyacınız vardır:

  1. İş parçacığı güvenli bir koleksiyona işleri sıraya sokan bir SynchronizationContext'nin Post yöntemi.
  2. Hedef iş parçacığında bu kuyruğu işleyen bir ileti pompa döngüsü.

Özel bağlam

Bağlam, üreticileri (zaman uyumsuz devamlılıklar) ve tüketiciyi (pompalama döngüsü) koordine etmek için bir BlockingCollection<T> kullanır.

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

AsyncPump.Run yöntemi

AsyncPump.Run özel bağlamı yükler, zaman uyumsuz yöntemi çağırır ve yöntem tamamlanana kadar çağıran iş parçacığında devamlılıkları sürdürür:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Nasıl çalıştığını görün

Varsayılan çağrıyı AsyncPump.Run ile değiştirin.

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Çıktı:

[1, 10000]

Belirli iş parçacığı kimliği çalışma zamanına ve platforma bağlı olarak farklılık gösterebilir, ancak önemli sonuç, 10.000 yinelemenin tümünün tek bir iş parçacığında çalışmasıdır: ana iş parçacığı.

Zaman uyumsuz void yöntemlerini işleme

Aşırı yükleme, Func<Task> döndürülen Task aracılığıyla tamamlanmasını izler. Asenkron void yöntemler bir görev döndürmez; bunun yerine SynchronizationContext, OperationStarted() ve OperationCompleted() aracılığıyla geçerli durumu bildirir. Asenkron void yöntemleri desteklemek için bağlamı genişleterek devam eden işlemleri izleyin.

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

İşlem izleme etkinleştirildiğinde, pompa yalnızca en üst düzey görev değil, tüm bekleyen zaman uyumsuz void yöntemler tamamlandığında çıkar.

Pratikte dikkat edilmesi gerekenler

  • Kilitlenme riski: Eğer içinde AsyncPump.Run çalışan kod, devamlılığı pompaya geri göndermesi gereken bir görevi .Result veya .Wait() ile çağırarak zaman uyumlu olarak engellenirse, pompa iş parçacığı bu devamlılığı işleyemez. Sonuç bir kilitlenmedir. Aynı sorun, eşzamanlı olmayan yöntemler için eşzamanlı sarmalayıcılar bölümünde açıklanmıştır.
  • Performans: Tek iş parçacıklı pompa, aktarım hızını bir iş parçacığıyla sınırlar. Bu yaklaşımı yalnızca iş parçacığı bağlılığı önemli olduğunda kullanın.
  • Platformlar arası: AsyncPump Burada gösterilen uygulama yalnızca System.Collections.Concurrent ve System.Threading ad alanlarındaki türleri kullanır. .NET tarafından desteklenen tüm platformlarda çalışır.

Ayrıca bakınız