Öğretici: C# ve .NET kullanarak zaman uyumsuz akışlar oluşturma ve kullanma

Zaman uyumsuz akışlar, bir veri akışı kaynağını modeller . Veri akışları genellikle öğeleri zaman uyumsuz olarak alır veya oluşturur. Zaman uyumsuz akış veri kaynakları için doğal bir programlama modeli sağlar.

Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:

  • Zaman uyumsuz olarak bir dizi veri öğesi oluşturan bir veri kaynağı oluşturun.
  • Bu veri kaynağını zaman uyumsuz olarak tüketin.
  • Zaman uyumsuz akışlar için iptali ve yakalanan bağlamları destekler.
  • Yeni arabirimin ve veri kaynağının daha önceki zaman uyumlu veri dizilerine ne zaman tercih edildiğinden emin olun.

Önkoşullar

C# derleyicisi de dahil olmak üzere makinenizi .NET çalıştıracak şekilde ayarlamanız gerekir. C# derleyicisi Visual Studio 2022 veya .NET SDK ile kullanılabilir.

GitHub GraphQL uç noktasına erişebilmek için bir GitHub erişim belirteci oluşturmanız gerekir. GitHub Erişim Belirteciniz için aşağıdaki izinleri seçin:

  • repo:status
  • public_repo

GitHub API uç noktasına erişim elde etmek için erişim belirtecini güvenli bir yere kaydedin.

Uyarı

Kişisel erişim belirtecinizi güvende tutun. Kişisel erişim belirtecinize sahip tüm yazılımlar erişim haklarınızı kullanarak GitHub API çağrıları yapabilir.

Bu öğreticide, Visual Studio veya .NET CLI dahil olmak üzere C# ve .NET hakkında bilgi sahibi olduğunuz varsayılır.

Başlangıç uygulamasını çalıştırma

Bu öğreticide kullanılan başlangıç uygulamasının kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.

Başlangıç uygulaması, dotnet/docs deposunda yazılan son sorunları almak için GitHub GraphQL arabirimini kullanan bir konsol uygulamasıdır. Başlangıç uygulaması Main yöntemi için aşağıdaki koda bakarak başlayın:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Kişisel erişim belirtecinize bir GitHubKey ortam değişkeni ayarlayabilir veya çağrısındaki GetEnvVariable son bağımsız değişkeni kişisel erişim belirtecinizle değiştirebilirsiniz. Kaynağı başkalarıyla paylaşacaksanız, erişim kodunuzu kaynak koduna koymayın. Erişim kodlarını hiçbir zaman paylaşılan kaynak deposuna yüklemeyin.

GitHub istemcisini oluşturduktan sonra içindeki Main kod bir ilerleme raporlama nesnesi ve bir iptal belirteci oluşturur. Bu nesneler oluşturulduktan sonra, Main en son oluşturulan 250 sorunu almak için çağrılar RunPagedQueryAsync . Bu görev tamamlandıktan sonra sonuçlar görüntülenir.

Başlangıç uygulamasını çalıştırdığınızda, bu uygulamanın nasıl çalıştığı hakkında bazı önemli gözlemler yapabilirsiniz. GitHub'dan döndürülen her sayfa için ilerleme durumunun bildirildiği görürsünüz. GitHub her yeni sorun sayfasını döndürmeden önce fark edilebilir bir duraklama gözlemleyebilirsiniz. Son olarak, sorunlar yalnızca 10 sayfanın tümü GitHub'dan alındıktan sonra görüntülenir.

Uygulamayı inceleme

Uygulama, önceki bölümde açıklanan davranışı neden gözlemlediğiniz açıklanmıştır. için RunPagedQueryAsynckodu inceleyin:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Bu yöntemin yaptığı ilk şey, sınıfını kullanarak GraphQLRequest POST nesnesini oluşturmaktır:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

POST nesne gövdesinin oluşturulmasına yardımcı olur ve bunu yöntemiyle ToJsonText tek dize olarak sunulan JSON'a doğru bir şekilde dönüştürür ve bu da istek gövdesinden tüm yeni satır karakterlerini (ters eğik çizgi) kaçış karakteriyle \ işaretler.

