Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Платформы пользовательского интерфейса, такие как Windows Forms, WPF и .NET MAUI, устанавливают SynchronizationContext в своем потоке пользовательского интерфейса.
await При выполнении задачи в этих средах продолжение автоматически отправляется в поток пользовательского интерфейса. Консольные приложения не устанавливают SynchronizationContext, что означает, что продолжения await выполняются в пуле потоков. В этой статье описываются последствия и показано, как создать однопоточный цикл обработки сообщений, когда он вам понадобится.
Поведение по умолчанию в консольном приложении
В консольном приложении SynchronizationContext.Current возвращает null. Когда метод приостанавливается на await, продолжение выполняется в любом доступном потоке пула потоков.
static void DefaultBehaviorDemo()
{
DemoAsync().GetAwaiter().GetResult();
}
static async Task DemoAsync()
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
}
Репрезентативные выходные данные выполнения этой программы:
[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]
Поток 1 (основной поток) отображается только один раз во время первой синхронной итерации, перед приостановкой выполнения метода await Task.Yield(). Все последующие итерации выполняются в потоках пула потоков.
Современные асинхронные точки входа
Начиная с C# 7.1, можно объявить Main как async Task или async Task<int>. В C# 9 и более поздних версиях можно использовать верхнеуровневые инструкции с await напрямую:
// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
await DemoAsync();
}
Эти точки входа не устанавливают SynchronizationContext. Среда выполнения создает вспомогательный код, который вызывает асинхронный метод и блокирует выполнение до получения результата Task, как при вызове .GetAwaiter().GetResult(). Продолжения по-прежнему выполняются в пуле потоков.
При необходимости привязки потоков
Для многих консольных приложений выполнение продолжений в пуле потоков допустимо. Однако для некоторых сценариев требуется, чтобы все продолжения выполнялись в одном потоке:
- Сериализованное выполнение: несколько одновременных асинхронных операций совместно используют состояние без блокировок, выполняя свои продолжения в одном потоке.
- Требования к библиотеке: некоторые библиотеки или COM-объекты требуют сходство с определенным потоком.
- Модульное тестирование. Для платформ тестирования может потребоваться детерминированное однопоточное выполнение асинхронного кода.
Создание однопоточной синхронизацииContext
Чтобы запустить все продолжения в одном потоке, вам потребуется две вещи:
- Очередь SynchronizationContext методов которого Post работает в потокобезопасной коллекции.
- Цикл насоса сообщений, который обрабатывает очередь в целевом потоке.
Настраиваемый контекст
Контекст использует BlockingCollection<T> для координации производителей (асинхронных продолжений) и потребителя (помпового цикла):
sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
Метод AsyncPump.Run
AsyncPump.Run устанавливает настраиваемый контекст, вызывает асинхронный метод и продолжает продолжать работу в вызываемом потоке до завершения метода:
static class AsyncPump
{
public static void Run(Func<Task> func)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new SingleThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Task t;
try
{
t = func();
}
catch
{
syncCtx.Complete();
throw;
}
t.ContinueWith(
_ => syncCtx.Complete(), TaskScheduler.Default);
syncCtx.RunOnCurrentThread();
t.GetAwaiter().GetResult();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
Class AsyncPump
Public Shared Sub Run(func As Func(Of Task))
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New SingleThreadSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
Dim t As Task
Try
t = func()
Catch
syncCtx.Complete()
Throw
End Try
t.ContinueWith(
Sub(unused) syncCtx.Complete(), TaskScheduler.Default)
syncCtx.RunOnCurrentThread()
t.GetAwaiter().GetResult()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
Наглядные примеры
Замените вызов по умолчанию на AsyncPump.Run.
static void AsyncPumpDemo()
{
AsyncPump.Run(async () =>
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
});
}
Sub AsyncPumpDemo()
AsyncPump.Run(
Async Function() As Task
Dim d As New Dictionary(Of Integer, Integer)()
For i As Integer = 0 To 9999
Dim id As Integer = Thread.CurrentThread.ManagedThreadId
Dim count As Integer
If d.TryGetValue(id, count) Then
d(id) = count + 1
Else
d(id) = 1
End If
Await Task.Yield()
Next
For Each pair In d
Console.WriteLine(pair)
Next
End Function)
End Sub
Выходные данные:
[1, 10000]
Идентификатор конкретного потока может отличаться в зависимости от среды выполнения и платформы, но ключевой результат заключается в том, что все 10 000 итерации выполняются в одном потоке: основной поток.
Обработка асинхронных методов void
Перегрузка Func<Task> отслеживает завершение через возвращаемый Task объект. Асинхронные void методы не возвращают объект Task; вместо этого они уведомляют текущий SynchronizationContext через OperationStarted() и OperationCompleted(). Чтобы поддерживать асинхронные методы, расширьте контекст для отслеживания невыполненных void операций:
public static void Run(Action asyncMethod)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new AsyncVoidSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Exception? caughtException = null;
syncCtx.OperationStarted();
try
{
asyncMethod();
}
catch (Exception ex)
{
caughtException = ex;
syncCtx.Complete();
}
finally
{
syncCtx.OperationCompleted();
}
syncCtx.RunOnCurrentThread();
if (caughtException is not null)
{
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
}
sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
private int _operationCount;
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public override void OperationStarted() =>
Interlocked.Increment(ref _operationCount);
public override void OperationCompleted()
{
if (Interlocked.Decrement(ref _operationCount) == 0)
Complete();
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Public Shared Sub Run(asyncMethod As Action)
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New AsyncVoidSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
syncCtx.OperationStarted()
Try
asyncMethod()
Catch
syncCtx.Complete()
Throw
Finally
syncCtx.OperationCompleted()
End Try
syncCtx.RunOnCurrentThread()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
End Class
Class AsyncVoidSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Private _operationCount As Integer
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Overrides Sub OperationStarted()
Interlocked.Increment(_operationCount)
End Sub
Public Overrides Sub OperationCompleted()
If Interlocked.Decrement(_operationCount) = 0 Then
Complete()
End If
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
При включенном отслеживании операций насос завершает работу только после завершения всех невыполненных асинхронных void методов, а не только задачи верхнего уровня.
Практические рекомендации
-
Риск взаимоблокировки: если код выполняется внутри
AsyncPump.Runблоков синхронно (например, путем вызова.Resultили.Wait()задачи, продолжение которой должно отправляться обратно в насос), поток насоса не может обработать это продолжение. Результатом является взаимоблокировка. Эта же проблема описана в синхронных оболочках для асинхронных методов. - Производительность: однопоточный насос ограничивает пропускную способность до уровня одного потока. Используйте этот подход только в том случае, если имеет значение привязка потоков.
-
Кроссплатформенная реализация, показанная
AsyncPumpздесь, использует только типы из пространств именSystem.Collections.ConcurrentиSystem.Threading. Он работает на всех платформах, которые поддерживает .NET.