Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Asynchronní streamy modelují streamované zdroje dat. Datové proudy často načítají nebo generují prvky asynchronně. Poskytují přirozený programovací model pro asynchronní streamované zdroje dat.
V tomto kurzu se naučíte:
- Vytvořte zdroj dat, který asynchronně generuje sekvenci datových prvků.
- Tento zdroj dat spotřebovávejte asynchronně.
- Podpora zrušení a zachycených kontextů pro asynchronní streamy
- Rozpoznáte, kdy se nové rozhraní a zdroj dat preferují před dřívějšími synchronními sekvencemi dat.
Požadavky
Budete muset nastavit počítač tak, aby běžel na platformě .NET, včetně kompilátoru jazyka C#. Kompilátor jazyka C# je k dispozici s Visual Studio 2022 nebo .NET SDK.
Budete muset vytvořit přístupový token GitHubu , abyste měli přístup ke koncovému bodu GitHub GraphQL. Pro přístupový token GitHubu vyberte následující oprávnění:
- úložiště:status
- public_repo
Přístupový token uložte na bezpečném místě, abyste ho mohli použít k získání přístupu ke koncovému bodu rozhraní API GitHubu.
Výstraha
Zabezpečte svůj osobní přístupový token. Jakýkoli software s vaším osobním přístupovým tokenem může využívat API GitHubu podle vašich přístupových práv.
V tomto kurzu se předpokládá, že znáte C# a .NET, včetně sady Visual Studio nebo rozhraní příkazového řádku .NET.
Spuštění úvodní aplikace
Kód pro úvodní aplikaci použitou v tomto kurzu můžete získat z úložiště dotnet/docs ve složce asynchronního programování/fragmentů kódu .
Úvodní aplikace je konzolová aplikace, která používá rozhraní GitHub GraphQL k načtení nedávných problémů napsaných v úložišti dotnet/docs . Začněte tím, že se podíváte na následující kód počáteční metody aplikace Main :
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Proměnnou GitHubKey prostředí můžete buď nastavit na svůj osobní přístupový token, nebo můžete nahradit poslední argument voláním GetEnvVariable pomocí vašeho osobního přístupového tokenu. Pokud zdroj budete sdílet s ostatními, nevkládejte svůj přístupový kód do zdrojového kódu. Nikdy neodesílejte přístupové kódy do sdíleného zdrojového úložiště.
Po vytvoření klienta GitHubu vytvoří kód v Main objekt pro hlášení průběhu a předmět pro zrušení. Jakmile se tyto objekty vytvoří, Main volá RunPagedQueryAsync k načtení nejnovějších 250 vytvořených problémů. Po dokončení úkolu se zobrazí výsledky.
Při spuštění úvodní aplikace můžete provést několik důležitých pozorování o tom, jak tato aplikace běží. Zobrazí se průběh jednotlivých stránek vrácených z GitHubu. Než GitHub vrátí každou novou stránku problémů, můžete si všimnout znatelné pozastavení. Nakonec se problémy zobrazí až po načtení všech 10 stránek z GitHubu.
Prozkoumání implementace
Implementace ukazuje, proč jste zaznamenali chování, které jsme probírali v předchozí části. Prozkoumejte kód pro RunPagedQueryAsync:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
První věc, kterou tato metoda dělá, je vytvořit objekt POST pomocí GraphQLRequest třídy:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
který pomáhá vytvořit tělo objektu POST a správně ho převést na JSON prezentovaný jako jediný řetězec pomocí metody ToJsonText, která odstraní všechny znaky pro nový řádek z textu žádosti a označí je escape znakem \ (zpětné lomítko).
Pojďme se soustředit na stránkovací algoritmus a asynchronní strukturu předchozího kódu. (Podrobnosti o rozhraní GraphQL api GitHubu najdete v dokumentaci k GitHubu GraphQL .) Metoda RunPagedQueryAsync vytvoří výčet problémů od nejnovějších po nejstarší. Žádá o 25 problémů na stránku a zkoumá pageInfo strukturu odpovědi, aby pokračovala na předchozí stránce. Řídí se to standardní podporou stránkování GraphQL pro vícestránkové odpovědi. Odpověď obsahuje pageInfo objekt, který obsahuje hasPreviousPages hodnotu a startCursor hodnotu použitou k vyžádání předchozí stránky. Problémy jsou v nodes poli. Metoda RunPagedQueryAsync připojí tyto uzly k poli, které obsahuje všechny výsledky ze všech stránek.
Po načtení a obnovení stránky výsledků RunPagedQueryAsync hlásí průběh a zjišťuje, zda nedošlo ke zrušení. Pokud bylo požadováno zrušení, RunPagedQueryAsync vyvolá chybu OperationCanceledException.
V tomto kódu je několik prvků, které lze vylepšit. Nejdůležitější je, aby RunPagedQueryAsync přidělilo prostor pro všechny vrácené položky. Tato ukázka se zastaví na 250 problémech, protože načtení všech otevřených problémů by vyžadovalo mnohem více paměti pro uložení všech načtených problémů. Protokoly podporující zprávy o postupu a zrušení znesnadňují pochopení algoritmu při prvním čtení. Jsou zapojeny další typy a rozhraní API. Pokud chcete zjistit, kde je požadováno zrušení a kde je uděleno, musíte trasovat komunikaci prostřednictvím CancellationTokenSource a jeho souvisejícího CancellationToken.
Asynchronní streamy poskytují lepší způsob
Všechny tyto obavy řeší asynchronní streamy a přidružená podpora jazyků. Kód, který vygeneruje sekvenci, teď může použít yield return k vrácení prvků v metodě, která byla deklarována pomocí modifikátoru async . Asynchronní datový proud můžete využívat pomocí await foreach smyčky stejně jako jakoukoli sekvenci pomocí smyčky foreach .
Tyto nové jazykové funkce závisí na třech nových rozhraních přidaných do .NET Standard 2.1 a implementovaných v .NET Core 3.0:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Tato tři rozhraní by měla být pro většinu vývojářů v jazyce C# známá. Chovají se podobným způsobem jako jejich synchronní protějšky:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Jeden typ, který může být neznámý je System.Threading.Tasks.ValueTask. Struktura ValueTask poskytuje podobné rozhraní API jako System.Threading.Tasks.Task třída.
ValueTask v těchto rozhraních se používá z důvodů výkonu.
Převod na asynchronní streamy
Dále převeďte metodu RunPagedQueryAsync tak, aby vygenerovala asynchronní datový proud. Nejprve změňte podpis RunPagedQueryAsync tak, aby vracel IAsyncEnumerable<JToken>, a odeberte token zrušení a objekty průběhu ze seznamu parametrů, jak je znázorněno v následujícím kódu:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Počáteční kód zpracovává každou stránku při načítání stránky, jak je znázorněno v následujícím kódu:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Nahraďte tyto tři řádky následujícím kódem:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Můžete také dříve v této metodě odebrat deklaraci finalResults a příkaz return, který následuje za smyčkou, kterou jste upravili.
Dokončili jste změny k vytvoření asynchronního streamu. Hotová metoda by měla vypadat podobně jako v následujícím kódu:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Dále změníte kód, který využívá sbírku, aby spotřeboval asynchronní stream. Vyhledejte následující kód, Main který zpracovává kolekci problémů:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Nahraďte tento kód následující await foreach smyčkou:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Nové rozhraní IAsyncEnumerator<T> je odvozeno z IAsyncDisposable. To znamená, že předchozí smyčka asynchronně odstraní stream po dokončení smyčky. Můžete si představit, že smyčka vypadá jako následující kód:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Ve výchozím nastavení se prvky streamu zpracovávají v zachyceného kontextu. Pokud chcete zakázat zachytávání kontextu, použijte metodu TaskAsyncEnumerableExtensions.ConfigureAwait rozšíření. Další informace o kontextech synchronizace a zachycení aktuálního kontextu najdete v článku o využívání asynchronního vzoru založeného na úlohách.
Asynchronní streamy podporují zrušení pomocí stejného protokolu jako jiné async metody. Podpis pro metodu asynchronního iterátoru byste upravili následujícím způsobem, aby podporoval zrušení:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Atribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute způsobí, že kompilátor vygeneruje kód pro IAsyncEnumerator<T>, který zajistí, že token předaný GetAsyncEnumerator bude viditelný pro tělo asynchronního iterátoru jako tento argument. Uvnitř runQueryAsyncmůžete prozkoumat stav tokenu a v případě potřeby zrušit další práci.
Pro předání tokenu zrušení do asynchronního streamu použijete jinou rozšiřovací metodu, WithCancellation. Smyčku výčtu těchto problémů byste upravili následujícím způsobem:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Kód dokončeného kurzu můžete získat z úložiště dotnet/docs ve složce asynchronní-programování/fragmenty kódu .
Spuštění dokončené aplikace
Spusťte aplikaci znovu. Porovnejte jeho chování s chováním úvodní aplikace. První stránka výsledků se zobrazí, jakmile bude k dispozici. Je patrné, že při požadavku na novou stránku a jejím načítání dojde k pozastavení, poté jsou výsledky následující stránky rychle vypsány. Blok try / catch není potřeba ke řešení zrušení: volající může přestat vyjmenovávat kolekci. Průběh je jasně hlášen, protože asynchronní datový proud generuje výsledky při stahování každé stránky. Stav každé vrácené záležitosti je bezproblémově zahrnutý ve smyčce await foreach. Ke sledování průběhu nepotřebujete objekt zpětného volání.
Vylepšení využití paměti můžete zobrazit prozkoumáním kódu. Už nemusíte přidělovat kolekci, aby se ukládaly všechny výsledky, než se vytvoří výčet. Volající může určit, jak využívat výsledky a jestli je potřeba kolekce úložiště.
Spusťte úvodní i dokončené aplikace a můžete sledovat rozdíly mezi implementacemi pro sebe. Po dokončení tohoto kurzu můžete odstranit přístupový token GitHubu, který jste vytvořili. Pokud útočník získal přístup k danému tokenu, mohl by přistupovat k rozhraním API GitHubu pomocí vašich přihlašovacích údajů.
V tomto kurzu jste použili asynchronní streamy ke čtení jednotlivých položek ze síťového rozhraní API, které vrací stránky dat. Asynchronní streamy se také můžou číst z "nikdy nekončících datových proudů", jako je burzovní ticker nebo senzorové zařízení. Volání MoveNextAsync vrátí další položku hned jak bude k dispozici.