Windows フォーム、WPF、.NET MAUIなどの UI フレームワークは、ui スレッドにSynchronizationContextをインストールします。 これらの環境でタスクを await すると、継続は自動的に UI スレッドにポストバックします。 コンソール アプリはSynchronizationContext をインストールしません。つまり、継続処理awaitがスレッド プールで実行されます。 この記事では、その結果について説明し、必要なときにシングルスレッド メッセージ ポンプを構築する方法について説明します。
コンソール アプリでの既定の動作
コンソール アプリでは、 SynchronizationContext.Current は nullを返します。
await でメソッドが生成されると、継続は使用可能なすべてのスレッド プール スレッドで実行されます。
static void DefaultBehaviorDemo()
{
DemoAsync().GetAwaiter().GetResult();
}
static async Task DemoAsync()
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
}
このプログラムの実行からの代表的な出力:
[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]
スレッド 1 (メイン スレッド) は、メソッドを中断する前の最初の同期イテレーション中 await Task.Yield() 1 回だけ表示されます。 後続のすべてのイテレーションは、スレッド プール スレッドで実行されます。
最新の非同期エントリ ポイント
C# 7.1 以降では、 Main を async Task または async Task<int>として宣言できます。 C# 9 以降では、 await で最上位レベルのステートメントを直接使用できます。
// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
await DemoAsync();
}
これらのエントリ ポイントでは、 SynchronizationContextはインストールされません。 ランタイムは、非同期メソッドを呼び出すブートストラップを生成し、返されたTaskでブロックします。これは.GetAwaiter().GetResult()を呼び出すのと同様です。 継続はスレッド プールで引き続き実行されます。
スレッド アフィニティが必要な場合
多くのコンソール アプリでは、スレッド プールで継続を実行しても問題ありません。 ただし、一部のシナリオでは、すべての継続を 1 つのスレッドで実行する必要があります。
- シリアル化された実行: 複数の同時非同期操作は、同じスレッドで継続を実行することで、ロックなしで状態を共有します。
- ライブラリの要件: 一部のライブラリまたは COM オブジェクトには、特定のスレッドとのアフィニティが必要です。
- 単体テスト: テスト フレームワークでは、非同期コードの確定的なシングル スレッド実行が必要になる場合があります。
シングル スレッドの SynchronizationContext を構築する
1 つのスレッドですべての継続を実行するには、次の 2 つが必要です。
- Post メソッド キューがスレッド セーフなコレクションに対して機能する SynchronizationContext。
- ターゲット スレッドでキューを処理するメッセージ ポンプ ループ。
カスタム コンテキスト
コンテキストでは、 BlockingCollection<T> を使用してプロデューサー (非同期継続) とコンシューマー (ポンプ ループ) を調整します。
sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
AsyncPump.Run メソッド
AsyncPump.Run は、カスタム コンテキストをインストールし、非同期メソッドを呼び出し、メソッドが完了するまで呼び出し元スレッドに継続をポンプします。
static class AsyncPump
{
public static void Run(Func<Task> func)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new SingleThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Task t;
try
{
t = func();
}
catch
{
syncCtx.Complete();
throw;
}
t.ContinueWith(
_ => syncCtx.Complete(), TaskScheduler.Default);
syncCtx.RunOnCurrentThread();
t.GetAwaiter().GetResult();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
Class AsyncPump
Public Shared Sub Run(func As Func(Of Task))
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New SingleThreadSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
Dim t As Task
Try
t = func()
Catch
syncCtx.Complete()
Throw
End Try
t.ContinueWith(
Sub(unused) syncCtx.Complete(), TaskScheduler.Default)
syncCtx.RunOnCurrentThread()
t.GetAwaiter().GetResult()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
実際に確認しましょう。
既定の呼び出しを AsyncPump.Runに置き換えます。
static void AsyncPumpDemo()
{
AsyncPump.Run(async () =>
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
});
}
Sub AsyncPumpDemo()
AsyncPump.Run(
Async Function() As Task
Dim d As New Dictionary(Of Integer, Integer)()
For i As Integer = 0 To 9999
Dim id As Integer = Thread.CurrentThread.ManagedThreadId
Dim count As Integer
If d.TryGetValue(id, count) Then
d(id) = count + 1
Else
d(id) = 1
End If
Await Task.Yield()
Next
For Each pair In d
Console.WriteLine(pair)
Next
End Function)
End Sub
アウトプット:
[1, 10000]
特定のスレッド ID はランタイムとプラットフォームによって異なる場合がありますが、主な結果は、1 つのスレッド (メイン スレッド) で 10,000 回のイテレーションがすべて実行されるということです。
非同期 void メソッドを処理する
Func<Task> オーバーロードは、返される Task を通じて完了を追跡します。 非同期voidメソッドはタスクを返しません。代わりに、SynchronizationContextとOperationStarted()を通じて現在のOperationCompleted()に通知します。 非同期 void メソッドをサポートするには、未処理の操作を追跡するようにコンテキストを拡張します。
public static void Run(Action asyncMethod)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new AsyncVoidSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Exception? caughtException = null;
syncCtx.OperationStarted();
try
{
asyncMethod();
}
catch (Exception ex)
{
caughtException = ex;
syncCtx.Complete();
}
finally
{
syncCtx.OperationCompleted();
}
syncCtx.RunOnCurrentThread();
if (caughtException is not null)
{
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
}
sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
private int _operationCount;
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public override void OperationStarted() =>
Interlocked.Increment(ref _operationCount);
public override void OperationCompleted()
{
if (Interlocked.Decrement(ref _operationCount) == 0)
Complete();
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Public Shared Sub Run(asyncMethod As Action)
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New AsyncVoidSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
syncCtx.OperationStarted()
Try
asyncMethod()
Catch
syncCtx.Complete()
Throw
Finally
syncCtx.OperationCompleted()
End Try
syncCtx.RunOnCurrentThread()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
End Class
Class AsyncVoidSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Private _operationCount As Integer
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Overrides Sub OperationStarted()
Interlocked.Increment(_operationCount)
End Sub
Public Overrides Sub OperationCompleted()
If Interlocked.Decrement(_operationCount) = 0 Then
Complete()
End If
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
操作の追跡を有効にすると、ポンプは、最上位のタスクだけでなく、未処理のすべての非同期 void メソッドが完了した場合にのみ終了します。
実用的な考慮事項
-
デッドロック のリスク:
AsyncPump.Run内で実行されているコードが同期的にブロックされた場合 (たとえば、継続がポンプにポストバックする必要があるタスクで.Resultまたは.Wait()を呼び出すことによって)、ポンプ スレッドはその継続を処理できません。 結果はデッドロックになります。 同じ問題が非同期メソッドの同期ラッパーについても説明されています。 - パフォーマンス:シングルスレッドポンプはスループットを1スレッドに制限します。 この方法は、スレッド アフィニティが重要な場合にのみ使用します。
-
クロスプラットフォーム: ここで示す
AsyncPump実装では、System.Collections.ConcurrentおよびSystem.Threading名前空間からの型のみを使用します。 .NETがサポートするすべてのプラットフォームで動作します。
こちらも参照ください
.NET