Zelfstudie: Asynchrone streams genereren en gebruiken met C# en .NET

Asynchrone streams modelleren een streamingbron van gegevens. Gegevensstromen halen vaak asynchroon elementen op of genereren ze. Ze bieden een natuurlijk programmeermodel voor asynchrone streaminggegevensbronnen.

In deze zelfstudie leert u het volgende:

  • Maak een gegevensbron die asynchroon een reeks gegevenselementen genereert.
  • Deze gegevensbron asynchroon gebruiken.
  • Ondersteuning voor annulering en vastgelegde contexten voor asynchrone streams.
  • Herkennen wanneer de nieuwe interface en gegevensbron de voorkeur hebben voor eerdere synchrone gegevensreeksen.

Vereisten

U moet uw computer instellen om .NET uit te voeren, inclusief de C#-compiler. De C#-compiler is beschikbaar met Visual Studio 2022 of de .NET SDK.

U moet een GitHub-toegangstoken maken, zodat u toegang hebt tot het GitHub GraphQL-eindpunt. Selecteer de volgende machtigingen voor uw GitHub-toegangstoken:

  • opslagplaats:status
  • public_repo

Sla het toegangstoken op een veilige plaats op, zodat u het kunt gebruiken om toegang te krijgen tot het GitHub API-eindpunt.

Waarschuwing

Houd uw persoonlijke toegangstoken veilig. Alle software met uw persoonlijke toegangstoken kan GitHub API-aanroepen maken met behulp van uw toegangsrechten.

In deze zelfstudie wordt ervan uitgegaan dat u bekend bent met C# en .NET, met inbegrip van Visual Studio of de .NET CLI.

De starterstoepassing uitvoeren

U kunt de code ophalen voor de starterstoepassing die in deze zelfstudie wordt gebruikt vanuit de dotnet/docs-opslagplaats in de map asynchroon programmeren/fragmenten .

De starterstoepassing is een consoletoepassing die gebruikmaakt van de GitHub GraphQL-interface om recente problemen op te halen die zijn geschreven in de dotnet/docs-opslagplaats . Bekijk eerst de volgende code voor de methode voor de starter-app Main :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

U kunt een GitHubKey omgevingsvariabele instellen op uw persoonlijke toegangstoken of u kunt het laatste argument in de aanroep GetEnvVariable vervangen door uw persoonlijke toegangstoken. Plaats uw toegangscode niet in de broncode als u de bron met anderen deelt. Upload nooit toegangscodes naar een gedeelde bronopslagplaats.

Nadat u de GitHub-client hebt gemaakt, wordt met de code een Main voortgangsrapportageobject en een annuleringstoken gemaakt. Zodra deze objecten zijn gemaakt, Main worden aanroepen RunPagedQueryAsync om de meest recente 250 gemaakte problemen op te halen. Nadat deze taak is voltooid, worden de resultaten weergegeven.

Wanneer u de starterstoepassing uitvoert, kunt u belangrijke opmerkingen maken over hoe deze toepassing wordt uitgevoerd. U ziet de voortgang die wordt gerapporteerd voor elke pagina die wordt geretourneerd vanuit GitHub. U kunt een merkbare pauze observeren voordat GitHub elke nieuwe pagina met problemen retourneert. Ten slotte worden de problemen pas weergegeven nadat alle 10 pagina's zijn opgehaald uit GitHub.

De implementatie onderzoeken

De implementatie laat zien waarom u het gedrag hebt waargenomen dat in de vorige sectie is besproken. Bekijk de code voor RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Het eerste wat deze methode doet, is het maken van het POST-object met behulp van de GraphQLRequest klasse:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

dit helpt bij het vormen van de hoofdtekst van het POST-object en deze correct te converteren naar JSON die wordt weergegeven als één tekenreeks met de ToJsonText methode, waardoor alle nieuwe regeltekens uit de hoofdtekst van uw aanvraag worden verwijderd die deze markeren met het \ escapeteken (backslash).

Laten we ons concentreren op het pagingalgoritme en de asynchrone structuur van de voorgaande code. (U kunt de GitHub GraphQL-documentatie voor meer informatie over de GitHub GraphQL-API.) De RunPagedQueryAsync methode bevat een opsomming van de problemen van de meest recente naar de oudste. Er worden 25 problemen per pagina aangevraagd en de pageInfo structuur van het antwoord onderzocht om door te gaan met de vorige pagina. Dit volgt de standaard paging-ondersteuning van GraphQL voor antwoorden met meerdere pagina's. Het antwoord bevat een pageInfo object met een hasPreviousPages waarde en een startCursor waarde die wordt gebruikt om de vorige pagina aan te vragen. De problemen bevinden zich in de nodes matrix. De RunPagedQueryAsync methode voegt deze knooppunten toe aan een matrix die alle resultaten van alle pagina's bevat.

Nadat u een pagina met resultaten hebt opgehaald en hersteld, RunPagedQueryAsync wordt de voortgang gerapporteerd en wordt gecontroleerd op annulering. Als annulering is aangevraagd, RunPagedQueryAsync gooit een OperationCanceledException.

Er zijn verschillende elementen in deze code die kunnen worden verbeterd. Het belangrijkste is dat RunPagedQueryAsync opslag moet worden toegewezen voor alle geretourneerde problemen. Dit voorbeeld stopt bij 250 problemen omdat het ophalen van alle geopende problemen veel meer geheugen vereist om alle opgehaalde problemen op te slaan. De protocollen voor het ondersteunen van voortgangsrapporten en annuleringen maken het algoritme moeilijker te begrijpen in de eerste lezing. Er zijn meer typen en API's betrokken. U moet de communicatie traceren via de CancellationTokenSource en de bijbehorende CancellationToken informatie om te begrijpen waar annulering wordt aangevraagd en waar deze wordt verleend.

