Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Async streams modell a streaming source of data. Datenströme rufen häufig Elemente asynchron ab oder generieren sie. Sie stellen ein natürliches Programmiermodell für asynchrone Streamingdatenquellen bereit.
In diesem Tutorial lernen Sie, wie Sie:
- Erstellen Sie eine Datenquelle, die eine Abfolge von Datenelementen asynchron generiert.
- Nutzen Sie diese Datenquelle asynchron.
- Unterstützung von Abbruch- und erfassten Kontexten für asynchrone Datenströme.
- Erkennen Sie, wann die neue Schnittstelle und Datenquelle früheren synchronen Datensequenzen bevorzugt werden.
Voraussetzungen
Sie müssen Ihren Computer so einrichten, dass .NET ausgeführt wird, einschließlich des C#-Compilers. Der C#-Compiler ist mit Visual Studio 2022- oder dem .NET SDK-verfügbar.
Sie müssen ein GitHub-Zugriffstoken erstellen, damit Sie auf den GitHub GraphQL-Endpunkt zugreifen können. Wählen Sie die folgenden Berechtigungen für Ihr GitHub-Zugriffstoken aus:
- Repo:Status
- öffentliches_Repo
Speichern Sie das Zugriffstoken an einem sicheren Ort, damit Sie es verwenden können, um Zugriff auf den GitHub-API-Endpunkt zu erhalten.
Warnung
Schützen Sie Ihr persönliches Zugriffstoken. Jede Software mit Ihrem persönlichen Zugriffstoken kann GitHub-API-Aufrufe mit Ihren Zugriffsrechten durchführen.
In diesem Tutorial wird davon ausgegangen, dass Sie mit C# und .NET vertraut sind (einschließlich Visual Studio oder der .NET-CLI).
Ausführen der Startanwendung
Sie können den Code für die In diesem Lernprogramm verwendete Startanwendung aus dem Dotnet/docs-Repository im Ordner "asynchrone Programmierung/Codeausschnitte " abrufen.
Die Startanwendung ist eine Konsolenanwendung, die die GitHub GraphQL-Schnittstelle verwendet, um aktuelle Probleme abzurufen, die im dotnet/docs-Repository geschrieben wurden. Sehen Sie sich zunächst den folgenden Code für die Start-App-Methode Main
an:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Sie können entweder eine GitHubKey
Umgebungsvariable auf Ihr persönliches Zugriffstoken festlegen oder das letzte Argument im Aufruf GetEnvVariable
durch Ihr persönliches Zugriffstoken ersetzen. Platzieren Sie Ihren Zugriffscode nicht im Quellcode, wenn Sie die Quelle für andere freigeben. Laden Sie niemals Zugriffscodes in ein freigegebenes Quell-Repository hoch.
Nach dem Erstellen des GitHub-Clients erstellt der Code Main
ein Statusberichtsobjekt und ein Abbruchtoken. Sobald diese Objekte erstellt wurden, Main
rufen Aufrufe RunPagedQueryAsync
zum Abrufen der letzten 250 erstellten Probleme auf. Nachdem dieser Vorgang abgeschlossen ist, werden die Ergebnisse angezeigt.
Wenn Sie die Startanwendung ausführen, können Sie einige wichtige Beobachtungen dazu machen, wie diese Anwendung ausgeführt wird. Für jede von GitHub zurückgegebene Seite wird der Fortschritt angezeigt. Sie können eine spürbare Pause beobachten, bevor GitHub jede neue Seite mit Problemen zurückgibt. Schließlich werden die Probleme erst angezeigt, nachdem alle 10 Seiten von GitHub abgerufen wurden.
Untersuchen der Implementierung
Die Implementierung zeigt, warum Sie das im vorherigen Abschnitt erläuterte Verhalten beobachtet haben. Überprüfen Sie den Code für RunPagedQueryAsync
:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Die erste Methode besteht darin, das POST-Objekt mithilfe der GraphQLRequest
Klasse zu erstellen:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
dies hilft, den POST-Objekttext zu bilden und ihn korrekt in JSON zu konvertieren, die als einzelne Zeichenfolge mit der ToJsonText
Methode dargestellt wird, wodurch alle Zeilenumbruchzeichen aus dem Anforderungstext entfernt werden, der sie mit dem \
Escapezeichen (umgekehrter Schrägstrich) markiert.
Konzentrieren wir uns auf den Pagingalgorithmus und die asynchrone Struktur des vorherigen Codes. (Ausführliche Informationen zur GitHub GraphQL-API finden Sie in der GitHub GraphQL-Dokumentation .) Die RunPagedQueryAsync
Methode listet die Probleme von der neuesten bis zur ältesten auf. Es fordert 25 Probleme pro Seite an und untersucht die pageInfo
Struktur der Antwort, um mit der vorherigen Seite fortzufahren. Dies folgt der Standardmäßigen Paging-Unterstützung von GraphQL für Mehrseitenantworten. Die Antwort enthält ein pageInfo
Objekt, das einen hasPreviousPages
Wert und einen startCursor
Wert enthält, der zum Anfordern der vorherigen Seite verwendet wird. Die Probleme befinden sich im nodes
Array. Die RunPagedQueryAsync
Methode fügt diese Knoten an ein Array an, das alle Ergebnisse aller Seiten enthält.
Nach dem Abrufen und Wiederherstellen einer Seite mit Ergebnissen RunPagedQueryAsync
meldet der Fortschritt und die Überprüfung auf den Abbruch. Wenn eine Abbruch angefordert wurde, RunPagedQueryAsync
wird ein OperationCanceledException.
In diesem Code gibt es mehrere Elemente, die verbessert werden können. Am wichtigsten ist, RunPagedQueryAsync
dass der Speicher für alle zurückgegebenen Probleme zugewiesen werden muss. In diesem Beispiel werden 250 Probleme beendet, da beim Abrufen aller offenen Probleme viel mehr Arbeitsspeicher erforderlich wäre, um alle abgerufenen Probleme zu speichern. Die Protokolle für die Unterstützung von Fortschrittsberichten und -abbruch machen den Algorithmus bei der ersten Lesung schwieriger zu verstehen. Weitere Typen und APIs sind beteiligt. Sie müssen die Kommunikation über die CancellationTokenSource und deren Zugehörigen CancellationToken nachverfolgen, um zu verstehen, wo die Kündigung angefordert wird und wo sie gewährt wird.
Async-Streams bieten eine bessere Möglichkeit
Async-Streams und die zugehörige Sprachunterstützung behandeln alle diese Bedenken. Der Code, der die Sequenz generiert, kann jetzt verwendet werden yield return
, um Elemente in einer Methode zurückzugeben, die mit dem async
Modifizierer deklariert wurde. Sie können einen asynchronen Datenstrom mit einer await foreach
Schleife genauso nutzen, wie Sie jede Sequenz mit einer foreach
Schleife nutzen.
Diese neuen Sprachfeatures sind von drei neuen Schnittstellen abhängig, die .NET Standard 2.1 hinzugefügt und in .NET Core 3.0 implementiert wurden:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Diese drei Schnittstellen sollten den meisten C#-Entwicklern vertraut sein. Sie verhalten sich ähnlich wie bei ihren synchronen Gegenstücken:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Ein Typ, der möglicherweise nicht vertraut ist, ist System.Threading.Tasks.ValueTask. Die ValueTask
Struktur stellt eine ähnliche API wie die System.Threading.Tasks.Task Klasse bereit.
ValueTask
wird aus Leistungsgründen in diesen Schnittstellen verwendet.
In asynchrone Datenströme konvertieren
Konvertieren Sie als Nächstes die RunPagedQueryAsync
Methode, um einen asynchronen Datenstrom zu generieren. Ändern Sie zunächst die Signatur, um RunPagedQueryAsync
ein IAsyncEnumerable<JToken>
Objekt zurückzugeben, und entfernen Sie das Abbruchtoken und Fortschrittsobjekte aus der Parameterliste, wie im folgenden Code gezeigt:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Der Startcode verarbeitet jede Seite, während die Seite abgerufen wird, wie im folgenden Code dargestellt:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Ersetzen Sie diese drei Zeilen durch den folgenden Code:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Sie können auch die Deklaration früher finalResults
in dieser Methode und die Anweisung entfernen, die return
auf die von Ihnen geänderte Schleife folgt.
Sie haben die Änderungen abgeschlossen, um einen asynchronen Datenstrom zu generieren. Die fertige Methode sollte dem folgenden Code ähneln:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Als Nächstes ändern Sie den Code, der die Sammlung verwendet, um den asynchronen Datenstrom zu nutzen. Suchen Sie den folgenden Code, in Main
dem die Sammlung von Problemen verarbeitet wird:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Ersetzen Sie diesen Code durch die folgende await foreach
Schleife:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Die neue Schnittstelle IAsyncEnumerator<T> wird von IAsyncDisposable. Dies bedeutet, dass die vorhergehende Schleife den Datenstrom asynchron verwerfen wird, wenn die Schleife abgeschlossen ist. Sie können sich vorstellen, dass die Schleife wie der folgende Code aussieht:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Streamelemente werden standardmäßig im erfassten Kontext verarbeitet. Wenn Sie die Erfassung des Kontexts deaktivieren möchten, verwenden Sie die TaskAsyncEnumerableExtensions.ConfigureAwait Erweiterungsmethode. Weitere Informationen zu Synchronisierungskontexten und zum Erfassen des aktuellen Kontexts finden Sie im Artikel zum Verwenden des aufgabenbasierten asynchronen Musters.
Async-Streams unterstützen den Abbruch mithilfe desselben Protokolls wie andere async
Methoden. Sie würden die Signatur für die asynchrone Iteratormethode wie folgt ändern, um den Abbruch zu unterstützen:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Das System.Runtime.CompilerServices.EnumeratorCancellationAttribute Attribut bewirkt, dass der Compiler Code für das IAsyncEnumerator<T> Token generiert, das an den Textkörper des asynchronen Iterators als dieses Argument übergeben wird GetAsyncEnumerator
. Innerhalb runQueryAsync
können Sie den Status des Tokens untersuchen und bei Bedarf weitere Arbeiten abbrechen.
Sie verwenden eine andere Erweiterungsmethode, WithCancellationum das Abbruchtoken an den asynchronen Datenstrom zu übergeben. Sie würden die Enumerierung der Probleme wie folgt ändern:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Sie können den Code für das fertige Lernprogramm aus dem Dotnet/docs-Repository im Ordner "asynchrone Programmierung/Codeausschnitte " abrufen.
Ausführen der fertigen Anwendung
Erneutes Ausführen der Anwendung Kontrast zu seinem Verhalten mit dem Verhalten der Startanwendung. Die erste Seite der Ergebnisse wird aufgezählt, sobald sie verfügbar ist. Es gibt eine feststellbare Pause, während jede neue Seite angefordert und abgerufen wird, dann werden die Ergebnisse der nächsten Seite schnell aufgezählt. Der Block ist nicht zum Behandeln des Abbruchs erforderlich: Der try
/ catch
Aufrufer kann die Aufzählung der Auflistung beenden. Der Fortschritt wird eindeutig gemeldet, da der asynchrone Datenstrom Ergebnisse generiert, sobald jede Seite heruntergeladen wird. Der Status für jedes zurückgegebene Problem ist nahtlos in der await foreach
Schleife enthalten. Sie benötigen kein Rückrufobjekt, um den Fortschritt nachzuverfolgen.
Sie können Verbesserungen der Speichernutzung sehen, indem Sie den Code untersuchen. Sie müssen eine Sammlung nicht mehr zuweisen, um alle Ergebnisse zu speichern, bevor sie aufgezählt werden. Der Aufrufer kann bestimmen, wie die Ergebnisse verwendet werden und ob eine Speicherauflistung erforderlich ist.
Führen Sie sowohl die Start- als auch die fertigen Anwendungen aus, und Sie können die Unterschiede zwischen den Implementierungen selbst beobachten. Sie können das GitHub-Zugriffstoken löschen, das Sie erstellt haben, nachdem Sie dieses Lernprogramm gestartet haben. Wenn ein Angreifer Zugriff auf dieses Token erlangt hat, kann er mithilfe Ihrer Anmeldeinformationen auf GitHub-APIs zugreifen.
In diesem Lernprogramm haben Sie asynchrone Datenströme zum Lesen einzelner Elemente aus einer Netzwerk-API verwendet, die Datenseiten zurückgibt. Asynchrone Streams können auch aus "Nie endenden Datenströmen" wie einem Aktienticker oder Sensorgerät lesen. Der Aufruf, um MoveNextAsync
das nächste Element zurückgibt, sobald es verfügbar ist.