Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
I flussi asincroni modellano un'origine di streaming di dati. I flussi di dati spesso recuperano o generano elementi in modo asincrono. Forniscono un modello di programmazione naturale per le origini dati di streaming asincrone.
In questa esercitazione si apprenderà come:
- Creare un'origine dati che genera una sequenza di elementi di dati in modo asincrono.
- Consumare l'origine dati in modo asincrono.
- Offrire supporto per l'annullamento e i contesti acquisiti per i flussi asincroni.
- Riconoscere quando la nuova interfaccia e l'origine dati sono preferibili alle sequenze di dati sincrone precedenti.
Prerequisiti
È necessario configurare il computer per eseguire .NET, incluso il compilatore C#. Il compilatore C# è disponibile con Visual Studio 2022 o .NET SDK.
È necessario creare un token di accesso GitHub in modo da poter accedere all'endpoint GraphQL di GitHub. Selezionare le autorizzazioni seguenti per il token di accesso GitHub:
- repo:stato
- public_repo
Salvare il token di accesso in un luogo sicuro in modo da poterlo usare per ottenere l'accesso all'endpoint dell'API GitHub.
Avvertimento
Mantenere sicuro il token di accesso personale. Qualsiasi software con il token di accesso personale potrebbe effettuare chiamate API GitHub usando i diritti di accesso.
Per questa esercitazione si presuppone che l'utente abbia familiarità con C# e .NET, inclusa l'interfaccia della riga di comando di Visual Studio o di .NET.
Eseguire l'applicazione iniziale
È possibile ottenere il codice per l'applicazione iniziale usata in questa esercitazione dal repository dotnet/docs nella cartella asynchronous-programming/snippets .
L'applicazione iniziale è un'applicazione console che usa l'interfaccia GraphQL di GitHub per recuperare i problemi recenti scritti nel repository dotnet/docs . Per iniziare, esaminare il codice seguente per il metodo dell'app Main
iniziale:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
È possibile impostare una GitHubKey
variabile di ambiente sul token di accesso personale oppure sostituire l'ultimo argomento nella chiamata a GetEnvVariable
con il token di accesso personale. Non inserire il codice di accesso nel codice sorgente se si condividerà l'origine con altri utenti. Non caricare mai i codici di accesso in un repository di origine condiviso.
Dopo aver creato il client GitHub, il codice in Main
crea un oggetto di report sullo stato di avanzamento e un token di annullamento. Dopo aver creato questi oggetti, Main
chiama RunPagedQueryAsync
per recuperare i 250 problemi creati più recenti. Al termine dell'attività, vengono visualizzati i risultati.
Quando si esegue l'applicazione starter, è possibile apportare alcune importanti osservazioni sul modo in cui viene eseguita questa applicazione. Verrà visualizzato lo stato di avanzamento segnalato per ogni pagina restituita da GitHub. È possibile osservare una pausa notevole prima che GitHub restituisca ogni nuova pagina di problemi. Infine, i problemi vengono visualizzati solo dopo che tutte le 10 pagine sono state recuperate da GitHub.
Esaminare l'implementazione
L'implementazione rivela il motivo per cui si è osservato il comportamento descritto nella sezione precedente. Esaminare il codice per RunPagedQueryAsync
:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
La prima operazione eseguita da questo metodo consiste nel creare l'oggetto POST, usando la GraphQLRequest
classe :
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
che consente di formare il corpo dell'oggetto POST e di convertirlo correttamente in JSON presentato come singola stringa con il ToJsonText
metodo , che rimuove tutti i caratteri di nuova riga dal corpo della richiesta contrassegnandoli con il \
carattere di escape (barra rovesciata).
Si esaminerà ora l'algoritmo di paging e la struttura asincrona del codice precedente. Per informazioni dettagliate sull'API GraphQL di GitHub, vedere la documentazione di GitHub GraphQL . Il RunPagedQueryAsync
metodo enumera i problemi dall'ultimo al meno recente. Richiede 25 problemi per pagina ed esamina la pageInfo
struttura della risposta per continuare con la pagina precedente. Segue il supporto standard di paging di GraphQL per le risposte a più pagine. La risposta include un oggetto che contiene un valore pageInfo
e un valore hasPreviousPages
che vengono utilizzati per richiedere la pagina precedente. I problemi si trovano nella nodes
matrice. Il RunPagedQueryAsync
metodo aggiunge questi nodi a una matrice che contiene tutti i risultati di tutte le pagine.
Dopo aver recuperato e ripristinato una pagina di risultati, RunPagedQueryAsync
segnala lo stato di avanzamento e verifica l'annullamento. Se è stato richiesto l'annullamento, RunPagedQueryAsync
genera un'eccezione OperationCanceledException.
In questo codice sono disponibili diversi elementi che possono essere migliorati. Soprattutto, RunPagedQueryAsync
deve allocare spazio di archiviazione per tutti i problemi restituiti. Questo esempio si arresta a 250 problemi perché il recupero di tutti i problemi aperti richiederebbe molta più memoria per archiviare tutti i problemi recuperati. I protocolli per supportare i report sullo stato di avanzamento e l'annullamento rendono l'algoritmo più difficile da comprendere per la prima lettura. Sono coinvolti più tipi e API. È necessario tracciare le comunicazioni tramite CancellationTokenSource e il relativo associato CancellationToken per capire dove viene richiesto l'annullamento e dove viene concesso.
I flussi asincroni offrono un modo migliore
I flussi asincroni e il supporto linguistico associato rispondono a tutte queste problematiche. Il codice che genera la sequenza può ora usare yield return
per restituire elementi in un metodo dichiarato con il async
modificatore. È possibile utilizzare un flusso asincrono usando un await foreach
ciclo esattamente come si utilizza qualsiasi sequenza usando un foreach
ciclo.
Queste nuove funzionalità del linguaggio dipendono da tre nuove interfacce aggiunte a .NET Standard 2.1 e implementate in .NET Core 3.0:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Queste tre interfacce devono essere familiari per la maggior parte degli sviluppatori C#. Si comportano in modo simile alle controparti sincrone:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Un tipo che potrebbe essere meno conosciuto è System.Threading.Tasks.ValueTask. Lo ValueTask
struct fornisce un'API simile alla System.Threading.Tasks.Task classe .
ValueTask
viene usato in queste interfacce per motivi di prestazioni.
Eseguire la conversione in flussi asincroni
Convertire quindi il RunPagedQueryAsync
metodo per generare un flusso asincrono. Prima di tutto, modificare la firma di RunPagedQueryAsync
per restituire un IAsyncEnumerable<JToken>
, e rimuovere i token di annullamento e gli oggetti di avanzamento dall'elenco dei parametri, come illustrato nel codice seguente:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Il codice iniziale elabora ogni pagina quando viene recuperata la pagina, come illustrato nel codice seguente:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Sostituire queste tre righe con il codice seguente:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
È anche possibile rimuovere precedentemente in questo metodo la dichiarazione di finalResults
e l'istruzione return
che segue il ciclo modificato.
Sono state completate le modifiche per generare un flusso asincrono. Il metodo completato dovrebbe essere simile al codice seguente:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Successivamente, modifichi il codice che utilizza la raccolta per consumare lo stream asincrono. Trovare il codice seguente in Main
che elabora la raccolta di problemi:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Sostituire il codice con il ciclo seguente await foreach
:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
La nuova interfaccia IAsyncEnumerator<T> deriva da IAsyncDisposable. Ciò significa che il ciclo precedente eliminerà in modo asincrono il flusso al termine del ciclo. È possibile immaginare che il ciclo sia simile al codice seguente:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Per impostazione predefinita, gli elementi del flusso vengono elaborati nel contesto acquisito. Se si vuole disabilitare l'acquisizione del contesto, usare il TaskAsyncEnumerableExtensions.ConfigureAwait metodo di estensione. Per altre informazioni sui contesti di sincronizzazione e sull'acquisizione del contesto corrente, vedere l'articolo sull'utilizzo del modello asincrono basato su attività.
I flussi di dati asincroni supportano l'annullamento utilizzando lo stesso protocollo degli altri metodi async
. Per supportare l'annullamento, si dovrebbe modificare la firma del metodo iteratore asincrono nel modo seguente:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
L'attributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute fa in modo che il compilatore generi codice per IAsyncEnumerator<T> che rende il token GetAsyncEnumerator
visibile al corpo dell'iteratore asincrono come argomento. All'interno di runQueryAsync
, è possibile esaminare lo stato del token e annullare le operazioni successive, se necessario.
Si usa un altro metodo di estensione, WithCancellation, per passare il token di annullamento al flusso asincrono. Modificare il ciclo che enumera i problemi come indicato di seguito:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
È possibile ottenere il codice per l'esercitazione completata dal repository dotnet/docs nella cartella asynchronous-programming/snippets .
Eseguire l'applicazione completata
Eseguire di nuovo l'applicazione. Confronta il comportamento con quello dell'applicazione di avvio. La prima pagina dei risultati viene enumerata non appena è disponibile. È presente una pausa osservabile quando ogni nuova pagina viene richiesta e recuperata, quindi i risultati della pagina successiva vengono enumerati rapidamente. Il try
/ catch
blocco non è necessario per gestire l'annullamento: il chiamante può interrompere l'enumerazione della raccolta. Lo stato di avanzamento viene segnalato chiaramente perché il flusso asincrono genera risultati durante il download di ogni pagina. Lo stato per ogni problema restituito è incluso senza problemi nel await foreach
ciclo. Non è necessario un oggetto callback per tenere traccia dello stato di avanzamento.
È possibile visualizzare i miglioramenti apportati all'uso della memoria esaminando il codice. Non è più necessario allocare una raccolta per archiviare tutti i risultati prima che vengano enumerati. Il chiamante può determinare come utilizzare i risultati e se è necessaria una raccolta di archiviazione.
Esegui sia le applicazioni iniziali che quelle completate e puoi osservare le differenze tra le implementazioni personalmente. È possibile eliminare il token di accesso GitHub creato al termine dell'esercitazione. Se un utente malintenzionato ha ottenuto l'accesso a tale token, può accedere alle API GitHub usando le credenziali.
In questa esercitazione sono stati usati flussi asincroni per leggere singoli elementi da un'API di rete che restituisce pagine di dati. I flussi asincroni possono anche leggere da "flussi mai terminati" come un ticker di stock o un dispositivo sensore. La chiamata a MoveNextAsync
restituisce l'elemento successivo non appena è disponibile.