Aracılığıyla paylaş


Kılavuz: C# ve .NET kullanarak eşzamanlı olmayan akışlar oluşturma ve tüketme

Asenkron akışlar veri akışı kaynağını modeller. Veri akışları genellikle öğeleri zaman uyumsuz olarak alır veya oluşturur. Zaman uyumsuz akış veri kaynakları için doğal bir programlama modeli sağlar.

Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:

  • Zaman uyumsuz olarak bir dizi veri öğesi oluşturan bir veri kaynağı oluşturun.
  • Bu veri kaynağını zaman uyumsuz olarak tüketin.
  • Zaman uyumsuz akışlar için iptali ve yakalanan bağlamları destekler.
  • Yeni arabirimin ve veri kaynağının daha önceki zaman uyumlu veri dizilerine ne zaman tercih edildiğinden emin olun.

Önkoşullar

C# derleyicisi de dahil olmak üzere makinenizi .NET çalıştıracak şekilde ayarlamanız gerekir. C# derleyicisi Visual Studio 2022 veya .NET SDKile kullanılabilir.

GitHub GraphQL uç noktasına erişebilmek için bir GitHub erişim belirteci oluşturmanız gerekir. GitHub Erişim Belirteciniz için aşağıdaki izinleri seçin:

  • repo:status
  • public_repo

GitHub API uç noktasına erişim elde etmek için erişim belirtecini güvenli bir yere kaydedin.

Uyarı

Kişisel erişim belirtecinizi güvende tutun. Kişisel erişim belirtecinize sahip tüm yazılımlar erişim haklarınızı kullanarak GitHub API çağrıları yapabilir.

Bu öğreticide, Visual Studio veya .NET CLI dahil olmak üzere C# ve .NET hakkında bilgi sahibi olduğunuz varsayılır.

Başlangıç uygulamasını çalıştırma

Bu öğreticide kullanılan başlangıç uygulamasının kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.

Başlangıç uygulaması, dotnet/docs deposunda yazılan son sorunları almak için GitHub GraphQL arabirimini kullanan bir konsol uygulamasıdır. Başlangıç uygulaması Main yöntemi için aşağıdaki koda bakarak başlayın:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Kişisel erişim belirtecinizi bir GitHubKey ortam değişkeni olarak ayarlayabilir veya GetEnvVariable çağrısında son bağımsız değişkeni kişisel erişim belirtecinizle değiştirebilirsiniz. Kaynağı başkalarıyla paylaşacaksanız, erişim kodunuzu kaynak koduna koymayın. Erişim kodlarını hiçbir zaman paylaşılan kaynak deposuna yüklemeyin.

GitHub istemcisini oluşturduktan sonra içindeki Main kod bir ilerleme raporlama nesnesi ve bir iptal belirteci oluşturur. Bu nesneler oluşturulduktan sonra, Main en son oluşturulan 250 konuyu almak için RunPagedQueryAsync çağrılır. Bu görev tamamlandıktan sonra sonuçlar görüntülenir.

Başlangıç uygulamasını çalıştırdığınızda, bu uygulamanın nasıl çalıştığı hakkında bazı önemli gözlemler yapabilirsiniz. GitHub'dan döndürülen her sayfa için ilerleme durumunun bildirildiği görürsünüz. GitHub her yeni sorun sayfasını döndürmeden önce fark edilebilir bir duraklama gözlemleyebilirsiniz. Son olarak, sorunlar yalnızca 10 sayfanın tümü GitHub'dan alındıktan sonra görüntülenir.

Uygulamayı inceleme

Uygulama, önceki bölümde açıklanan davranışı neden gözlemlediğiniz açıklanmıştır. için RunPagedQueryAsynckodu inceleyin:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Bu yöntemin yaptığı ilk şey, sınıfını kullanarak GraphQLRequest POST nesnesini oluşturmaktır:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

POST nesne gövdesinin oluşturulmasına yardımcı olur ve ToJsonText yöntemiyle bunu tek bir JSON dizesi olarak doğru bir şekilde dönüştürür. Bu yöntem, istek gövdesindeki tüm yeni satır karakterlerini "\" (ters eğik çizgi) kaçış karakteriyle işaretleyerek kaldırır.

