Megosztás a következőn keresztül:


Oktatóanyag: Aszinkron streamek létrehozása és felhasználása c# és .NET használatával

Az aszinkron streamek adatstreamelési forrást modellezhetnek. Az adatfolyamok gyakran aszinkron módon kérik le vagy generálják az elemeket. Természetes programozási modellt biztosítanak az aszinkron streamelési adatforrásokhoz.

Ebben az oktatóanyagban megtanulhatja, hogyan:

  • Hozzon létre egy adatforrást, amely aszinkron módon hoz létre adatelemek sorozatát.
  • Használja az adatforrást aszinkron módon.
  • Aszinkron streamek lemondásának és rögzített környezeteinek támogatása.
  • Ismerje fel, hogy az új felület és adatforrás mikor előnyösebb a korábbi szinkron adatütemezésekhez.

Előfeltételek

Be kell állítania a gépet a .NET futtatására, beleértve a C# fordítót is. A C# fordító a Visual Studio 2022-szel vagy a .NET SDK-mal érhető el.

Létre kell hoznia egy GitHub hozzáférési jogkivonatot , hogy hozzáférhessen a GitHub GraphQL-végponthoz. Válassza ki a következő engedélyeket a GitHub hozzáférési jogkivonatához:

  • adattár:állapot
  • nyilvános repó

Mentse a hozzáférési jogkivonatot biztonságos helyre, hogy ezzel hozzáférést kapjon a GitHub API-végponthoz.

Figyelmeztetés

Tartsa biztonságban a személyes hozzáférési tokent. Bármely szoftver a személyes hozzáférési jogkivonatoddal a te hozzáférési jogaiddal kezdeményezhet GitHub API-hívásokat.

Ez az oktatóanyag feltételezi, hogy ismeri a C# és a .NET használatát, beleértve a Visual Studiót vagy a .NET CLI-t is.

A kezdőalkalmazás futtatása

Az oktatóanyagban használt kezdőalkalmazás kódját az aszinkron-programozás/kódrészletek mappában található dotnet/docs-adattárból szerezheti be.

A kezdőalkalmazás egy konzolalkalmazás, amely a GitHub GraphQL felületét használja a dotnet/docs-adattárban írt legutóbbi problémák lekéréséhez. Első lépésként tekintse meg a kezdőalkalmazás-metódus Main következő kódját:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Beállíthat egy környezeti változót GitHubKey a személyes hozzáférési jogkivonatra, vagy lecserélheti a hívás GetEnvVariable utolsó argumentumát a személyes hozzáférési jogkivonatára. Ne helyezze a hozzáférési kódot a forráskódba, ha megosztja a forrást másokkal. Soha ne töltsön fel hozzáférési kódokat egy megosztott forrásadattárba.

A GitHub-ügyfél létrehozása után a benne lévő Main kód létrehoz egy folyamatjelentési objektumot és egy lemondási jogkivonatot. Ezen objektumok létrehozásuk után a Main meghívja a RunPagedQueryAsync-et, hogy lekérje a legutóbbi 250 létrehozott problémát. A tevékenység befejezése után megjelennek az eredmények.

A kezdőalkalmazás futtatásakor fontos megfigyeléseket tehet az alkalmazás működéséről. A GitHubról visszaadott minden egyes oldal előrehaladását láthatja. Észrevehető szünet figyelhető meg, mielőtt a GitHub megjeleníti a problémák minden új oldalát. Végül a problémák csak akkor jelennek meg, ha mind a 10 oldalt lekértük a GitHubról.

A megvalósítás vizsgálata

Az implementációból kiderül, hogy miért figyelte meg az előző szakaszban tárgyalt viselkedést. Vizsgálja meg a következőhöz tartozó RunPagedQueryAsynckódot:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A módszer első lépéseként hozza létre a POST objektumot az GraphQLRequest osztály használatával:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

