Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Struktury interfejsu użytkownika, takie jak Windows Forms, WPF i .NET MAUI, zainstalowują SynchronizationContext w ich wątku interfejsu użytkownika. Kiedy wykonujesz zadanie w tych środowiskach, kontynuacja automatycznie wraca do wątku interfejsu użytkownika. Aplikacje konsolowe nie instalują elementu SynchronizationContext, co oznacza, że await kontynuacje są uruchamiane w puli wątków. W tym artykule wyjaśniono konsekwencje i pokazano, jak utworzyć jednowątkowy mechanizm obsługi komunikatów, gdy jest to potrzebne.
Domyślne zachowanie w aplikacji konsolowej
W aplikacji konsolowej SynchronizationContext.Current zwraca wartość null. Gdy metoda zwraca awaitwartość , kontynuacja jest uruchamiana na każdym wątku puli wątków:
static void DefaultBehaviorDemo()
{
DemoAsync().GetAwaiter().GetResult();
}
static async Task DemoAsync()
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
}
Reprezentatywne dane wyjściowe z uruchamiania tego programu:
[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]
Wątek 1 (główny wątek) pojawia się tylko raz, podczas pierwszej iteracji synchronicznej przed await Task.Yield() wstrzymaniem metody. Wszystkie kolejne iteracje uruchamiają się w wątkach puli wątków.
Nowoczesne punkty wejścia asynchronicznego
Począwszy od języka C# 7.1, można zadeklarować Main jako async Task lub async Task<int>. W języku C# 9 lub nowszym można używać instrukcji najwyższego poziomu bezpośrednio await :
// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
await DemoAsync();
}
Te punkty wejścia nie instalują obiektu SynchronizationContext. Środowisko uruchomieniowe generuje metodę bootstrap, która wywołuje metodę asynchroniową i blokuje zwracaną Taskmetodę , podobnie jak wywołanie metody .GetAwaiter().GetResult(). Kontynuacje nadal działają w puli wątków.
Gdy potrzebujesz koligacji wątku
W przypadku wielu aplikacji konsolowych uruchamianie kontynuacji w puli wątków jest w porządku. Jednak niektóre scenariusze wymagają, aby wszystkie kontynuacje działały w jednym wątku:
- Serializowane wykonywanie: wiele współbieżnych operacji asynchronicznych współużytkuje stan bez blokad, uruchamiając ich kontynuacje w tym samym wątku.
- Wymagania dotyczące biblioteki: Niektóre biblioteki lub obiekty COM wymagają koligacji do określonego wątku.
- Testowanie jednostkowe: struktury testowe mogą wymagać deterministycznego, jednowątkowego wykonywania kodu asynchronicznego.
Utworzyć jednowątkowy kontekst SynchronizationContext
Aby uruchomić wszystkie kontynuacje w jednym wątku, potrzebne są dwie rzeczy:
- Obiekt SynchronizationContext , którego Post kolejki metod działają w kolekcji bezpiecznej wątkowo.
- Mechanizm pompy komunikatów, który przetwarza tę kolejkę w wątku docelowym.
Kontekst niestandardowy
Kontekst używa elementu BlockingCollection<T> do koordynowania producentów (kontynuacji asynchronicznych) i odbiorcy (pętli pompowania):
sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
Metoda AsyncPump.Run
AsyncPump.Run instaluje kontekst niestandardowy, wywołuje metodę asynchroniową i pompuje kontynuacje w wątku wywołującym, dopóki metoda nie zostanie ukończona:
static class AsyncPump
{
public static void Run(Func<Task> func)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new SingleThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Task t;
try
{
t = func();
}
catch
{
syncCtx.Complete();
throw;
}
t.ContinueWith(
_ => syncCtx.Complete(), TaskScheduler.Default);
syncCtx.RunOnCurrentThread();
t.GetAwaiter().GetResult();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
Class AsyncPump
Public Shared Sub Run(func As Func(Of Task))
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New SingleThreadSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
Dim t As Task
Try
t = func()
Catch
syncCtx.Complete()
Throw
End Try
t.ContinueWith(
Sub(unused) syncCtx.Complete(), TaskScheduler.Default)
syncCtx.RunOnCurrentThread()
t.GetAwaiter().GetResult()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
Zobacz, jak działa
Zastąp wywołanie domyślne wyrażeniem AsyncPump.Run.
static void AsyncPumpDemo()
{
AsyncPump.Run(async () =>
{
var d = new Dictionary<int, int>();
for (int i = 0; i < 10_000; i++)
{
int id = Thread.CurrentThread.ManagedThreadId;
d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;
await Task.Yield();
}
foreach (var pair in d)
Console.WriteLine(pair);
});
}
Sub AsyncPumpDemo()
AsyncPump.Run(
Async Function() As Task
Dim d As New Dictionary(Of Integer, Integer)()
For i As Integer = 0 To 9999
Dim id As Integer = Thread.CurrentThread.ManagedThreadId
Dim count As Integer
If d.TryGetValue(id, count) Then
d(id) = count + 1
Else
d(id) = 1
End If
Await Task.Yield()
Next
For Each pair In d
Console.WriteLine(pair)
Next
End Function)
End Sub
Wyjście:
[1, 10000]
Identyfikator określonego wątku może się różnić w zależności od środowiska uruchomieniowego i platformy, ale kluczowym wynikiem jest to, że wszystkie 10 000 iteracji jest uruchamianych w jednym wątku: głównym wątku.
Obsługa metod asynchronicznych typu void
Przeciążenie Func<Task> śledzi ukończenie za pomocą zwróconego Task. Metody asynchroniczne void nie zwracają zadania; zamiast tego informują bieżące SynchronizationContext poprzez OperationStarted() i OperationCompleted(). Aby obsługiwać metody asynchroniczne void , rozszerz kontekst, aby śledzić zaległe operacje:
public static void Run(Action asyncMethod)
{
SynchronizationContext? prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new AsyncVoidSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
Exception? caughtException = null;
syncCtx.OperationStarted();
try
{
asyncMethod();
}
catch (Exception ex)
{
caughtException = ex;
syncCtx.Complete();
}
finally
{
syncCtx.OperationCompleted();
}
syncCtx.RunOnCurrentThread();
if (caughtException is not null)
{
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
}
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
}
}
}
sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
private readonly
BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
private int _operationCount;
public override void Post(SendOrPostCallback d, object? state)
{
_queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
}
public override void OperationStarted() =>
Interlocked.Increment(ref _operationCount);
public override void OperationCompleted()
{
if (Interlocked.Decrement(ref _operationCount) == 0)
Complete();
}
public void RunOnCurrentThread()
{
while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
Timeout.Infinite))
{
workItem.Key(workItem.Value);
}
}
public void Complete() => _queue.CompleteAdding();
}
Public Shared Sub Run(asyncMethod As Action)
Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
Try
Dim syncCtx As New AsyncVoidSynchronizationContext()
SynchronizationContext.SetSynchronizationContext(syncCtx)
syncCtx.OperationStarted()
Try
asyncMethod()
Catch
syncCtx.Complete()
Throw
Finally
syncCtx.OperationCompleted()
End Try
syncCtx.RunOnCurrentThread()
Finally
SynchronizationContext.SetSynchronizationContext(prevCtx)
End Try
End Sub
End Class
Class AsyncVoidSynchronizationContext
Inherits SynchronizationContext
Private ReadOnly _queue As New _
BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
Private _operationCount As Integer
Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
_queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
End Sub
Public Overrides Sub OperationStarted()
Interlocked.Increment(_operationCount)
End Sub
Public Overrides Sub OperationCompleted()
If Interlocked.Decrement(_operationCount) = 0 Then
Complete()
End If
End Sub
Public Sub RunOnCurrentThread()
Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
While _queue.TryTake(workItem, Timeout.Infinite)
workItem.Key.Invoke(workItem.Value)
End While
End Sub
Public Sub Complete()
_queue.CompleteAdding()
End Sub
End Class
Po włączeniu śledzenia operacji pompa kończy działanie tylko wtedy, gdy wszystkie niedokończone metody asynchroniczne void zostaną ukończone, a nie tylko zadanie najwyższego poziomu.
Zagadnienia praktyczne
-
Ryzyko zakleszczenia: Jeśli kod uruchomiony wewnątrz
AsyncPump.Runbloków działa synchronicznie (na przykład przez wywołanie.Resultlub.Wait()na zadaniu, którego kontynuacja musi zostać przekazana z powrotem do pętli zdarzeń), wątek pętli zdarzeń nie może przetworzyć tej kontynuacji zadania. Wynikiem jest impas. Ten sam problem został opisany w temacie Synchroniczne otoki dla metod asynchronicznych. - Wydajność: Pompa jednowątkowa ogranicza przepływność do jednego wątku. Stosuj tę metodę tylko wtedy, gdy przywiązanie wątku ma znaczenie.
- Międzyplatformowość: Pokazana tutaj implementacja używa tylko typów z przestrzeni nazw
System.Collections.ConcurrentiSystem.Threading. Działa na wszystkich platformach, które .NET obsługuje.