Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Asenkron akışlar veri akışı kaynağını modeller. Veri akışları genellikle öğeleri zaman uyumsuz olarak alır veya oluşturur. Zaman uyumsuz akış veri kaynakları için doğal bir programlama modeli sağlar.
Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:
- Zaman uyumsuz olarak bir dizi veri öğesi oluşturan bir veri kaynağı oluşturun.
- Bu veri kaynağını zaman uyumsuz olarak tüketin.
- Zaman uyumsuz akışlar için iptali ve yakalanan bağlamları destekler.
- Yeni arabirimin ve veri kaynağının daha önceki zaman uyumlu veri dizilerine ne zaman tercih edildiğinden emin olun.
Önkoşullar
C# derleyicisi de dahil olmak üzere makinenizi .NET çalıştıracak şekilde ayarlamanız gerekir. C# derleyicisi Visual Studio 2022 veya .NET SDKile kullanılabilir.
GitHub GraphQL uç noktasına erişebilmek için bir GitHub erişim belirteci oluşturmanız gerekir. GitHub Erişim Belirteciniz için aşağıdaki izinleri seçin:
- repo:status
- public_repo
GitHub API uç noktasına erişim elde etmek için erişim belirtecini güvenli bir yere kaydedin.
Uyarı
Kişisel erişim belirtecinizi güvende tutun. Kişisel erişim belirtecinize sahip tüm yazılımlar erişim haklarınızı kullanarak GitHub API çağrıları yapabilir.
Bu öğreticide, Visual Studio veya .NET CLI dahil olmak üzere C# ve .NET hakkında bilgi sahibi olduğunuz varsayılır.
Başlangıç uygulamasını çalıştırma
Bu öğreticide kullanılan başlangıç uygulamasının kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.
Başlangıç uygulaması, dotnet/docs deposunda yazılan son sorunları almak için GitHub GraphQL arabirimini kullanan bir konsol uygulamasıdır. Başlangıç uygulaması Main
yöntemi için aşağıdaki koda bakarak başlayın:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Kişisel erişim belirtecinizi bir GitHubKey
ortam değişkeni olarak ayarlayabilir veya GetEnvVariable
çağrısında son bağımsız değişkeni kişisel erişim belirtecinizle değiştirebilirsiniz. Kaynağı başkalarıyla paylaşacaksanız, erişim kodunuzu kaynak koduna koymayın. Erişim kodlarını hiçbir zaman paylaşılan kaynak deposuna yüklemeyin.
GitHub istemcisini oluşturduktan sonra içindeki Main
kod bir ilerleme raporlama nesnesi ve bir iptal belirteci oluşturur. Bu nesneler oluşturulduktan sonra, Main
en son oluşturulan 250 konuyu almak için RunPagedQueryAsync
çağrılır. Bu görev tamamlandıktan sonra sonuçlar görüntülenir.
Başlangıç uygulamasını çalıştırdığınızda, bu uygulamanın nasıl çalıştığı hakkında bazı önemli gözlemler yapabilirsiniz. GitHub'dan döndürülen her sayfa için ilerleme durumunun bildirildiği görürsünüz. GitHub her yeni sorun sayfasını döndürmeden önce fark edilebilir bir duraklama gözlemleyebilirsiniz. Son olarak, sorunlar yalnızca 10 sayfanın tümü GitHub'dan alındıktan sonra görüntülenir.
Uygulamayı inceleme
Uygulama, önceki bölümde açıklanan davranışı neden gözlemlediğiniz açıklanmıştır. için RunPagedQueryAsync
kodu inceleyin:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Bu yöntemin yaptığı ilk şey, sınıfını kullanarak GraphQLRequest
POST nesnesini oluşturmaktır:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
POST nesne gövdesinin oluşturulmasına yardımcı olur ve ToJsonText
yöntemiyle bunu tek bir JSON dizesi olarak doğru bir şekilde dönüştürür. Bu yöntem, istek gövdesindeki tüm yeni satır karakterlerini "\
" (ters eğik çizgi) kaçış karakteriyle işaretleyerek kaldırır.
Şimdi önceki kodun sayfalama algoritmasına ve eşzamanlı olmayan yapısına odaklanalım. ( GitHub GraphQL API'sinin ayrıntıları için GitHub GraphQL belgelerine başvurabilirsiniz.) RunPagedQueryAsync
yöntemi sorunları en son olandan en eskiye numaralandırır. Sayfa başına 25 sorun istemekte ve önceki sayfaya devam etmek için yanıtın pageInfo
yapısını inceler. Bu, GraphQL'in çok sayfalı yanıtlar için standart sayfalama desteğine uyar. Yanıt, önceki sayfayı istemek için kullanılan bir pageInfo
değeri ve bir hasPreviousPages
değeri içeren startCursor
nesnesi içerir. Sorunlar nodes
dizisinde. yöntemi, RunPagedQueryAsync
bu düğümleri tüm sayfalardan tüm sonuçları içeren bir diziye ekler.
Bir sonuç sayfası alınıp geri yüklendikten sonra, RunPagedQueryAsync
ilerleme durumunu raporlar ve iptali denetler. İptal istendiyse, RunPagedQueryAsync
bir OperationCanceledExceptionoluşturur.
Bu kodda geliştirilebilen birkaç öğe vardır. En önemlisi, RunPagedQueryAsync
döndürülen tüm sorunlar için depolama ayırması gerekir. Tüm açık sorunların alınması, alınan tüm sorunların depolanması için çok daha fazla bellek gerektireceğinden bu örnek 250 sorunda durur. İlerleme raporlarını ve iptali destekleme protokolleri, algoritmanın ilk okumasında anlaşılmasını zorlaştırır. Daha fazla tür ve API söz konusudur. İptal talebinin nerede yapıldığını ve nerede onaylandığını anlamak için CancellationTokenSource ve buna bağlı CancellationToken üzerinden iletişimleri izlemeniz gerekir.
Zaman uyumsuz akışlar daha iyi bir yol sunar
Zaman uyumsuz akışlar ve ilişkili dil desteği tüm bu endişeleri giderir. Artık diziyi oluşturan kod, yield return
değiştiriciyle bildirilen bir yöntemle async
kullanarak öğeleri döndürebilir. Zaman uyumsuz bir akışı tıpkı herhangi bir await foreach
dizisini foreach
döngüsü kullanarak tükettiğiniz gibi bir foreach
döngüsü kullanarak tüketebilirsiniz.
Bu yeni dil özellikleri, .NET Standard 2.1'e eklenen ve .NET Core 3.0'da uygulanan üç yeni arabirime bağlıdır:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Bu üç arabirim çoğu C# geliştiricisine tanıdık gelmelidir. Eşzamanlı karşılıklarına benzer şekilde davranırlar.
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Bu türlerden biri, tanınmayan bir türdür System.Threading.Tasks.ValueTask.
ValueTask
yapısı, sınıfına System.Threading.Tasks.Task benzer bir API sağlar.
ValueTask
performans nedenleriyle bu arabirimlerde kullanılır.
Asenkron akışlara dönüştür
Ardından, RunPagedQueryAsync
metodunu eşzamansız bir akış oluşturacak şekilde dönüştürün. İlk olarak, aşağıdaki kodda gösterildiği gibi RunPagedQueryAsync
'in imzasını IAsyncEnumerable<JToken>
döndürecek şekilde değiştirin ve iptal belirtecini ile ilerleme nesnelerini parametre listesinden kaldırın.
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Başlangıç kodu, aşağıdaki kodda gösterildiği gibi sayfa alınırken her sayfayı işler:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Bu üç satırı aşağıdaki kodla değiştirin:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Bu yöntemde daha önceki finalResults
bildiriminin yanı sıra, değiştirdiğiniz döngüyü takip eden return
deyimini de kaldırabilirsiniz.
Eşzamansız bir akış oluşturmak için değişiklikleri tamamladınız. Tamamlanmış yöntem aşağıdaki koda benzemelidir:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Ardından, koleksiyonu tüketen kodu, zaman uyumsuz akışı tüketmek için değiştirirsiniz. Sorun koleksiyonunu işleyen aşağıdaki kodu Main
bulun:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Bu kodu aşağıdaki await foreach
döngüyle değiştirin:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Yeni arabirim IAsyncEnumerator<T> ' den IAsyncDisposabletüretilir. Bu, döngü tamamlandığında önceki döngünün akışı zaman uyumsuz olarak atacağı anlamına gelir. Döngünün aşağıdaki koda benzediğini düşünebilirsiniz:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Varsayılan olarak, akış öğeleri yakalanan bağlamda işlenir. Bağlam yakalamayı devre dışı bırakmak istiyorsanız uzantı yöntemini kullanın TaskAsyncEnumerableExtensions.ConfigureAwait . Senkronizasyon bağlamları ve mevcut bağlamı yakalama hakkında daha fazla bilgi için Görev tabanlı zaman uyumsuz örüntüyü kullanma makalesine bakın.
Zaman uyumsuz akışlar, diğer async
yöntemlerle aynı protokolü kullanarak iptali destekler. Asenkron yineleyici yönteminin imzasını iptali desteklemek için aşağıdaki gibi değiştirirdiniz:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
System.Runtime.CompilerServices.EnumeratorCancellationAttribute özniteliği, derleyicinin, IAsyncEnumerator<T>'ye geçirilen belirteci zaman uyumsuz yineleyicinin gövdesinde bu bağımsız değişken olarak görünür kılan GetAsyncEnumerator
kodunu oluşturmasına neden olur. İçinde runQueryAsync
belirtecin durumunu inceleyip, istenirse ilerideki çalışmaları iptal edebilirsiniz.
İptal belirtecini zaman uyumsuz akışa geçirmek için, WithCancellation olarak başka bir uzantı yöntemi kullanırsınız. Sorunları numaralandıran döngüde aşağıdaki gibi değişiklik yapabilirsiniz:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Tamamlanmış öğreticinin kodunu asynchronous-programming/snippets klasöründeki dotnet/docs deposundan alabilirsiniz.
Tamamlanmış uygulamayı çalıştırma
Uygulamayı yeniden çalıştırın. Davranışını başlangıç uygulamasının davranışıyla karşılaştırın. Sonuçların ilk sayfası, kullanılabilir olduğu anda numaralandırılır. Her yeni sayfa istenip alınırken gözlemlenebilir bir duraklama olur, ardından sonraki sayfanın sonuçları hızla numaralandırılır. blok try
/ catch
iptali işlemek için gerekli değildir: çağıran koleksiyonu numaralandırmayı durdurabilir. Zaman uyumsuz akış her sayfa indirildikçe sonuçlar ürettiği için ilerleme durumu açıkça bildirilir. Döndürülen her sorunun durumu döngüye await foreach
sorunsuz bir şekilde eklenir. İlerleme durumunu izlemek için geri çağırma nesnesine ihtiyacınız yoktur.
Kodu inceleyerek bellek kullanımında iyileştirmeler görebilirsiniz. Artık tüm sonuçları numaralandırmadan önce depolamak için bir koleksiyon ayırmanız gerekmez. Çağıran, sonuçların nasıl tüketileceğini ve bir depolama koleksiyonu gerekip gerekmediğini belirleyebilir.
Hem başlangıç hem de tamamlanmış uygulamaları çalıştırın ve uygulamalar arasındaki farkları kendiniz gözlemleyebilirsiniz. bitirdikten sonra bu öğreticiyi başlattığınızda oluşturduğunuz GitHub erişim belirtecini silebilirsiniz. Bir saldırgan bu belirteçe erişim elde ederse, kimlik bilgilerinizi kullanarak GitHub API'lerine erişebilir.
Bu öğreticide, veri sayfaları döndüren bir ağ API'sinden tek tek öğeleri okumak için eşzamansız akışlar kullandınız. Eş zamansız akışlar, borsa fiyat göstergesi veya sensör cihazı gibi "hiç bitmeyen akışlardan" da okuyabilir.
MoveNextAsync
çağrısı, kullanılabilir olur olmaz bir sonraki öğeyi döndürür.