Şimdi önceki kodun disk belleği algoritmasına ve zaman uyumsuz yapısına odaklanalım. (Aşağıdakilere başvurabilirsiniz: GitHub GraphQL API'sine ilişkin ayrıntılar için GitHub GraphQL belgeleri .) RunPagedQueryAsync yöntemi sorunları en son olandan en eskiye numaralandırır. Sayfa başına 25 sorun istemektedir ve önceki sayfaya devam etmek için yanıtın yapısını inceler pageInfo . Bu, GraphQL'in çok sayfalı yanıtlar için standart disk belleği desteğine uyar. Yanıt, bir pageInfohasPreviousPages değeri ve önceki sayfayı istemek için kullanılan değeri içeren bir startCursor nesnesi içerir. Sorunlar dizidedir nodes . yöntemi, RunPagedQueryAsync bu düğümleri tüm sayfalardan tüm sonuçları içeren bir diziye ekler.

Bir sonuç sayfası alınıp geri yüklendikten sonra, RunPagedQueryAsync ilerleme durumunu raporlar ve iptali denetler. İptal istendiyse, RunPagedQueryAsync bir OperationCanceledExceptionoluşturur.

Bu kodda geliştirilebilen birkaç öğe vardır. En önemlisi, RunPagedQueryAsync döndürülen tüm sorunlar için depolama ayırması gerekir. Tüm açık sorunların alınması, alınan tüm sorunların depolanması için çok daha fazla bellek gerektireceğinden bu örnek 250 sorunda durur. İlerleme raporlarını ve iptali destekleme protokolleri, algoritmanın ilk okumasında anlaşılmasını zorlaştırır. Daha fazla tür ve API söz konusudur. İptal işleminin nerede istendiği ve nerede verildiğini anlamak için ve ile ilişkili CancellationToken iletişimleri CancellationTokenSource izlemeniz gerekir.

Zaman uyumsuz akışlar daha iyi bir yol sağlar

Zaman uyumsuz akışlar ve ilişkili dil desteği tüm bu endişeleri giderir. Sırayı oluşturan kod artık değiştirici ile async bildirilen bir yöntemdeki öğeleri döndürmek için kullanabiliryield return. Döngü kullanarak zaman uyumsuz bir akışı, döngü kullanarak foreach herhangi bir await foreach diziyi tükettiğiniz gibi kullanabilirsiniz.

Bu yeni dil özellikleri, .NET Standard 2.1'e eklenen ve .NET Core 3.0'da uygulanan üç yeni arabirime bağlıdır:

Bu üç arabirim çoğu C# geliştiricisine tanıdık gelmelidir. Zaman uyumlu karşılıklarına benzer şekilde davranırlar:

Bu türlerden biri, tanınmayan bir türdür System.Threading.Tasks.ValueTask. ValueTask yapısı, sınıfına System.Threading.Tasks.Task benzer bir API sağlar. ValueTask performans nedenleriyle bu arabirimlerde kullanılır.

Zaman uyumsuz akışlara dönüştürme

Ardından, zaman uyumsuz bir akış oluşturmak için yöntemini dönüştürün RunPagedQueryAsync . İlk olarak, aşağıdaki kodda gösterildiği gibi öğesinin imzasını RunPagedQueryAsync değiştirip IAsyncEnumerable<JToken>iptal belirtecini ve ilerleme nesnelerini parametre listesinden kaldırın:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Başlangıç kodu, aşağıdaki kodda gösterildiği gibi sayfa alınırken her sayfayı işler:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Bu üç satırı aşağıdaki kodla değiştirin:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Bu yöntemde önceki bildirimini finalResults ve değiştirdiğiniz döngüden return sonra gelen deyimini de kaldırabilirsiniz.

Zaman uyumsuz akış oluşturmak için değişiklikleri tamamladınız. Tamamlanmış yöntem aşağıdaki koda benzemelidir:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ardından, zaman uyumsuz akışı kullanmak için koleksiyonu kullanan kodu değiştirirsiniz. Sorun koleksiyonunu işleyen aşağıdaki kodu Main bulun:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Bu kodu aşağıdaki await foreach döngüyle değiştirin:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Yeni arabirim IAsyncEnumerator<T> ' den IAsyncDisposabletüretilir. Bu, döngü tamamlandığında önceki döngünün akışı zaman uyumsuz olarak atacağı anlamına gelir. Döngünün aşağıdaki koda benzediğini düşünebilirsiniz:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Varsayılan olarak, akış öğeleri yakalanan bağlamda işlenir. Bağlam yakalamayı devre dışı bırakmak istiyorsanız uzantı yöntemini kullanın TaskAsyncEnumerableExtensions.ConfigureAwait . Eşitleme bağlamları ve geçerli bağlamı yakalama hakkında daha fazla bilgi için Görev tabanlı zaman uyumsuz deseni kullanma makalesine bakın.

Zaman uyumsuz akışlar, diğer async yöntemlerle aynı protokolü kullanarak iptali destekler. zaman uyumsuz yineleyici yönteminin imzasını iptali desteklemek için aşağıdaki gibi değiştirebilirsiniz:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute özniteliği, derleyicinin belirtecin zaman uyumsuz yineleyicinin gövdesine bu bağımsız değişken olarak geçirilmesini GetAsyncEnumerator sağlayan kodu IAsyncEnumerator<T> oluşturmasına neden olur. içinde runQueryAsyncbelirtecin durumunu inceleyip istenirse daha fazla çalışmayı iptal edebilirsiniz.

İptal belirtecini zaman uyumsuz akışa geçirmek için başka bir uzantı yöntemi WithCancellationkullanırsınız. Sorunları numaralandıran döngüde aşağıdaki gibi değişiklik yapabilirsiniz:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Tamamlanmış öğreticinin kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.

Tamamlanmış uygulamayı çalıştırma

Uygulamayı yeniden çalıştırın. Davranışını başlangıç uygulamasının davranışıyla karşıtlık. Sonuçların ilk sayfası, kullanılabilir olduğu anda numaralandırılır. Her yeni sayfa istenip alınırken gözlemlenebilir bir duraklama olur, ardından sonraki sayfanın sonuçları hızla numaralandırılır. blok try / catch iptali işlemek için gerekli değildir: çağıran koleksiyonu numaralandırmayı durdurabilir. Zaman uyumsuz akış her sayfa indirilirken sonuç oluşturduğundan ilerleme durumu açıkça bildirilir. Döndürülen her sorunun durumu döngüye await foreach sorunsuz bir şekilde eklenir. İlerleme durumunu izlemek için geri çağırma nesnesine ihtiyacınız yoktur.

Kodu inceleyerek bellek kullanımında iyileştirmeler görebilirsiniz. Artık tüm sonuçları numaralandırmadan önce depolamak için bir koleksiyon ayırmanız gerekmez. Çağıran, sonuçların nasıl tüketileceğini ve bir depolama koleksiyonu gerekip gerekmediğini belirleyebilir.

Hem başlangıç hem de tamamlanmış uygulamaları çalıştırın ve uygulamalar arasındaki farkları kendiniz gözlemleyebilirsiniz. bitirdikten sonra bu öğreticiyi başlattığınızda oluşturduğunuz GitHub erişim belirtecini silebilirsiniz. Bir saldırgan bu belirteçe erişim elde ederse, kimlik bilgilerinizi kullanarak GitHub API'lerine erişebilir.

Bu öğreticide, veri sayfaları döndüren bir ağ API'sinden tek tek öğeleri okumak için zaman uyumsuz akışlar kullandınız. Zaman uyumsuz akışlar, hisse senedi değerleyici veya sensör cihazı gibi "hiç bitmeyen akışlardan" da okuyabilir. çağrısı MoveNextAsync , kullanılabilir olduğunda sonraki öğeyi döndürür.