amely segít létrehozni a POST objektum törzsét, és megfelelően átalakítani a ToJsonText metódussal egyetlen sztringként megjelenített JSON-ra. Ez a módszer eltávolítja az összes új sor karaktert a kérés törzséből, és megjelöli őket a \ (fordított perjel) feloldó karakterrel.

Koncentráljunk az előző kód lapozási algoritmusára és aszinkron szerkezetére. (A GitHub GraphQL api-val kapcsolatos részletekért tekintse meg a GitHub GraphQL dokumentációját .) A RunPagedQueryAsync metódus a legújabbtól a legrégebbiig sorolja fel a problémákat. Oldalanként 25 problémát kér, és megvizsgálja a pageInfo válasz szerkezetét, hogy az előző oldallal folytatódjon. Ez a GraphQL szabványos lapozási támogatását követi a többoldalas válaszokhoz. A válasz tartalmaz egy pageInfo objektumot, amely tartalmaz egy hasPreviousPages értéket és egy startCursor értéket, amelyet az előző oldal kéréséhez használnak. A problémák a nodes tömbben vannak. A RunPagedQueryAsync metódus hozzáfűzi ezeket a csomópontokat egy tömbhöz, amely az összes oldal összes eredményét tartalmazza.

Az eredmények egy oldalának lekérése és visszaállítása után RunPagedQueryAsync jelentést tesz a folyamat előrehaladásáról és ellenőrzi az esetleges lemondást. Ha a lemondást kérték, RunPagedQueryAsync kivételt dob, egy OperationCanceledException.

A kód több olyan elemet is tartalmazhat, amelyek javíthatók. Legfőképpen, RunPagedQueryAsync minden visszaadott problémához tárolóhelyet kell biztosítania. Ez a minta 250-nél leáll, mert az összes nyitott probléma beolvasása sokkal több memóriát igényel a beolvasott problémák tárolásához. Az előrehaladási jelentések és lemondások támogatásának protokolljai megnehezítik az algoritmus megértését az első olvasatban. További típusok és API-k is érintettek. Fel kell derítenie a kommunikációt a CancellationTokenSource és a hozzá kapcsolódó CancellationToken keresztül, hogy megérthesse, ahol kérik a lemondást, és ahol megkapják azt.

Az aszinkron streamek jobb módot biztosítanak

Az aszinkron streamek és a hozzá tartozó nyelvi támogatás az összes ilyen problémát kezelik. A sorozatot létrehozó kód mostantól yield return használhatja az elemek visszaadására egy async módosítóval ellátott metódusban. Az aszinkron streameket ugyanúgy használhatja hurkok await foreach használatával, mint bármely sorozatot egy foreach hurok használatával.

Ezek az új nyelvi funkciók a .NET Standard 2.1-hez hozzáadott és a .NET Core 3.0-ban implementált három új felülettől függnek:

Ez a három felület a legtöbb C#-fejlesztő számára ismerős lehet. A szinkron megfelelőikhez hasonló módon viselkednek:

Az egyik típus, amely ismeretlen lehet, a System.Threading.Tasks.ValueTask. A ValueTask szerkezet az osztályhoz System.Threading.Tasks.Task hasonló API-t biztosít. ValueTask ezekben az interfészekben kerül használatra a teljesítménybeli okok miatt.

Konvertálás aszinkron streamekké

Ezután konvertálja a metódust RunPagedQueryAsync aszinkron stream létrehozásához. Először módosítsa a RunPagedQueryAsync szignatúráját úgy, hogy egy IAsyncEnumerable<JToken>-t adjon vissza, és távolítsa el a törlési jogkivonatot és a folyamat objektumokat a paraméterlistából az alábbi kódban látható módon.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

A kezdőkód az egyes oldalakat az oldal lekérésekor dolgozza fel, ahogyan az a következő kódban is látható:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Cserélje le ezt a három sort a következő kódra:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Az eljárásban korábban szereplő finalResults deklarációját, valamint a módosított ciklust követő return utasítást is eltávolíthatja.

