Tutoriel : Générer et consommer des flux asynchrones à l’aide de C# et .NET

Les flux asynchrones modélisent une source de données de streaming. Les flux de données récupèrent ou génèrent des éléments de façon asynchrone. Ils fournissent un modèle de programmation naturel pour les sources de données de diffusion en continu asynchrones.

Ce didacticiel vous montre comment effectuer les opérations suivantes :

  • créer une source de données qui génère une séquence d’éléments de données de façon asynchrone ;
  • consommer cette source de données de façon asynchrone ;
  • Prise en charge des contextes d’annulation et de capture pour les flux asynchrones.
  • reconnaître quand l’interface et la source de données nouvelles sont préférables aux séquences de données synchrones précédentes.

Prérequis

Vous devez configurer votre ordinateur pour exécuter .NET, y compris le compilateur C#. Le compilateur C# est disponible avec Visual Studio 2022 ou le Kit de développement logiciel (SDK) .NET.

Vous devrez créer un jeton d’accès GitHub afin de pouvoir accéder au point de terminaison GitHub GraphQL. Sélectionnez les autorisations suivantes pour votre jeton d’accès GitHub :

  • repo:status
  • public_repo

Enregistrez le jeton d’accès à un endroit sûr afin de pouvoir l’utiliser pour accéder au point de terminaison de l’API GitHub.

Avertissement

Sécurisez votre jeton d’accès personnel. Tous les logiciels disposant de votre jeton d’accès personnel peuvent effectuer des appels d’API GitHub à l’aide de vos droits d’accès.

Ce didacticiel suppose que vous êtes familiarisé avec C# et .NET, y compris Visual Studio ou l’interface CLI .NET.

Exécutez l’application de démarrage

Vous pouvez obtenir le code de l’application de démarrage utilisée dans ce didacticiel à partir du référentiel dotnet/docs dans le dossier csharp/whats-new/tutorials .

L’application de démarrage est une application console qui utilise l’interface GitHub GraphQL pour récupérer des problèmes récents écrits dans le référentiel dotnet/docs. Commencez par examiner le code suivant pour la méthode Main de l’application de démarrage :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Vous pouvez soit définir une variable d’environnement GitHubKey sur votre jeton d’accès personnel, soit remplacer le dernier argument dans l’appel par GetEnvVariable avec votre jeton d’accès personnel. Ne placez pas votre code d’accès dans le code source si vous partagerez la source avec d’autres personnes. Ne chargez jamais les codes d’accès dans un référentiel source partagé.

Après la création du client de GitHub, le code dans Main crée un objet de rapport de progression et un jeton d’annulation. Une fois que ces objets sont créés, Main appelle RunPagedQueryAsync pour récupérer les 250 problèmes créés les plus récents. Une fois cette tâche terminée, les résultats sont affichés.

Lorsque vous exécutez l’application de démarrage, vous pouvez faire quelques observations importantes concernant son fonctionnement. Vous voyez la progression signalée pour chaque page retournée à partir de GitHub. Vous pouvez observer un temps de pause avant le retour de chaque nouvelle page de problèmes par GitHub. Enfin, les problèmes ne sont affichés que lorsque les 10 pages ont été récupérées à partir de GitHub.

Examinez l’implémentation

L’implémentation révèle pourquoi vous avez observé le comportement décrit dans la section précédente. Examinez the code for RunPagedQueryAsync :

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Concentrons-nous sur l’algorithme de pagination et sur la structure asynchrone du code précédent. (Vous pouvez consulter la documentation GitHub GraphQL pour plus d’informations sur l’API GitHub GraphQL.) La RunPagedQueryAsync méthode énumère les problèmes les plus récents et les plus anciens. Elle a besoin de 25 problèmes par page et examine la structure pageInfo de la réponse pour continuer avec la page précédente. Cela suit la prise en charge standard de la pagination de GraphQL pour les réponses multipages. La réponse inclut un objet pageInfo qui contient une valeur hasPreviousPages et une valeur startCursor utilisées pour demander la page précédente. Les problèmes se trouvent dans le tableau nodes. La méthode RunPagedQueryAsync ajoute ces nœuds à un tableau qui contient tous les résultats de toutes les pages.

Après la récupération et la restauration d’une page de résultats, RunPagedQueryAsync signale la progression et vérifie l’annulation. Si l’annulation a été demandée, RunPagedQueryAsync lève une OperationCanceledException.

Plusieurs éléments de ce code peuvent être améliorés. Plus important encore, RunPagedQueryAsync doit allouer du stockage pour tous les problèmes retournés. Cet exemple s’arrête à 250 problèmes, car la récupération de tous les problèmes ouverts nécessiterait beaucoup plus de mémoire pour stocker tous les problèmes récupérées. Les protocoles permettant de prendre en charge les rapports de progression et l’annulation rendent l’algorithme plus difficile à comprendre lors de sa première lecture. D’autres types et API sont impliqués. Vous devez suivre les communications par le biais de la CancellationTokenSource communication et son associé CancellationToken pour comprendre où l’annulation est demandée et où elle est accordée.

