Kurz: Generování a využívání asynchronních streamů pomocí C# a .NET

Asynchronní streamy modelují streamované zdroje dat. Datové proudy často načítají nebo generují prvky asynchronně. Poskytují přirozený programovací model pro asynchronní streamované zdroje dat.

V tomto kurzu se naučíte:

  • Vytvořte zdroj dat, který asynchronně generuje sekvenci datových prvků.
  • Tento zdroj dat spotřebovávejte asynchronně.
  • Podpora zrušení a zachycených kontextů pro asynchronní streamy
  • Rozpoznáte, kdy se nové rozhraní a zdroj dat preferují před dřívějšími synchronními sekvencemi dat.

Požadavky

Budete muset nastavit počítač tak, aby běžel na platformě .NET, včetně kompilátoru jazyka C#. Kompilátor jazyka C# je k dispozici se sadou Visual Studio 2022 nebo sadou .NET SDK.

Budete muset vytvořit přístupový token GitHubu, abyste měli přístup ke koncovému bodu GitHub GraphQL. Pro přístupový token GitHubu vyberte následující oprávnění:

  • úložiště:status
  • public_repo

Přístupový token uložte na bezpečném místě, abyste ho mohli použít k získání přístupu ke koncovému bodu rozhraní API GitHubu.

Upozorňující

Zabezpečte svůj osobní přístupový token. Jakýkoli software s vaším osobním přístupovým tokenem může volat rozhraní API GitHubu pomocí vašich přístupových práv.

V tomto kurzu se předpokládá, že znáte C# a .NET, včetně sady Visual Studio nebo rozhraní příkazového řádku .NET.

Spuštění úvodní aplikace

Kód pro úvodní aplikaci použitou v tomto kurzu můžete získat z úložiště dotnet/docs ve složce asynchronního programování/fragmentů kódu .

Úvodní aplikace je konzolová aplikace, která používá rozhraní GitHub GraphQL k načtení nedávných problémů napsaných v úložišti dotnet/docs . Začněte tím, že se podíváte na následující kód počáteční metody aplikace Main :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Proměnnou GitHubKey prostředí můžete buď nastavit na svůj osobní přístupový token, nebo můžete nahradit poslední argument voláním GetEnvVariable pomocí vašeho osobního přístupového tokenu. Pokud zdroj budete sdílet s ostatními, nevkládejte svůj přístupový kód do zdrojového kódu. Nikdy neodesílejte přístupové kódy do sdíleného zdrojového úložiště.

Po vytvoření klienta GitHubu vytvoří kód v Main objektu pro generování sestav průběhu a token zrušení. Jakmile se tyto objekty vytvoří, Main volání RunPagedQueryAsync pro načtení nejnovějších 250 vytvořených problémů. Po dokončení úkolu se zobrazí výsledky.

Při spuštění úvodní aplikace můžete provést několik důležitých pozorování o tom, jak tato aplikace běží. Zobrazí se průběh jednotlivých stránek vrácených z GitHubu. Než GitHub vrátí každou novou stránku problémů, můžete si všimnout znatelné pozastavení. Nakonec se problémy zobrazí až po načtení všech 10 stránek z GitHubu.

Prozkoumání implementace

Implementace ukazuje, proč jste zaznamenali chování, které jsme probírali v předchozí části. Prozkoumejte kód pro RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

První věc, kterou tato metoda dělá, je vytvořit objekt POST pomocí GraphQLRequest třídy:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

který pomáhá vytvořit tělo objektu POST a správně ho převést na JSON prezentovaný jako jediný řetězec s ToJsonText metodou, která odebere všechny znaky nového řádku z textu požadavku označující je řídicím znakem \ (zpětné lomítko).

Pojďme se soustředit na stránkovací algoritmus a asynchronní strukturu předchozího kódu. (Můžete se obrátit na Dokumentace ke službě GitHub GraphQL pro podrobnosti o rozhraní GRAPHQL API GitHubu.) Metoda RunPagedQueryAsync vytvoří výčet problémů od nejnovějších po nejstarší. Žádá o 25 problémů na stránku a zkoumá pageInfo strukturu odpovědi, aby pokračovala na předchozí stránce. To se řídí standardní podporou stránkování GraphQL pro odpovědi na více stránek. Odpověď obsahuje pageInfo objekt, který obsahuje hasPreviousPages hodnotu a startCursor hodnotu použitou k vyžádání předchozí stránky. Problémy jsou v nodes poli. Metoda RunPagedQueryAsync připojí tyto uzly k poli, které obsahuje všechny výsledky ze všech stránek.

Po načtení a obnovení stránky výsledků RunPagedQueryAsync hlásí průběh a kontroluje zrušení. Pokud bylo požadováno zrušení, RunPagedQueryAsync vyvolá chybu OperationCanceledException.

V tomto kódu je několik prvků, které lze vylepšit. Nejdůležitější je RunPagedQueryAsync přidělit úložiště pro všechny vrácené problémy. Tato ukázka se zastaví na 250 problémech, protože načtení všech otevřených problémů by vyžadovalo mnohem více paměti pro uložení všech načtených problémů. Protokoly pro podporu zpráv o průběhu a zrušení znesnadňuje pochopení algoritmu při prvním čtení. Jsou zapojeny další typy a rozhraní API. Pokud chcete zjistit, kde je požadováno zrušení a kde je uděleno, musíte trasovat komunikaci prostřednictvím CancellationTokenSource a související CancellationToken komunikace.

