Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les flux asynchrones modélisent une source de données de streaming. Les flux de données récupèrent ou génèrent souvent des éléments de façon asynchrone. Ils fournissent un modèle de programmation naturel pour les sources de données de streaming asynchrones.
Dans ce tutoriel, vous apprendrez comment le faire :
- Créez une source de données qui génère une séquence d’éléments de données de façon asynchrone.
- Utilisez cette source de données de manière asynchrone.
- Prise en charge des contextes d’annulation et de capture pour les flux asynchrones.
- Reconnaître quand la nouvelle interface et la source de données sont préférées aux séquences de données synchrones antérieures.
Conditions préalables
Vous devez configurer votre ordinateur pour exécuter .NET, y compris le compilateur C#. Le compilateur C# est disponible avec Visual Studio 2022 ou le Kit de développement logiciel (SDK) .NET.
Vous devez créer un jeton d’accès GitHub pour pouvoir accéder au point de terminaison GitHub GraphQL. Sélectionnez les autorisations suivantes pour votre jeton d’accès GitHub :
- repo :status
- public_repo
Enregistrez le jeton d’accès dans un endroit sûr afin de pouvoir l’utiliser pour accéder au point de terminaison de l’API GitHub.
Avertissement
Sécurisez votre jeton d’accès personnel. Tout logiciel avec votre jeton d’accès personnel peut effectuer des appels d’API GitHub à l’aide de vos droits d’accès.
Ce tutoriel suppose de connaître C# et .NET, y compris Visual Studio ou l’interface CLI .NET.
Exécuter l’application de démarrage
Vous pouvez obtenir le code de l’application de démarrage utilisée dans ce didacticiel à partir du référentiel dotnet/docs dans le dossier asynchrone de programmation/extraits de code .
L’application de démarrage est une application console qui utilise l’interface GitHub GraphQL pour récupérer les problèmes récents écrits dans le référentiel dotnet/docs . Commencez par examiner le code suivant pour la méthode d’application Main
de démarrage :
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Vous pouvez définir une variable d’environnement GitHubKey
sur votre jeton d’accès personnel, ou remplacer le dernier argument dans l’appel par GetEnvVariable
votre jeton d’accès personnel. Ne placez pas votre code d’accès dans le code source si vous partagerez la source avec d’autres personnes. Ne chargez jamais les codes d’accès dans un référentiel source partagé.
Après avoir créé le client GitHub, le code dans lequel il Main
crée un objet de rapport de progression et un jeton d’annulation. Une fois ces objets créés, Main
les appels RunPagedQueryAsync
pour récupérer les 250 problèmes créés les plus récents. Une fois cette tâche terminée, les résultats sont affichés.
Lorsque vous exécutez l’application de démarrage, vous pouvez faire des observations importantes sur la façon dont cette application s’exécute. Vous verrez la progression signalée pour chaque page retournée à partir de GitHub. Vous pouvez observer une pause notable avant que GitHub retourne chaque nouvelle page de problèmes. Enfin, les problèmes s’affichent uniquement après que toutes les 10 pages ont été récupérées à partir de GitHub.
Examiner l’implémentation
L’implémentation révèle pourquoi vous avez observé le comportement abordé dans la section précédente. Examinez le code pour RunPagedQueryAsync
:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
La première chose que cette méthode fait est de créer l’objet POST, à l’aide de la GraphQLRequest
classe :
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
qui permet de former le corps de l’objet POST et de le convertir correctement en JSON présenté sous forme de chaîne unique avec la ToJsonText
méthode, ce qui supprime tous les caractères de ligne de votre corps de requête les marquant avec le \
caractère d’échappement (barre oblique inverse).
Concentrons-nous sur l’algorithme de pagination et la structure asynchrone du code précédent. (Vous pouvez consulter la documentation GitHub GraphQL pour plus d’informations sur l’API GitHub GraphQL.) La RunPagedQueryAsync
méthode énumère les problèmes les plus récents et les plus anciens. Il demande 25 problèmes par page et examine la pageInfo
structure de la réponse pour continuer avec la page précédente. Cela suit la prise en charge de la pagination standard de GraphQL pour les réponses multipage. La réponse inclut un pageInfo
objet qui inclut une hasPreviousPages
valeur et une startCursor
valeur utilisée pour demander la page précédente. Les problèmes se trouvent dans le nodes
tableau. La RunPagedQueryAsync
méthode ajoute ces nœuds à un tableau qui contient tous les résultats de toutes les pages.
Après avoir récupéré et restauré une page de résultats, RunPagedQueryAsync
signale la progression et vérifie l’annulation. Si l’annulation a été demandée, RunPagedQueryAsync
lève un OperationCanceledException.
Il existe plusieurs éléments dans ce code qui peuvent être améliorés. Plus important encore, RunPagedQueryAsync
vous devez allouer le stockage pour tous les problèmes retournés. Cet exemple s’arrête à 250 problèmes, car la récupération de tous les problèmes ouverts nécessite beaucoup plus de mémoire pour stocker tous les problèmes récupérés. Les protocoles de prise en charge des rapports de progression et de l’annulation rendent l’algorithme plus difficile à comprendre lors de sa première lecture. D’autres types et API sont impliqués. Vous devez suivre les communications par le biais des CancellationTokenSource communications et leur associé CancellationToken pour comprendre où l’annulation est demandée et où elle est accordée.
Les flux asynchrones offrent un meilleur moyen
Les flux asynchrones et la prise en charge du langage associé répondent à toutes ces préoccupations. Le code qui génère la séquence peut désormais être utilisé yield return
pour retourner des éléments dans une méthode déclarée avec le async
modificateur. Vous pouvez utiliser un flux asynchrone à l’aide d’une await foreach
boucle tout comme vous consommez n’importe quelle séquence à l’aide d’une foreach
boucle.
Ces nouvelles fonctionnalités de langage dépendent de trois nouvelles interfaces ajoutées à .NET Standard 2.1 et implémentées dans .NET Core 3.0 :
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Ces trois interfaces doivent être familières à la plupart des développeurs C#. Ils se comportent de manière similaire à leurs équivalents synchrones :
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Un type qui peut être inconnu est System.Threading.Tasks.ValueTask. Le ValueTask
struct fournit une API similaire à la System.Threading.Tasks.Task classe.
ValueTask
est utilisé dans ces interfaces pour des raisons de performances.
Convertir en flux asynchrones
Ensuite, convertissez la RunPagedQueryAsync
méthode pour générer un flux asynchrone. Tout d’abord, modifiez la signature de RunPagedQueryAsync
retour d’un jeton d’annulation IAsyncEnumerable<JToken>
et supprimez les objets de progression et de jeton d’annulation de la liste des paramètres, comme indiqué dans le code suivant :
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Le code de démarrage traite chaque page à mesure que la page est récupérée, comme indiqué dans le code suivant :
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Remplacez ces trois lignes par le code suivant :
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Vous pouvez également supprimer la déclaration antérieure de finalResults
cette méthode et l’instruction return
qui suit la boucle que vous avez modifiée.
Vous avez terminé les modifications pour générer un flux asynchrone. La méthode terminée doit ressembler au code suivant :
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Ensuite, vous modifiez le code qui consomme la collection pour consommer le flux asynchrone. Recherchez le code suivant dans Main
lequel traite la collection de problèmes :
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Remplacez ce code par la boucle suivante await foreach
:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
La nouvelle interface IAsyncEnumerator<T> dérive de IAsyncDisposable. Cela signifie que la boucle précédente supprime de façon asynchrone le flux à la fin de la boucle. Vous pouvez imaginer que la boucle ressemble au code suivant :
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Par défaut, les éléments de flux sont traités dans le contexte capturé. Si vous souhaitez désactiver la capture du contexte, utilisez la TaskAsyncEnumerableExtensions.ConfigureAwait méthode d’extension. Pour plus d’informations sur les contextes de synchronisation et la capture du contexte actuel, consultez l’article sur l’utilisation du modèle asynchrone basé sur les tâches.
Les flux asynchrones prennent en charge l’annulation à l’aide du même protocole que d’autres async
méthodes. Vous devez modifier la signature de la méthode d’itérateur asynchrone comme suit pour prendre en charge l’annulation :
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
L’attribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute provoque la génération du code du compilateur pour le IAsyncEnumerator<T> jeton qui rend le jeton passé à GetAsyncEnumerator
visible au corps de l’itérateur asynchrone en tant qu’argument. À l’intérieur runQueryAsync
, vous pouvez examiner l’état du jeton et annuler un travail supplémentaire si demandé.
Vous utilisez une autre méthode d’extension, WithCancellationpour transmettre le jeton d’annulation au flux asynchrone. Vous devez modifier la boucle en énumérant les problèmes comme suit :
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Vous pouvez obtenir le code du didacticiel terminé à partir du référentiel dotnet/docs dans le dossier asynchrone de programmation/extraits de code .
Exécuter l’application terminée
Exécutez de nouveau l'application. Contrastez son comportement par rapport au comportement de l’application de démarrage. La première page des résultats est énumérée dès qu’elle est disponible. Il existe une pause observable à mesure que chaque nouvelle page est demandée et récupérée, puis les résultats de la page suivante sont rapidement énumérés. Le try
/ catch
bloc n’est pas nécessaire pour gérer l’annulation : l’appelant peut arrêter l’énumération de la collection. La progression est clairement signalée, car le flux asynchrone génère des résultats à mesure que chaque page est téléchargée. L’état de chaque problème retourné est inclus en toute transparence dans la await foreach
boucle. Vous n’avez pas besoin d’un objet de rappel pour suivre la progression.
Vous pouvez voir les améliorations apportées à l’utilisation de la mémoire en examinant le code. Vous n’avez plus besoin d’allouer une collection pour stocker tous les résultats avant leur énumération. L’appelant peut déterminer comment consommer les résultats et si une collection de stockage est nécessaire.
Exécutez les applications de démarrage et de fin et vous pouvez observer les différences entre les implémentations vous-même. Vous pouvez supprimer le jeton d’accès GitHub que vous avez créé lorsque vous avez démarré ce didacticiel une fois que vous avez terminé. Si un attaquant a obtenu l’accès à ce jeton, il peut accéder aux API GitHub à l’aide de vos informations d’identification.
Dans ce tutoriel, vous avez utilisé des flux asynchrones pour lire des éléments individuels à partir d’une API réseau qui retourne des pages de données. Les flux asynchrones peuvent également lire à partir de « flux sans fin » comme un ticker boursier ou un appareil de capteur. L’appel pour MoveNextAsync
renvoyer l’élément suivant dès qu’il est disponible.