Les flux asynchrones sont mieux adaptés

Les flux asynchrones et la prise en charge associée du langage résolvent tous ces problèmes. Le code qui génère la séquence peut désormais utiliser yield return pour retourner des éléments dans une méthode qui a été déclarée avec le modificateur async. Vous pouvez consommer un flux asynchrone à l’aide une boucle await foreach tout comme vous consommez n’importe quelle séquence à l’aide d’une boucle foreach.

Ces nouvelles fonctionnalités de langage dépendent de trois nouvelles interfaces ajoutées à .NET Standard 2.1 et implémentées dans .NET Core 3.0 :

Ces trois interfaces sont très certainement familières à la plupart des développeurs C#. Elles se comportent de manière similaire à leurs équivalents synchrones :

Il est possible que le type System.Threading.Tasks.ValueTask ne soit pas familier. Le struct ValueTask fournit une API similaire à la classe System.Threading.Tasks.Task. ValueTask est utilisé dans ces interfaces pour des raisons de performances.

Convertir en flux asynchrones

Ensuite, convertissez la méthode RunPagedQueryAsync pour générer un flux asynchrone. Tout d’abord, modifiez la signature de RunPagedQueryAsync pour retourner un IAsyncEnumerable<JToken> et supprimer les objets de jeton d’annulation et de progression de la liste de paramètres comme indiqué dans le code suivant :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Le code de démarrage traite chaque page lorsqu’elle est récupérée, comme indiqué dans le code suivant :

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Remplacez ces trois lignes par le code suivant :

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Vous pouvez également supprimer la déclaration de finalResults plus tôt dans cette méthode et l’instruction return qui suit la boucle que vous avez modifiée.

Vous avez terminé les modifications permettant de générer un flux asynchrone. La méthode terminée doit ressembler au code suivant :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ensuite, vous modifiez le code qui utilise la collection pour consommer le flux de données asynchrone. Recherchez dans Main le code suivant, qui traite l’ensemble des problèmes :

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Remplacez-le par la boucle await foreach suivante :

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nouvelle interface IAsyncEnumerator<T> dérive de IAsyncDisposable. Cela signifie que la boucle précédente supprime de façon asynchrone le flux lorsque la boucle se termine. Vous pouvez imaginer que la boucle ressemble au code suivant :

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetEnumeratorAsync();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Par défaut, les éléments de flux sont traités dans le contexte capturé. Si vous souhaitez désactiver la capture du contexte, utilisez la TaskAsyncEnumerableExtensions.ConfigureAwait méthode d’extension. Pour plus d’informations sur les contextes de synchronisation et la capture du contexte actuel, consultez l’article sur l’utilisation du modèle asynchrone basé sur les tâches.

Les flux asynchrones prennent en charge l’annulation à l’aide du même protocole que d’autres async méthodes. Vous modifiez la signature de la méthode itérateur asynchrone comme suit pour prendre en charge l’annulation :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

L’attribut EnumeratorCancellationAttribute provoque la génération du code pour le IAsyncEnumerator<T> jeton qui rend le jeton transmis au GetAsyncEnumerator corps de l’itérateur asynchrone comme cet argument. À l’intérieur runQueryAsync, vous pouvez examiner l’état du jeton et annuler un travail supplémentaire si demandé.

Vous utilisez une autre méthode d’extension, WithCancellationpour transmettre le jeton d’annulation au flux asynchrone. Vous modifiez la boucle en énumérant les problèmes comme suit :

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Vous pouvez obtenir le code du didacticiel terminé à partir du référentiel dotnet/docs dans le dossier csharp/whats-new/tutorials .

Exécutez l'application terminée

Exécutez de nouveau l'application. Comparez son comportement avec le comportement de l’application de démarrage. La première page de résultats est énumérée dès qu’elle est disponible. Une pause peut être observée lorsque chaque nouvelle page est demandée et récupérée, puis les résultats de la page suivante sont rapidement énumérés. Le bloc try / catch n’est pas nécessaire pour gérer l’annulation : l’appelant peut arrêter l’énumération de la collection. Le rapport de progression est clair, car le flux asynchrone génère des résultats à mesure que chaque page est téléchargée. L’état de chaque problème retourné est inclus en toute transparence dans la await foreach boucle. Vous n’avez pas besoin d’un objet de rappel pour suivre la progression.

Vous pouvez voir des améliorations lors de l’utilisation de mémoire en examinant le code. Vous n’avez plus besoin d’allouer une collection pour stocker tous les résultats avant qu’ils ne soient énumérés. L’appelant peut déterminer comment utiliser les résultats et si une collection de stockage est nécessaire.

Exécutez les applications de démarrage et les applications terminées. Ceci vous permettra d’observer les différences entre les implémentations pour vous. À la fin de ce tutoriel, vous pouvez supprimer le jeton d’accès GitHub que vous avez créé au début. Si un attaquant arrive à accéder à ce jeton, il pourrait accéder aux API GitHub à l’aide de vos informations d’identification.