Befejezte a módosításokat az aszinkron stream létrehozásához. A kész metódusnak a következő kódhoz kell hasonlítania:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ezután módosítja a gyűjteményt használó kódot, hogy az aszinkron streamet használja. A következő kódot keresse meg a Main kódban, amely feldolgozza a problémák gyűjteményét:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

A kódot cserélje ki a következő await foreach ciklusra:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Az új felület IAsyncEnumerator<T> a következőből IAsyncDisposableszármazik: . Ez azt jelenti, hogy az előző ciklus aszinkron módon törli a streamet a ciklus befejeződésekor. El tudja képzelni, hogy a hurok a következő kódhoz hasonlóan néz ki:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Alapértelmezés szerint a streamelemek feldolgozása a rögzített környezetben történik. Ha le szeretné tiltani a környezet rögzítését, használja a TaskAsyncEnumerableExtensions.ConfigureAwait bővítménymetódust. A szinkronizálási környezetekről és az aktuális környezet rögzítéséről a tevékenységalapú aszinkron minta felhasználásáról szóló cikkben talál további információt.

Az aszinkron streamek ugyanúgy támogatják a lemondást, mint más async metódusok esetén. Az aszinkron iterátor metódus aláírását az alábbiak szerint módosítaná a lemondás támogatásához:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A System.Runtime.CompilerServices.EnumeratorCancellationAttribute attribútum hatására a fordító olyan kódot generál, amely a IAsyncEnumerator<T> számára átadott jogkivonatot láthatóvá teszi az aszinkron iterátor törzsében, mint argumentum. Belül runQueryAsync megvizsgálhatja a token állapotát, és kérés esetén leállíthatja a további munkát.

Egy másik bővítménymetódus, WithCancellation használatával adja át a törlési tokent az aszinkron streamnek. A problémákat számba adó hurkot az alábbiak szerint módosítaná:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

A kész oktatóanyag kódját az aszinkron-programozási/kódrészletek mappában található dotnet/docs-adattárból szerezheti be.

A kész alkalmazás futtatása

Futtassa újra az alkalmazást. Hasonlítsd össze annak a viselkedését a kezdőalkalmazás viselkedésével. A találatok első oldala amint elérhető, felsorolásra kerül. Megfigyelhető szüneteltetés történik az új lapok kérése és lekérése során, majd a következő lap eredményei gyorsan számba lesznek véve. A try / catch blokk nem szükséges a megszakítás kezeléséhez: a hívó leállíthatja a gyűjtemény felsorolását. A folyamat előrehaladását egyértelműen jelenti a rendszer, mert az aszinkron stream az egyes lapok letöltésekor eredményeket hoz létre. Az egyes visszaadott problémák állapota zökkenőmentesen szerepel a await foreach ciklusban. A folyamat nyomon követéséhez nincs szükség visszahívási objektumra.

A kód vizsgálatával a memóriahasználat fejlesztései láthatók. A továbbiakban nem kell lefoglalnia egy gyűjteményt az összes eredmény tárolásához a számbavétel előtt. A hívó meghatározhatja, hogyan használhatja fel az eredményeket, és hogy szükség van-e tárgyűjteményre.

Futtassa mind a kezdő, mind a kész alkalmazásokat, és megfigyelheti az implementációk közötti különbségeket. Az oktatóanyag befejezése után törölheti a gitHub hozzáférési jogkivonatát, amelyet akkor hozott létre, amikor elindította ezt az oktatóanyagot. Ha egy támadó hozzáfér ehhez a jogkivonathoz, az Ön hitelesítő adataival férhet hozzá a GitHub API-khoz.

Ebben az oktatóanyagban aszinkron streamekkel olvasta be az egyes elemeket egy hálózati API-ból, amely adatoldalakat ad vissza. Az aszinkron streamek "soha véget nem érő streamekből" is olvashatnak, például tőzsdei ketyegőből vagy érzékelőeszközből. A MoveNextAsync hívás visszaadja a következő elemet, amint elérhető.