Şimdi önceki kodun sayfalama algoritmasına ve eşzamanlı olmayan yapısına odaklanalım. ( GitHub GraphQL API'sinin ayrıntıları için GitHub GraphQL belgelerine başvurabilirsiniz.) RunPagedQueryAsync yöntemi sorunları en son olandan en eskiye numaralandırır. Sayfa başına 25 sorun istemekte ve önceki sayfaya devam etmek için yanıtın pageInfo yapısını inceler. Bu, GraphQL'in çok sayfalı yanıtlar için standart sayfalama desteğine uyar. Yanıt, önceki sayfayı istemek için kullanılan bir pageInfo değeri ve bir hasPreviousPages değeri içeren startCursor nesnesi içerir. Sorunlar nodes dizisinde. yöntemi, RunPagedQueryAsync bu düğümleri tüm sayfalardan tüm sonuçları içeren bir diziye ekler.

Bir sonuç sayfası alınıp geri yüklendikten sonra, RunPagedQueryAsync ilerleme durumunu raporlar ve iptali denetler. İptal istendiyse, RunPagedQueryAsync bir OperationCanceledExceptionoluşturur.

Bu kodda geliştirilebilen birkaç öğe vardır. En önemlisi, RunPagedQueryAsync döndürülen tüm sorunlar için depolama ayırması gerekir. Tüm açık sorunların alınması, alınan tüm sorunların depolanması için çok daha fazla bellek gerektireceğinden bu örnek 250 sorunda durur. İlerleme raporlarını ve iptali destekleme protokolleri, algoritmanın ilk okumasında anlaşılmasını zorlaştırır. Daha fazla tür ve API söz konusudur. İptal talebinin nerede yapıldığını ve nerede onaylandığını anlamak için CancellationTokenSource ve buna bağlı CancellationToken üzerinden iletişimleri izlemeniz gerekir.

Zaman uyumsuz akışlar daha iyi bir yol sunar

Zaman uyumsuz akışlar ve ilişkili dil desteği tüm bu endişeleri giderir. Artık diziyi oluşturan kod, yield return değiştiriciyle bildirilen bir yöntemle async kullanarak öğeleri döndürebilir. Zaman uyumsuz bir akışı tıpkı herhangi bir await foreach dizisini foreach döngüsü kullanarak tükettiğiniz gibi bir foreach döngüsü kullanarak tüketebilirsiniz.

Bu yeni dil özellikleri, .NET Standard 2.1'e eklenen ve .NET Core 3.0'da uygulanan üç yeni arabirime bağlıdır:

Bu üç arabirim çoğu C# geliştiricisine tanıdık gelmelidir. Eşzamanlı karşılıklarına benzer şekilde davranırlar.

Bu türlerden biri, tanınmayan bir türdür System.Threading.Tasks.ValueTask. ValueTask yapısı, sınıfına System.Threading.Tasks.Task benzer bir API sağlar. ValueTask performans nedenleriyle bu arabirimlerde kullanılır.

Asenkron akışlara dönüştür

Ardından, RunPagedQueryAsync metodunu eşzamansız bir akış oluşturacak şekilde dönüştürün. İlk olarak, aşağıdaki kodda gösterildiği gibi RunPagedQueryAsync'in imzasını IAsyncEnumerable<JToken> döndürecek şekilde değiştirin ve iptal belirtecini ile ilerleme nesnelerini parametre listesinden kaldırın.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Başlangıç kodu, aşağıdaki kodda gösterildiği gibi sayfa alınırken her sayfayı işler:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Bu üç satırı aşağıdaki kodla değiştirin:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Bu yöntemde daha önceki finalResults bildiriminin yanı sıra, değiştirdiğiniz döngüyü takip eden return deyimini de kaldırabilirsiniz.

Eşzamansız bir akış oluşturmak için değişiklikleri tamamladınız. Tamamlanmış yöntem aşağıdaki koda benzemelidir:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ardından, koleksiyonu tüketen kodu, zaman uyumsuz akışı tüketmek için değiştirirsiniz. Sorun koleksiyonunu işleyen aşağıdaki kodu Main bulun:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Bu kodu aşağıdaki await foreach döngüyle değiştirin:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Yeni arabirim IAsyncEnumerator<T> ' den IAsyncDisposabletüretilir. Bu, döngü tamamlandığında önceki döngünün akışı zaman uyumsuz olarak atacağı anlamına gelir. Döngünün aşağıdaki koda benzediğini düşünebilirsiniz:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Varsayılan olarak, akış öğeleri yakalanan bağlamda işlenir. Bağlam yakalamayı devre dışı bırakmak istiyorsanız uzantı yöntemini kullanın TaskAsyncEnumerableExtensions.ConfigureAwait . Senkronizasyon bağlamları ve mevcut bağlamı yakalama hakkında daha fazla bilgi için Görev tabanlı zaman uyumsuz örüntüyü kullanma makalesine bakın.

Zaman uyumsuz akışlar, diğer async yöntemlerle aynı protokolü kullanarak iptali destekler. Asenkron yineleyici yönteminin imzasını iptali desteklemek için aşağıdaki gibi değiştirirdiniz:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute özniteliği, derleyicinin, IAsyncEnumerator<T>'ye geçirilen belirteci zaman uyumsuz yineleyicinin gövdesinde bu bağımsız değişken olarak görünür kılan GetAsyncEnumerator kodunu oluşturmasına neden olur. İçinde runQueryAsync belirtecin durumunu inceleyip, istenirse ilerideki çalışmaları iptal edebilirsiniz.

İptal belirtecini zaman uyumsuz akışa geçirmek için, WithCancellation olarak başka bir uzantı yöntemi kullanırsınız. Sorunları numaralandıran döngüde aşağıdaki gibi değişiklik yapabilirsiniz:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Tamamlanmış öğreticinin kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.

Tamamlanmış uygulamayı çalıştırma

Uygulamayı yeniden çalıştırın. Davranışını başlangıç uygulamasının davranışıyla karşılaştırın. Sonuçların ilk sayfası, kullanılabilir olduğu anda numaralandırılır. Her yeni sayfa istenip alınırken gözlemlenebilir bir duraklama olur, ardından sonraki sayfanın sonuçları hızla numaralandırılır. blok try / catch iptali işlemek için gerekli değildir: çağıran koleksiyonu numaralandırmayı durdurabilir. Zaman uyumsuz akış her sayfa indirildikçe sonuçlar ürettiği için ilerleme durumu açıkça bildirilir. Döndürülen her sorunun durumu döngüye await foreach sorunsuz bir şekilde eklenir. İlerleme durumunu izlemek için geri çağırma nesnesine ihtiyacınız yoktur.

Kodu inceleyerek bellek kullanımında iyileştirmeler görebilirsiniz. Artık tüm sonuçları numaralandırmadan önce depolamak için bir koleksiyon ayırmanız gerekmez. Çağıran, sonuçların nasıl tüketileceğini ve bir depolama koleksiyonu gerekip gerekmediğini belirleyebilir.

Hem başlangıç hem de tamamlanmış uygulamaları çalıştırın ve uygulamalar arasındaki farkları kendiniz gözlemleyebilirsiniz. bitirdikten sonra bu öğreticiyi başlattığınızda oluşturduğunuz GitHub erişim belirtecini silebilirsiniz. Bir saldırgan bu belirteçe erişim elde ederse, kimlik bilgilerinizi kullanarak GitHub API'lerine erişebilir.

Bu öğreticide, veri sayfaları döndüren bir ağ API'sinden tek tek öğeleri okumak için eşzamansız akışlar kullandınız. Eş zamansız akışlar, borsa fiyat göstergesi veya sensör cihazı gibi "hiç bitmeyen akışlardan" da okuyabilir. MoveNextAsync çağrısı, kullanılabilir olur olmaz bir sonraki öğeyi döndürür.