Поделиться через


Руководство. Создание и использование асинхронных потоков с помощью C# и .NET

Асинхронные потоки моделируют потоковый источник данных. Потоки данных часто извлекают или создают элементы асинхронно. Они предоставляют естественную модель программирования для асинхронных источников данных потоковой передачи.

В этом руководстве описано, как:

  • Создайте источник данных, который создает последовательность элементов данных асинхронно.
  • Используйте этот источник данных асинхронно.
  • Поддержка отмены и зафиксированных контекстов для асинхронных потоков.
  • Распознавать, когда новый интерфейс и источник данных предпочтительнее более ранних синхронных последовательностей данных.

Предпосылки

Вам потребуется настроить компьютер для запуска .NET, включая компилятор C#. Компилятор C# доступен с Visual Studio 2022 или пакета SDK для .NET.

Необходимо создать маркер доступа GitHub , чтобы получить доступ к конечной точке GitHub GraphQL. Выберите следующие разрешения для маркера доступа GitHub:

  • статус репозитория
  • публичный репозиторий

Сохраните маркер доступа в безопасном месте, чтобы получить доступ к конечной точке API GitHub.

Предупреждение

Защитите личный маркер доступа. Любое программное обеспечение с личным маркером доступа может вызывать API GitHub с помощью ваших прав доступа.

В этом руководстве предполагается, что вы знакомы с C# и .NET, включая Visual Studio или .NET CLI.

Запуск начального приложения

Вы можете получить код для начального приложения, используемого в этом руководстве, из репозитория dotnet/docs в папке асинхронного программирования или фрагментов кода.

Начальное приложение — это консольное приложение, использующее интерфейс GraphQL GitHub для получения последних проблем, записанных в репозитории dotnet/docs . Начните с следующего кода для метода начального приложения Main :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Можно задать GitHubKey переменную среды для личного маркера доступа или заменить последний аргумент в вызове GetEnvVariable личным маркером доступа. Не помещайте код доступа в исходный код, если вы будете совместно использовать источник с другими пользователями. Никогда не отправлять коды доступа в общий исходный репозиторий.

После создания клиента GitHub код в Main создает объект для отчетности о ходе выполнения и маркер отмены. После создания этих объектов вызывается Main, чтобы получить последние 250 созданных задач. После завершения этой задачи отображаются результаты.

При запуске начального приложения можно сделать некоторые важные наблюдения о том, как работает это приложение. Вы увидите отчет о прогрессе для каждой веб-страницы, полученной с GitHub. Вы можете наблюдать заметную паузу, прежде чем GitHub возвращает каждую новую страницу вопросов. Наконец, проблемы отображаются только после получения всех 10 страниц из GitHub.

Изучение реализации

Реализация показывает, почему вы наблюдали поведение, описанное в предыдущем разделе. Проверьте код для RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Первое, что делает этот метод, заключается в создании объекта POST с помощью GraphQLRequest класса:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

который помогает сформировать текст объекта POST и правильно преобразовать его в JSON, представленный в виде одной строки с методом ToJsonText , который удаляет все новые символы из текста запроса, \ помечая их с помощью escape-символа (обратная косая черта).

Давайте сосредоточимся на алгоритме разбиения по страницам и асинхронной структуре предыдущего кода. (Дополнительные сведения об API GraphQL GitHub см. в документации по GitHub GraphQL .) Метод RunPagedQueryAsync перечисляет проблемы, возникающие от последнего до самого старого. Он запрашивает 25 вопросов на страницу и изучает структуру ответа, чтобы продолжить с предыдущей страницей. Это соответствует стандартной поддержке пагинации в GraphQL для многостраничных ответов. Ответ включает pageInfo объект, включающий hasPreviousPages значение и startCursor значение, используемое для запроса предыдущей страницы. Проблемы находятся в массиве nodes . Метод RunPagedQueryAsync добавляет эти узлы в массив, содержащий все результаты со всех страниц.

После получения и восстановления страницы результатов RunPagedQueryAsync сообщает о ходе выполнения и проверяет отмену. Если отмена запрошена, RunPagedQueryAsync вызывает исключение OperationCanceledException.

В этом коде можно улучшить несколько элементов. Самое главное, RunPagedQueryAsync необходимо выделить хранилище для всех возвращаемых вопросов. Этот пример останавливается на 250 проблемах, так как для получения всех открытых проблем потребуется гораздо больше памяти для хранения всех извлеченных проблем. Протоколы для поддержки отчетов о ходе выполнения и отмены затрудняют понимание алгоритма при первом чтении. Используются дополнительные типы и API. Необходимо отследить коммуникации через CancellationTokenSource и связанные с ним CancellationToken, чтобы понять, где запрашивается отмена и где она предоставляется.

