Partekatu honen bidez:


Tutorial: Generación y consumo de flujos asincrónicos mediante C# y .NET

Los flujos asincrónicos modelan un flujo continuo de datos. Los flujos de datos suelen recuperar o generar elementos de forma asincrónica. Proporcionan un modelo de programación natural para orígenes de datos de streaming asincrónicos.

En este tutorial, aprenderá a:

  • Cree un origen de datos que genere una secuencia de elementos de datos de forma asincrónica.
  • Consumir ese origen de datos de forma asincrónica.
  • Admitir la cancelación y los contextos capturados para secuencias asincrónicas.
  • Reconocer cuándo se prefiere la nueva interfaz y el origen de datos a secuencias de datos sincrónicas anteriores.

Prerrequisitos

Deberá configurar la máquina para ejecutar .NET, incluido el compilador de C#. El compilador de C# está disponible con Visual Studio 2022 o con el SDK de .NET.

Deberá crear un token de acceso de GitHub para que pueda acceder al punto de conexión de GraphQL de GitHub. Seleccione los permisos siguientes para el token de acceso de GitHub:

  • repo:status
  • public_repo

Guarde el token de acceso en un lugar seguro para poder usarlo para obtener acceso al punto de conexión de la API de GitHub.

Advertencia

Mantenga el token de acceso personal seguro. Cualquier software con el token de acceso personal podría realizar llamadas API de GitHub mediante sus derechos de acceso.

En este tutorial se da por supuesto que está familiarizado con C# y .NET, incluido Visual Studio o la CLI de .NET.

Ejecución de la aplicación de inicio

Puede obtener el código para la aplicación de inicio usada en este tutorial en el repositorio dotnet/docs de la carpeta asynchronous-programming/snippets.

La aplicación de inicio es una aplicación de consola que usa la interfaz GraphQL de GitHub para recuperar los problemas recientes escritos en el repositorio dotnet/docs . Para empezar, examine el siguiente código para el método de la aplicación de arranque Main.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Puede establecer una GitHubKey variable de entorno en el token de acceso personal o puede reemplazar el último argumento de la llamada a GetEnvVariable por el token de acceso personal. No coloque el código de acceso en el código fuente si va a compartir el origen con otros usuarios. Nunca cargue códigos de acceso en un repositorio de origen compartido.

Después de crear el cliente de GitHub, el código de Main crea un objeto de informe de progreso y un token de cancelación. Una vez creados esos objetos, Main llama a RunPagedQueryAsync para recuperar las 250 incidencias creadas más recientes. Una vez finalizada la tarea, se muestran los resultados.

Al ejecutar la aplicación de inicio, puede realizar algunas observaciones importantes sobre cómo se ejecuta esta aplicación. Verá el progreso notificado para cada página devuelta desde GitHub. Puede observar una pausa notable antes de que GitHub devuelva cada nueva página de problemas. Por último, los problemas se muestran solo después de que se hayan recuperado las 10 páginas de GitHub.

Examen de la implementación

La implementación revela por qué ha observado el comportamiento descrito en la sección anterior. Examine el código de RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Lo primero que hace este método es crear el objeto POST mediante la GraphQLRequest clase :

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

que ayuda a formar el cuerpo del objeto POST y convertirlo correctamente en JSON presentado como una sola cadena con el método ToJsonText, que quita todos los caracteres de nueva línea del cuerpo de la solicitud que los marcan con el carácter de escape \ (barra diagonal inversa).

Vamos a centrarnos en el algoritmo de paginación y la estructura asincrónica del código anterior. (Puede consultar la documentación de GraphQL de GitHub para obtener más información sobre GraphQL API de GitHub). El RunPagedQueryAsync método enumera los problemas de más recientes a más antiguos. Solicita 25 problemas por página y examina la pageInfo estructura de la respuesta para continuar con la página anterior. Esto sigue el soporte de paginación estándar de GraphQL para respuestas de varias páginas. La respuesta incluye un pageInfo objeto que incluye un hasPreviousPages valor y un startCursor valor usado para solicitar la página anterior. Los problemas están en la nodes matriz. El RunPagedQueryAsync método anexa estos nodos a una matriz que contiene todos los resultados de todas las páginas.

Después de recuperar y restaurar una página de resultados, RunPagedQueryAsync notifica el progreso y comprueba la cancelación. Si se ha solicitado la cancelación, RunPagedQueryAsync lanza un OperationCanceledException.

Hay varios elementos en este código que se pueden mejorar. Lo más importante, RunPagedQueryAsync debe asignar el almacenamiento para todas las incidencias devueltas. Este ejemplo se detiene en 250 problemas porque recuperar todos los problemas abiertos requeriría mucho más memoria para almacenar todos los problemas recuperados. Los protocolos para admitir informes de progreso y cancelación hacen que el algoritmo sea más difícil de entender en su primera lectura. Hay más tipos y API implicados. Debe realizar un seguimiento de las comunicaciones a través de CancellationTokenSource y su CancellationToken asociado para comprender dónde se solicita la cancelación y dónde se concede.