Asynchronní streamy poskytují lepší způsob

Všechny tyto obavy řeší asynchronní streamy a přidružená podpora jazyků. Kód, který vygeneruje sekvenci, teď může použít yield return k vrácení prvků v metodě, která byla deklarována pomocí modifikátoru async . Asynchronní datový proud můžete využívat pomocí await foreach smyčky stejně jako jakoukoli sekvenci pomocí smyčky foreach .

Tyto nové jazykové funkce závisí na třech nových rozhraních přidaných do .NET Standard 2.1 a implementovaných v .NET Core 3.0:

Tato tři rozhraní by měla být pro většinu vývojářů v jazyce C# známá. Chovají se podobným způsobem jako jejich synchronní protějšky:

Jeden typ, který může být neznámý je System.Threading.Tasks.ValueTask. Struktura ValueTask poskytuje podobné rozhraní API jako System.Threading.Tasks.Task třída. ValueTask v těchto rozhraních se používá z důvodů výkonu.

Převod na asynchronní streamy

Dále převeďte metodu RunPagedQueryAsync tak, aby vygenerovala asynchronní datový proud. Nejprve změňte podpis na vrácení tokenu RunPagedQueryAsyncIAsyncEnumerable<JToken>a odeberte token zrušení a průběh objektů ze seznamu parametrů, jak je znázorněno v následujícím kódu:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Počáteční kód zpracovává každou stránku při načítání stránky, jak je znázorněno v následujícím kódu:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Nahraďte tyto tři řádky následujícím kódem:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Můžete také odebrat deklaraci finalResults dříve v této metodě a return příkaz, který následuje za smyčkou, kterou jste upravili.

Dokončili jste změny vygenerování asynchronního streamu. Hotová metoda by měla vypadat podobně jako v následujícím kódu:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Dále změníte kód, který využívá kolekci, aby spotřebovávat asynchronní datový proud. Vyhledejte následující kód, Main který zpracovává kolekci problémů:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Nahraďte tento kód následující await foreach smyčkou:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Nové rozhraní IAsyncEnumerator<T> je odvozeno z IAsyncDisposable. To znamená, že předchozí smyčka asynchronně odstraní stream po dokončení smyčky. Můžete si představit, že smyčka vypadá jako následující kód:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Ve výchozím nastavení se prvky streamu zpracovávají v zachyceného kontextu. Pokud chcete zakázat zachytávání kontextu, použijte metodu TaskAsyncEnumerableExtensions.ConfigureAwait rozšíření. Další informace o kontextech synchronizace a zachycení aktuálního kontextu najdete v článku o využívání asynchronního vzoru založeného na úlohách.

Asynchronní streamy podporují zrušení pomocí stejného protokolu jako jiné async metody. Podpis pro metodu asynchronního iterátoru byste upravili následujícím způsobem, aby podporoval zrušení:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Atribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute způsobí, že kompilátor vygeneruje kód, IAsyncEnumerator<T> který token předá GetAsyncEnumerator do těla asynchronního iterátoru jako tento argument. Uvnitř runQueryAsyncmůžete prozkoumat stav tokenu a v případě potřeby zrušit další práci.

K předání tokenu zrušení do asynchronního streamu použijete jinou metodu WithCancellationrozšíření. Smyčku výčtu těchto problémů byste upravili následujícím způsobem:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Kód dokončeného kurzu můžete získat z úložiště dotnet/docs ve složce asynchronní-programování/fragmenty kódu .

Spuštění dokončené aplikace

Spusťte aplikaci znovu. Porovnejte jeho chování s chováním úvodní aplikace. První stránka výsledků se zobrazí, jakmile bude k dispozici. Když se vyžaduje a načte každá nová stránka, je pozorovatelná pozastavení, výsledky další stránky se rychle vyčíslí. Blok try / catch není potřeba ke zpracování zrušení: volající může zastavit výčet kolekce. Průběh je jasně hlášen, protože asynchronní datový proud generuje výsledky při stahování každé stránky. Stav pro každý vrácený problém je bezproblémově zahrnutý ve await foreach smyčce. Ke sledování průběhu nepotřebujete objekt zpětného volání.

Vylepšení využití paměti můžete zobrazit prozkoumáním kódu. Už nemusíte přidělovat kolekci, aby se ukládaly všechny výsledky, než se vytvoří výčet. Volající může určit, jak využívat výsledky a jestli je potřeba kolekce úložiště.

Spusťte úvodní i dokončené aplikace a můžete sledovat rozdíly mezi implementacemi pro sebe. Po dokončení tohoto kurzu můžete odstranit přístupový token GitHubu, který jste vytvořili. Pokud útočník získal přístup k danému tokenu, mohl by přistupovat k rozhraním API GitHubu pomocí vašich přihlašovacích údajů.

V tomto kurzu jste použili asynchronní streamy ke čtení jednotlivých položek ze síťového rozhraní API, které vrací stránky dat. Asynchronní streamy se také můžou číst z "nikdy nekončících datových proudů", jako je burzovní ticker nebo senzorové zařízení. Volání, které vrátí MoveNextAsync další položku, jakmile bude k dispozici.