Асинхронные потоки обеспечивают лучший способ

Асинхронные потоки и соответствующая поддержка языка решают все эти проблемы. Код, создающий последовательность, теперь может использовать yield return для возврата элементов в методе, объявленном с модификатором async . Асинхронный поток можно обрабатывать с помощью await foreach цикла так же, как и любую последовательность, используя foreach цикл.

Эти новые возможности языка зависят от трех новых интерфейсов, добавленных в .NET Standard 2.1 и реализованных в .NET Core 3.0:

Эти три интерфейса должны быть знакомы большинству разработчиков C#. Они ведут себя так же, как и их синхронные аналоги:

Один из типов, которые могут быть незнакомы, это System.Threading.Tasks.ValueTask. Структура ValueTask предоставляет API, аналогичный классу System.Threading.Tasks.Task. ValueTask используется в этих интерфейсах по соображениям производительности.

Преобразуйте в асинхронные потоки

Затем преобразуйте RunPagedQueryAsync метод для создания асинхронного потока. Сначала измените сигнатуру RunPagedQueryAsync так, чтобы она возвращала объект типа IAsyncEnumerable<JToken>, и удалите маркер отмены и объекты хода выполнения из списка параметров, как показано в следующем коде:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Начальный код обрабатывает каждую страницу по мере извлечения страницы, как показано в следующем коде:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Замените эти три строки следующим кодом:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Вы также можете удалить объявление finalResults, указанное ранее в этом методе, и оператор return, который следует за измененным циклом.

Вы завершили изменения для создания асинхронного потока. Готовый метод должен выглядеть следующим образом:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Затем вы измените код, который использует коллекцию для использования асинхронного потока. Найдите код в Main, который обрабатывает следующую коллекцию вопросов:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Замените этот код следующим await foreach циклом:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Новый интерфейс IAsyncEnumerator<T> является производным от IAsyncDisposable. Это означает, что предыдущий цикл асинхронно удаляет поток после завершения цикла. Можно представить, что цикл выглядит следующим образом:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

По умолчанию элементы потока обрабатываются в захваченном контексте. Если вы хотите отключить запись контекста, используйте TaskAsyncEnumerableExtensions.ConfigureAwait метод расширения. Дополнительные сведения о контекстах синхронизации и записи текущего контекста см. в статье об использовании асинхронного шаблона на основе задач.

Асинхронные потоки поддерживают отмену с помощью того же протокола, что и другие async методы. Вы измените сигнатуру для метода асинхронного итератора следующим образом, чтобы обеспечить отмену:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Атрибут System.Runtime.CompilerServices.EnumeratorCancellationAttribute заставляет компилятор генерировать код для IAsyncEnumerator<T>, который делает передаваемый в GetAsyncEnumerator токен видимым для тела асинхронного итератора как этот аргумент. Внутри runQueryAsync можно проверить состояние маркера и отменить дальнейшую работу при запросе.

Вы используете другой метод расширения, WithCancellation, чтобы передать маркер отмены в асинхронный поток. Вы измените цикл перечисления проблем следующим образом:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Код готового руководства можно получить из репозитория dotnet/docs в папке асинхронного программирования и фрагментов кода.

Запуск готового приложения

Снова запустите приложение. Сравните его поведение с поведением начального приложения. Первая страница результатов перечисляется, как только она становится доступной. Существует наблюдаемая пауза, так как каждая новая страница запрашивается и извлекается, затем результаты следующей страницы быстро перечисляются. Блок try / catch не нужен для обработки отмены: вызывающий объект может прекратить перечисление коллекции. Ход выполнения явно сообщается, так как асинхронный поток создает результаты по мере загрузки каждой страницы. Состояние каждой возвращенной проблемы беспрепятственно включается в цикл await foreach. Для отслеживания хода выполнения не требуется объект обратного вызова.

Улучшения использования памяти можно увидеть, проверив код. Вам больше не нужно выделять коллекцию для хранения всех результатов перед перечислением. Вызывающий объект может решить, как обрабатывать результаты, и определить, необходима ли коллекция для хранения данных.

Запустив и исходное, и завершенное приложение, вы сможете сами увидеть различия между их реализациями. Вы можете удалить маркер доступа GitHub, созданный при запуске этого руководства после завершения работы. Если злоумышленник получил доступ к такому маркеру, он может получить доступ к API GitHub с помощью учетных данных.

В этом руководстве вы использовали асинхронные потоки для чтения отдельных элементов из сетевого API, который возвращает страницы данных. Асинхронные потоки также могут читать данные из непрерывных потоков, таких как тикер акций или сенсорные устройства. Вызов MoveNextAsync возвращает следующий элемент, как только он становится доступным.