Asynchrone streams bieden een betere manier

Asynchrone streams en de bijbehorende taalondersteuning pakken al deze problemen aan. De code die de reeks genereert, kan nu worden gebruikt yield return om elementen te retourneren in een methode die is gedeclareerd met de async wijzigingsfunctie. U kunt een asynchrone stroom gebruiken met behulp van een await foreach lus, net zoals u elke reeks gebruikt met behulp van een foreach lus.

Deze nieuwe taalfuncties zijn afhankelijk van drie nieuwe interfaces die zijn toegevoegd aan .NET Standard 2.1 en geïmplementeerd in .NET Core 3.0:

Deze drie interfaces moeten bekend zijn bij de meeste C#-ontwikkelaars. Ze gedragen zich op een manier die vergelijkbaar is met hun synchrone tegenhangers:

Een type dat misschien onbekend is, is System.Threading.Tasks.ValueTask. De ValueTask struct biedt een vergelijkbare API als de System.Threading.Tasks.Task klasse. ValueTask wordt gebruikt in deze interfaces om prestatieredenen.

Converteren naar asynchrone streams

Converteer vervolgens de RunPagedQueryAsync methode om een asynchrone stream te genereren. Wijzig eerst de handtekening van het retourneren van RunPagedQueryAsync een IAsyncEnumerable<JToken>, en verwijder het annuleringstoken en voortgangsobjecten uit de lijst met parameters, zoals wordt weergegeven in de volgende code:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

De starterscode verwerkt elke pagina terwijl de pagina wordt opgehaald, zoals wordt weergegeven in de volgende code:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Vervang deze drie regels door de volgende code:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

U kunt ook de declaratie van finalResults eerder in deze methode verwijderen en de return instructie die volgt op de lus die u hebt gewijzigd.

U hebt de wijzigingen voltooid om een asynchrone stream te genereren. De voltooide methode moet lijken op de volgende code:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Vervolgens wijzigt u de code die de verzameling gebruikt om de asynchrone stroom te gebruiken. Zoek de volgende code in Main die de verzameling problemen verwerkt:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Vervang die code door de volgende await foreach lus:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

De nieuwe interface IAsyncEnumerator<T> is afgeleid van IAsyncDisposable. Dat betekent dat de voorgaande lus de stroom asynchroon verwijderd wanneer de lus is voltooid. U kunt zich voorstellen dat de lus eruitziet als de volgende code:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Stream-elementen worden standaard verwerkt in de vastgelegde context. Als u het vastleggen van de context wilt uitschakelen, gebruikt u de TaskAsyncEnumerableExtensions.ConfigureAwait extensiemethode. Zie het artikel over het gebruik van het asynchrone patroon op basis van taken voor meer informatie over synchronisatiecontexten en het vastleggen van de huidige context.

Asynchrone streams bieden ondersteuning voor annulering met hetzelfde protocol als andere async methoden. U wijzigt de handtekening voor de asynchrone iteratormethode als volgt ter ondersteuning van annulering:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Het System.Runtime.CompilerServices.EnumeratorCancellationAttribute kenmerk zorgt ervoor dat de compiler code genereert voor het IAsyncEnumerator<T> token dat wordt doorgegeven aan GetAsyncEnumerator de hoofdtekst van de asynchrone iterator als dat argument. Binnen runQueryAsynckunt u de status van het token onderzoeken en verdere werkzaamheden annuleren als dit wordt aangevraagd.

U gebruikt een andere extensiemethode WithCancellationom het annuleringstoken door te geven aan de asynchrone stream. U zou de lus als volgt wijzigen om de problemen op te sommen:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

U kunt de code voor de voltooide zelfstudie ophalen uit de dotnet/docs-opslagplaats in de map asynchroon programmeren/fragmenten .

De voltooide toepassing uitvoeren

Voer de toepassing opnieuw uit. Vergelijk het gedrag ervan met het gedrag van de starterstoepassing. De eerste pagina met resultaten wordt geïnventariseerd zodra deze beschikbaar is. Er is een waarneembare pauze wanneer elke nieuwe pagina wordt aangevraagd en opgehaald, waarna de resultaten van de volgende pagina snel worden geïnventariseerd. Het try / catch blok is niet nodig om annulering af te handelen: de beller kan stoppen met het inventariseren van de verzameling. De voortgang wordt duidelijk gerapporteerd omdat de asynchrone stream resultaten genereert wanneer elke pagina wordt gedownload. De status voor elk geretourneerd probleem wordt naadloos opgenomen in de await foreach lus. U hebt geen callback-object nodig om de voortgang bij te houden.

U kunt verbeteringen in het geheugengebruik zien door de code te onderzoeken. U hoeft geen verzameling meer toe te wijzen om alle resultaten op te slaan voordat ze worden geïnventariseerd. De beller kan bepalen hoe de resultaten moeten worden verbruikt en of een opslagverzameling nodig is.

Voer zowel de starterstoepassingen als voltooide toepassingen uit en u kunt de verschillen tussen de implementaties voor uzelf bekijken. U kunt het GitHub-toegangstoken verwijderen dat u hebt gemaakt toen u deze zelfstudie hebt gestart nadat u klaar bent. Als een aanvaller toegang tot dat token heeft verkregen, heeft deze toegang tot GitHub-API's met behulp van uw referenties.

In deze zelfstudie hebt u asynchrone streams gebruikt om afzonderlijke items te lezen uit een netwerk-API die pagina's met gegevens retourneert. Asynchrone streams kunnen ook lezen uit 'nooit eindigende streams' zoals een aandelen ticker of sensorapparaat. De oproep om MoveNextAsync het volgende item te retourneren zodra het beschikbaar is.