Las secuencias asincrónicas ofrecen una manera mejor

Las secuencias asincrónicas y la compatibilidad con el idioma asociado abordan todas esas preocupaciones. El código que genera la secuencia ahora puede usar yield return para devolver elementos en un método declarado con el async modificador . Puede consumir una secuencia asincrónica mediante un await foreach bucle igual que consumir cualquier secuencia mediante un foreach bucle .

Estas nuevas características de lenguaje dependen de tres interfaces nuevas agregadas a .NET Standard 2.1 e implementadas en .NET Core 3.0:

Estas tres interfaces deberían ser familiares para la mayoría de los desarrolladores de C#. Se comportan de forma similar a sus homólogos sincrónicos:

Un tipo que podría no ser conocido es System.Threading.Tasks.ValueTask. La ValueTask estructura proporciona una API similar a la System.Threading.Tasks.Task clase . ValueTask se usa en estas interfaces por motivos de rendimiento.

Conversión en secuencias asincrónicas

A continuación, convierta el RunPagedQueryAsync método para generar una secuencia asincrónica. En primer lugar, cambie la firma de RunPagedQueryAsync para devolver un IAsyncEnumerable<JToken>y quite el token de cancelación y los objetos de progreso de la lista de parámetros, como se muestra en el código siguiente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

El código de inicio procesa cada página a medida que se recupera la página, como se muestra en el código siguiente:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Reemplace esas tres líneas por el código siguiente:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

También puede quitar la declaración de finalResults anteriormente en este método y la instrucción return que sigue al bucle modificado.

Has terminado los cambios para generar un flujo asíncrono. El método terminado debe ser similar al código siguiente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A continuación, cambie el código que utiliza la colección para usar la secuencia asincrónica. Busque el código siguiente en Main que procesa la colección de problemas:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Reemplace ese código por el siguiente await foreach bucle:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nueva interfaz IAsyncEnumerator<T> deriva de IAsyncDisposable. Esto significa que el bucle anterior desechará la secuencia de forma asincrónica cuando finalice el bucle. Puede imaginar que el bucle tiene el siguiente aspecto:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

De forma predeterminada, los elementos de secuencia se procesan en el contexto capturado. Si desea deshabilitar la captura del contexto, use el TaskAsyncEnumerableExtensions.ConfigureAwait método de extensión . Para obtener más información sobre los contextos de sincronización y la captura del contexto actual, consulte el artículo sobre el consumo del patrón asincrónico basado en tareas.

Los flujos asincrónicos admiten la cancelación mediante el mismo protocolo que otros async métodos. Modificaría la firma para el método de iterador asincrónico como se indica a continuación para admitir la cancelación:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

El atributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute hace que el compilador genere código para IAsyncEnumerator<T>, lo que hace que el token pasado a GetAsyncEnumerator sea visible para el cuerpo del iterador asincrónico como ese argumento. En runQueryAsync, puede examinar el estado del token y cancelar el trabajo posterior si es necesario.

Se puede usar otro método de extensión, WithCancellation, para pasar el token de cancelación a la secuencia asincrónica. Modificaría el bucle enumerando los problemas de la siguiente manera:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Puede obtener el código del tutorial terminado en el repositorio dotnet/docs, en la carpeta programación-asincrónica/snippets.

Ejecución de la aplicación finalizada

Vuelva a ejecutar la aplicación. Contraste su comportamiento con el comportamiento de la aplicación de inicio. La primera página de resultados se enumera en cuanto está disponible. Hay una pausa observable a medida que se solicita y recupera cada página nueva y, a continuación, los resultados de la página siguiente se enumeran rápidamente. El try / catch bloque no es necesario para controlar la cancelación: el autor de la llamada puede dejar de enumerar la colección. El progreso se notifica claramente porque la secuencia asincrónica genera resultados a medida que se descarga cada página. El estado de cada incidencia devuelta se incluye sin problemas en el bucle await foreach. No necesita un objeto callback para realizar un seguimiento del progreso.

Para ver mejoras en el uso de memoria, examine el código. Ya no es necesario asignar una colección para almacenar todos los resultados antes de enumerarse. El autor de la llamada puede determinar cómo consumir los resultados y si se necesita una colección de almacenamiento.

Ejecute las aplicaciones de inicio y finalizadas, y puede observar las diferencias entre las implementaciones por sí mismo. Puede eliminar el token de acceso de GitHub que creó al iniciar este tutorial después de que haya terminado. Si un atacante obtuvo acceso a ese token, podría acceder a las API de GitHub mediante sus credenciales.

En este tutorial, ha usado secuencias asincrónicas para leer elementos individuales de una API de red que devuelve páginas de datos. Los flujos asincrónicos también pueden leer "flujos que nunca terminan", como un teletipo de bolsa o un sensor. La llamada a MoveNextAsync devuelve el siguiente elemento tan pronto como esté disponible.