SynchronizationContext dan aplikasi konsol

Kerangka kerja UI seperti Formulir Windows, WPF, dan .NET MAUI menginstal SynchronizationContext pada utas UI mereka. Saat Anda await melakukan tugas di lingkungan tersebut, kelanjutan secara otomatis mengembalikan ke thread UI. Aplikasi konsol tidak menginstal SynchronizationContext, yang berarti await kelanjutan berjalan pada kumpulan utas. Artikel ini menjelaskan konsekuensinya dan menunjukkan cara membangun pompa pesan satu utas saat Anda membutuhkannya.

Perilaku default di aplikasi konsol

Di aplikasi konsol, SynchronizationContext.Current mengembalikan null. Ketika sebuah metode menghasilkan pada await, kelanjutan akan dijalankan pada utas dalam kumpulan utas yang tersedia.

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Output contoh dari menjalankan program ini:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Utas 1 (utas utama) hanya muncul sekali, selama iterasi sinkron pertama sebelum await Task.Yield() menangguhkan metode. Semua iterasi berikutnya berjalan pada utas kumpulan utas.

Titik masuk modern yang asinkron

Dimulai dengan C# 7.1, Anda dapat mendeklarasikan Main sebagai async Task atau async Task<int>. Di C# 9 dan yang lebih baru, Anda dapat menggunakan pernyataan tingkat atas dengan await secara langsung:

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Titik-titik masuk ini tidak menginstal SynchronizationContext. Runtime menghasilkan bootstrap yang memanggil metode asinkron Anda dan memblokir hasil yang dikembalikan Task, mirip dengan memanggil .GetAwaiter().GetResult(). Tindakan lanjut masih berjalan pada thread pool.

Ketika Anda membutuhkan afinitas benang

Untuk banyak aplikasi konsol, menjalankan kelanjutan pada kumpulan utas tidak masalah. Namun, beberapa skenario mengharuskan semua kelanjutan berjalan pada satu thread.

  • Eksekusi berseri: Beberapa operasi asinkron bersamaan berbagi status tanpa kunci dengan menjalankan kelanjutannya pada utas yang sama.
  • Persyaratan pustaka: Beberapa pustaka atau objek COM memerlukan keterikatan pada utas tertentu.
  • Pengujian unit: Kerangka kerja pengujian mungkin memerlukan eksekusi deterministik secara utas tunggal dari kode asinkron.

Membangun SynchronizationContext dengan thread tunggal

Untuk menjalankan semua kelanjutan pada satu utas, Anda memerlukan dua hal:

  1. Antrean SynchronizationContext metode yang Post berfungsi untuk pengumpulan utas yang aman.
  2. Loop pemompaan pesan yang memproses antrean pada utas target.

Konteks yang disesuaikan

Konteks menggunakan BlockingCollection<T> untuk mengoordinasikan produsen (kelanjutan asinkron) dan konsumen (perulangan pemompaan):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Metode AsyncPump.Run

AsyncPump.Run menginstal konteks kustom, memanggil metode asinkron, dan memompa kelanjutan pada utas panggilan hingga metode selesai:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Lihatlah bagaimana ia bekerja

Ganti panggilan default dengan AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Hasil:

[1, 10000]

ID utas tertentu mungkin berbeda tergantung pada runtime dan platform, tetapi hasil utamanya adalah bahwa semua 10.000 iterasi berjalan pada satu utas: utas utama.

Penanganan metode void asinkron

Fungsi overload Func<Task> melacak penyelesaian melalui Task. Metode asinkron void tidak mengembalikan tugas; sebaliknya, mereka memberi tahu saat ini SynchronizationContext melalui OperationStarted() dan OperationCompleted(). Untuk mendukung metode asinkron void, perluas konteks untuk melacak operasi yang tertunda.

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Dengan pelacakan operasi diaktifkan, pompa hanya keluar ketika semua metode asinkron void yang tertunda telah selesai, bukan hanya tugas tingkat atas saja.

Pertimbangan praktis

  • Risiko kebuntuan: Jika kode yang berjalan secara sinkron di dalam blok AsyncPump.Run (misalnya, dengan memanggil .Result atau .Wait() pada tugas yang kelanjutannya harus mengirim kembali ke pompa), utas pompa tidak dapat memproses kelanjutan tersebut. Hasilnya adalah kebuntuan. Masalah yang sama dijelaskan dalam Pembungkus sinkron untuk metode asinkron.
  • Performa: Pompa berulir tunggal membatasi throughput ke satu utas. Gunakan pendekatan ini hanya ketika afinitas utas penting.
  • Lintas platform: Implementasi yang AsyncPump ditampilkan di sini hanya menggunakan tipe dari namespace System.Collections.Concurrent dan System.Threading. Ini berfungsi pada semua platform yang .NET mendukung.

Baca juga