Tutorial: Generieren und Nutzen asynchroner Datenströme mit C# und .NET
Mit asynchronen Datenströmen wird eine Streamingdatenquelle modelliert. In Datenströmen werden Elemente häufig asynchron abgerufen oder generiert. Sie stellen ein intuitives Programmiermodell für asynchrone Streamingdatenquellen bereit.
In diesem Tutorial lernen Sie, wie die folgenden Aufgaben ausgeführt werden:
- Erstellen einer Datenquelle, die eine Sequenz von Datenelementen asynchron generiert
- Asynchrones Nutzen dieser Datenquelle
- Unterstützung für Abbruchvorgänge und erfasste Kontexte für asynchrone Streams
- Erkennen, wenn die neue Schnittstelle und Datenquelle früheren synchronen Datensequenzen vorgezogen werden
Voraussetzungen
Sie müssen Ihren Computer für das Ausführen von .NET einschließlich des C#-Compilers einrichten. Der C#-Compiler ist mit Visual Studio 2022 oder dem .NET SDK verfügbar.
Sie müssen ein GitHub-Zugriffstoken erstellen, damit Sie auf den GitHub GraphQL-Endpunkt zugreifen können. Wählen Sie die folgenden Berechtigungen für Ihr GitHub-Zugriffstoken aus:
- repo:status
- public_repo
Speichern Sie das Zugriffstoken an einem sicheren Ort, damit Sie es für den Zugriff auf den GitHub-API-Endpunkt verwenden können.
Warnung
Schützen Sie Ihr persönliches Zugriffstoken. Jede Software mit Ihrem persönlichen Zugriffstoken kann mit Ihren Zugriffsrechten GitHub-API-Aufrufe ausführen.
In diesem Tutorial wird davon ausgegangen, dass Sie mit C# und .NET vertraut sind (einschließlich Visual Studio oder der .NET-CLI).
Ausführen der Startanwendung
Sie können den Code für die in diesem Tutorial verwendete Startanwendung aus unserem Repository dotnet/docs im Ordner asynchronous-programming/snippets abrufen.
Die Startanwendung ist eine Konsolenanwendung, die die GitHub GraphQL-Schnittstelle zum Abrufen aktueller Issues verwendet, die in das Repository dotnet/docs geschrieben wurden. Sehen Sie sich zunächst folgenden Code für die Main
-Methode der Starter-App an:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Sie können entweder eine GitHubKey
-Umgebungsvariable auf Ihr persönliches Zugriffstoken festlegen, oder Sie können das letzte Argument im Aufruf von GetEnvVariable
durch Ihr persönliches Zugriffstoken ersetzen. Fügen Sie Ihren Zugriffscode nicht in den Quellcode ein, wenn Sie die Quelle für andere freigeben. Laden Sie Zugriffscodes niemals in ein freigegebenes Quellrepository hoch.
Nach dem Erstellen des GitHub-Clients werden durch den Code in Main
ein Objekt für Fortschrittsberichte und ein Abbruchtoken erstellt. Nachdem die Objekte erstellt wurden, wird RunPagedQueryAsync
durch Main
aufgerufen, um die neuesten 250 Issues abzurufen. Nach Abschluss dieser Aufgabe werden die Ergebnisse angezeigt.
Bei Ausführung der Startanwendung können Sie einige wichtige Details zur Ausführung dieser Anwendung beobachten. Für jede von GitHub zurückgegebene Seite wird der Fortschritt gemeldet. Sie können eine deutliche Pause beobachten, bevor GitHub eine weitere neue Seite mit Issues zurückgibt. Die Issues werden erst angezeigt, nachdem alle zehn Seiten aus GitHub abgerufen wurden.
Untersuchen der Implementierung
Die Implementierung zeigt, warum Sie das im vorherigen Abschnitt beschriebene Verhalten beobachten konnten. Untersuchen Sie den Code für RunPagedQueryAsync
:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Die erste Methode besteht darin, das POST-Objekt mithilfe der GraphQLRequest
Klasse zu erstellen:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
dies hilft, den POST-Objekttext zu bilden und ihn korrekt in JSON zu konvertieren, die als einzelne Zeichenfolge mit der ToJsonText
Methode dargestellt wird, wodurch alle Zeilenumbruchzeichen aus dem Anforderungstext entfernt werden, der sie mit dem \
Escapezeichen (umgekehrter Schrägstrich) markiert.
Konzentrieren wir uns auf den Paginierungsalgorithmus und die asynchrone Struktur des obigen Codes. (Details zur GitHub GraphQL-API finden Sie in der GitHub GraphQL-Dokumentation.) Die RunPagedQueryAsync
-Methode listet die Issues vom neuesten zum ältesten auf. Sie fordert 25 Issues pro Seite an und untersucht die pageInfo
-Struktur der Antwort, um mit der vorherigen Seite fortzufahren. Dies entspricht der GraphQL-Standardpaginierungsunterstützung für mehrseitige Antworten. Die Antwort enthält ein pageInfo
-Objekt mit einem hasPreviousPages
-Wert und einem startCursor
-Wert zum Anfordern der vorherigen Seite. Die Issues befinden sich im nodes
-Array. Die RunPagedQueryAsync
-Methode fügt diese Knoten einem Array an, das alle Ergebnisse aus allen Seiten enthält.
Nach dem Abrufen und Wiederherstellen einer Seite mit Ergebnissen meldet RunPagedQueryAsync
den Fortschritt und prüft auf Abbruch. Wenn ein Abbruch angefordert wurde, löst RunPagedQueryAsync
eine OperationCanceledException aus.
Es gibt mehrere Elemente in diesem Code, die verbessert werden können. Vor allem muss RunPagedQueryAsync
Speicherplatz für alle zurückgegebenen Issues zuordnen. In diesem Beispiel wird der Vorgang bei 250 Issues beendet, weil das Abrufen aller offenen Issues wesentlich mehr Arbeitsspeicher zum Speichern aller abgerufenen Issues erfordern würde. Der Algorithmus ist durch die Protokolle zur Unterstützung von Fortschrittsberichten und Abbruchvorgängen beim ersten Lesen schwieriger zu verstehen. Es sind mehr Typen und APIs beteiligt. Außerdem müssen Sie die Kommunikation über CancellationTokenSource und die zugehörige CancellationToken-Struktur verfolgen, um nachzuvollziehen, wo der Abbruch angefordert und wo er gewährt wird.
Bessere Möglichkeiten durch asynchrone Datenströme
Mit asynchronen Datenströmen und der zugehörigen Sprachunterstützung lassen sich diese Probleme beheben. Der Code, der die Sequenz generiert, kann mit yield return
jetzt Elemente in einer Methode zurückgeben, die mit dem async
-Modifizierer deklariert wurde. Sie können einen asynchronen Datenstrom mit einer await foreach
-Schleife genau so nutzen, wie Sie eine beliebige Sequenz mit einer foreach
-Schleife einsetzen.
Diese neuen Sprachfeatures hängen von drei neuen Schnittstellen ab, die dem .NET Standard 2.1 hinzugefügt und in .NET Core 3.0 implementiert wurden:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Diese drei Schnittstellen sollten den meisten C#-Entwicklern vertraut sein. Sie verhalten sich ähnlich wie ihre synchronen Gegenstücke:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Ein möglicherweise weniger bekannter Typ ist System.Threading.Tasks.ValueTask. Die ValueTask
-Struktur bietet eine ähnliche API für die System.Threading.Tasks.Task-Klasse. ValueTask
wird in diesen Schnittstellen aus Leistungsgründen verwendet.
Konvertieren in asynchrone Datenströme
Als Nächstes konvertieren Sie die RunPagedQueryAsync
-Methode, um einen asynchronen Datenstrom zu generieren. Ändern Sie zunächst die Signatur von RunPagedQueryAsync
so, dass ein IAsyncEnumerable<JToken>
zurückgegeben wird, und entfernen Sie das Abbruchtoken und die Fortschrittsobjekte aus der Parameterliste, wie im folgenden Code gezeigt:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Der Startcode verarbeitet die einzelnen Seiten, während sie abgerufen werden, wie im folgenden Code gezeigt:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Ersetzen Sie diese drei Zeilen durch den folgenden Code:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Sie können auch die Deklaration von finalResults
weiter oben in dieser Methode sowie die return
-Anweisung entfernen, die der von Ihnen geänderten Schleife folgt.
Sie haben die Änderungen zum Generieren eines asynchronen Datenstroms abgeschlossen. Die fertige Methode sollte in etwa dem folgenden Code entsprechen:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Als Nächstes ändern Sie den Code, der die Sammlung nutzt, um den asynchronen Datenstrom zu verwenden. Suchen Sie in Main
den folgenden Code, der die Sammlung der Issues verarbeitet:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Ersetzen Sie den Code durch die folgende await foreach
-Schleife:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Die neue Schnittstelle IAsyncEnumerator<T> leitet von IAsyncDisposable ab. Das bedeutet, dass die vorhergehende Schleife den Stream asynchron löscht, wenn die Schleife beendet wird. Die Schleife ähnelt dem folgenden Code:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Standardmäßig werden Streamelemente im erfassten Kontext verarbeitet. Wenn Sie die Erfassung des Kontexts deaktivieren möchten, verwenden Sie die Erweiterungsmethode TaskAsyncEnumerableExtensions.ConfigureAwait. Weitere Informationen über Synchronisierungskontexte und die Erfassung des aktuellen Kontexts finden Sie im Artikel über das Verwenden des aufgabenbasierten asynchronen Musters.
Asynchrone Streams unterstützen Abbruchvorgänge mithilfe desselben Protokolls wie andere async
-Methoden. Ändern Sie die Signatur für die asynchrone Iteratormethode folgendermaßen, damit Abbruchvorgänge unterstützt werden:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Das System.Runtime.CompilerServices.EnumeratorCancellationAttribute-Attribut bewirkt, dass der Compiler Code für IAsyncEnumerator<T> generiert, der dazu führt, dass das an GetAsyncEnumerator
übergebene Token für den Text des asynchronen Iterators als dieses Argument sichtbar ist. In runQueryAsync
können Sie den Status des Tokens überprüfen und sich ggf. weitere Arbeit sparen.
Eine andere Erweiterungsmethode (WithCancellation) wird verwendet, um das Abbruchtoken an den asynchronen Stream zu übergeben. Ändern Sie die Schleife, die die Issues enumeriert, folgendermaßen:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Sie können den Code für das abgeschlossene Tutorial aus unserem Repository dotnet/docs im Ordner asynchronous-programming/snippets abrufen.
Ausführen der fertig gestellten Anwendung
Führen Sie die Anwendung erneut aus. Vergleichen Sie deren Verhalten mit dem Verhalten der Startanwendung. Die erste Seite mit Ergebnissen wird aufgelistet, sobald sie verfügbar ist. Es gibt eine wahrnehmbare Pause, wenn eine neue Seite angefordert und abgerufen wird, und dann werden die Ergebnisse der nächsten Seite schnell aufgelistet. Der try
/ catch
-Block ist zur Verarbeitung eines Abbruchs nicht erforderlich: Der Aufrufer kann das Auflisten der Sammlung beenden. Der Fortschritt wird deutlich gemeldet, weil der asynchrone Datenstrom die Ergebnisse generiert, während die einzelnen Seiten heruntergeladen werden. Der Status jedes zurückgegebenen Problems ist nahtlos in der await foreach
-Schleife enthalten. Sie benötigen kein Rückrufobjekt, um den Fortschritt nachzuverfolgen.
Sie können Verbesserungen in der Arbeitsspeichernutzung erkennen, indem Sie den Code untersuchen. Sie müssen eine Sammlung nicht mehr zuordnen, um alle Ergebnisse zu speichern, bevor sie aufgelistet werden. Der Aufrufer kann festlegen, wie die Ergebnisse genutzt werden und ob eine Speichersammlung erforderlich ist.
Führen Sie sowohl die Startanwendung als auch die fertig gestellte Anwendung aus, um die Unterschiede zwischen den Implementierungen selbst zu beobachten. Wenn Sie fertig sind, können Sie das zu Beginn des Tutorials erstellte GitHub-Zugriffstoken löschen. Wenn ein Angreifer Zugriff auf dieses Token erlangt hat, kann er mit Ihren Anmeldeinformationen auf GitHub-APIs zugreifen.
In diesem Tutorial haben Sie asynchrone Streams verwendet, um einzelne Elemente aus einer Netzwerk-API zu lesen, die Seiten mit Daten zurückgibt. Asynchrone Streams können auch aus endlosen Streams wie einem Aktienticker oder einem Sensorgerät lesen. Der Aufruf von MoveNextAsync
gibt das nächste Element zurück, sobald es verfügbar ist.