Параллельное выполнение асинхронных задач

Завершено

Параллельное программирование — это мощный метод, позволяющий одновременно выполнять несколько задач, повышая производительность и скорость реагирования приложений.

В C#можно использовать библиотеку параллельных задач (TPL), чтобы упростить процесс написания параллельного кода. TPL — это набор общедоступных типов и API в пространствах имен System.Threading и System.Threading.Tasks. Цель TPL — сделать разработчиков более продуктивными, упрощая процесс добавления параллелизма и асинхронности в приложения. TPL динамически масштабирует степень параллелизма, чтобы использовать все доступные процессоры наиболее эффективно. Кроме того, TPL обрабатывает разделение работы, планирование потоков в ThreadPool, поддержку отмены, управление состоянием и другие низкоуровневые аспекты. Используя TPL, вы можете максимально повысить производительность кода при фокусе на работе, которую ваша программа предназначена для выполнения.

TPL обеспечивает поддержку в следующих областях:

  • Параллелизм данных: TPL предоставляет методы для параллельного выполнения параллелизма данных, что позволяет одновременно выполнять одну и ту же операцию с несколькими элементами данных. Это особенно полезно, если у вас есть большие наборы данных и требуется выполнять вычисления или преобразования для каждого элемента независимо.
  • Асинхронное программирование на основе задач: TPL предоставляет Task класс, представляющий асинхронную операцию. Вы можете использовать ключевые слова async и await, чтобы упростить процесс написания асинхронного кода. Это позволяет писать код, который проще считывать и поддерживать при использовании параллелизма.
  • Поток данных: TPL предоставляет модель программирования потока данных, которая позволяет создавать сложные конвейеры обработки данных. Эта модель основана на концепции "блоков", которые могут асинхронно обрабатывать данные и взаимодействовать друг с другом с помощью сообщений.

Это важно

Параллельное программирование и многопоточность — это расширенные темы, требующие хорошего понимания параллелизма и синхронизации. Хотя TPL упрощает многопоточные сценарии, рекомендуется ознакомиться с основными понятиями потоков, например блокировками, взаимоблокировками и условиями гонки, чтобы эффективно использовать TPL. Это обучение предоставляет ограниченное введение в параллельное программирование с помощью TPL.

Параллелизм данных

Параллелизм данных — это форма параллельного программирования, которая фокусируется на выполнении одной операции на нескольких элементах данных одновременно. Это особенно полезно, если у вас есть большие наборы данных и требуется выполнять вычисления или преобразования для каждого элемента независимо. В C# можно легко использовать Parallel.For методы и Parallel.ForEach методы для обеспечения параллелизма данных. Эти методы позволяют выполнять итерацию по коллекциям или диапазонам данных параллельно, распределяя рабочую нагрузку между несколькими потоками.

Библиотека параллельных задач поддерживает параллелизм данных через System.Threading.Tasks.Parallel класс. Этот класс предоставляет параллельные реализации циклов for и foreach на основе методов. Вы пишете логику для цикла Parallel.For или Parallel.ForEach так же, как при написании последовательного цикла. TPL обрабатывает все низкоуровневые задачи для вас.

В следующем примере кода показан простой цикл foreach и его параллельный эквивалент.


// Sequential version
foreach (var item in sourceCollection)
{
    Process(item);
}

// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));

TPL также предоставляет набор структур данных, оптимизированных для параллельного доступа, таких как ConcurrentBag, ConcurrentQueueи ConcurrentDictionary. Эти структуры данных позволяют безопасно добавлять, удалять и получать доступ к элементам из нескольких потоков без необходимости явной блокировки.

В следующем примере кода показано, как использовать ConcurrentBag для хранения результатов из нескольких задач, выполняемых параллельно.


using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var results = new ConcurrentBag<int>();
        Parallel.For(0, 100, i =>
        {
            // Simulate some work
            Task.Delay(100).Wait();
            results.Add(i);
        });

        Console.WriteLine($"Processed {results.Count} items in parallel.");
    }
}

В этом примере ConcurrentBag используется для хранения результатов параллельной обработки. Каждая задача добавляет свой результат в пакет без явной блокировки, обеспечивая безопасность потоков.

Использование Task.WhenAll и Task.WhenAny выполнение задач параллельно

Методы Task.WhenAll и Task.WhenAny являются частью библиотеки параллельных задач в C#. Эти методы позволяют выполнять несколько задач параллельно и ждать их завершения.

Task.WhenAll используется, когда вы хотите ждать завершения всех задач перед продолжением. Он принимает массив задач в качестве входных данных и возвращает одну задачу, представляющую завершение всех входных задач. Это полезно, если у вас есть несколько независимых задач, которые могут выполняться одновременно, например выполнение нескольких вызовов API или обработка нескольких файлов одновременно.

Task.WhenAny используется, когда вы хотите ждать завершения любой из задач. Он принимает массив задач в качестве входных данных и возвращает задачу, представляющую первую задачу, которая завершается. Это полезно, если вы хотите выполнить некоторые действия сразу после завершения любой задачи, не ожидая завершения всех задач.

В следующем примере кода показано, как выполнять Task.WhenAll несколько задач параллельно и ожидать их завершения:


using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var urls = new List<string>
        {
            "https://example.com",
            "https://example.org",
            "https://example.net"
        };

        var tasks = new List<Task<string>>();

        foreach (var url in urls)
        {
            tasks.Add(FetchDataAsync(url));
        }

        // Wait for all tasks to complete
        var results = await Task.WhenAll(tasks);

        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    static async Task<string> FetchDataAsync(string url)
    {
        using (var client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}

В этом примере FetchDataAsync метод извлекает данные из нескольких URL-адресов параллельно с помощью Task.WhenAll. Результаты печатаются в консоли после завершения всех задач.

Одновременное выполнение нескольких операций ввода-вывода файлов

Во многих случаях итерация файлов — это операция, которую можно легко параллелизировать.

В следующем примере каталоги перебираются последовательно, а файлы обрабатываются параллельно. Это, вероятно, лучший подход, если у вас есть большое соотношение между файлами и каталогами. Кроме того, можно параллелизировать итерацию каталога и получить доступ к каждому файлу последовательно. Вероятно, это не эффективно, чтобы параллелизировать оба цикла, если вы специально не нацелены на компьютер с большим количеством процессоров. Однако, как и во всех случаях, необходимо тщательно протестировать приложение, чтобы определить оптимальный подход.


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
    }
}

В этом примере операции ввода-вывода файла выполняются синхронно. Если код работает с большими файлами или медленными сетевыми подключениями, возможно, предпочтительнее асинхронно получить доступ к файлам. Вы можете объединить асинхронные методы ввода-вывода с параллельной итерацией.

В примере используется локальная переменная fileCount для поддержания количества обработанных файлов. Так как переменная может быть доступна одновременно несколькими задачами, доступ синхронизируется путем вызова Interlocked.Add метода.

Обратите внимание, что если исключение создается в основном потоке, потоки, запущенные методом ForEach, могут продолжать выполняться. Чтобы остановить эти потоки, можно задать логическую переменную в обработчиках исключений и проверить его значение на каждой итерации параллельного цикла. Если значение указывает на то, что исключение было создано, используйте переменную ParallelLoopState для остановки или разрыва цикла.

Сводка

Эта единица сосредоточена на параллелизме и библиотеке параллельных задач (TPL). В нем описывается параллельное выполнение асинхронных задач с помощью Task.WhenAll метода, параллелизма данных и Parallel.ForParallel.ForEach методов, а также использование параллельных структур данных, таких как ConcurrentBag, ConcurrentQueueи ConcurrentDictionary. В содержимом также показано, как одновременно выполнять несколько операций ввода-вывода файлов.

Ключевые моменты

  • Параллельное программирование в C# позволяет одновременно выполнять несколько задач.
  • Класс Task, ключевые слова async и await используются для реализации параллельного программирования.
  • Task.WhenAll Метод используется для ожидания завершения нескольких задач перед продолжением.
  • Параллелизм данных достигается с помощью Parallel.For и Parallel.ForEach методов.
  • Параллельные структуры данных, такие как ConcurrentBag, ConcurrentQueueи ConcurrentDictionary оптимизированы для параллельного доступа.
  • Task.WhenAll и Task.WhenAny методы позволяют выполнять несколько задач параллельно и ожидать их завершения.
  • Одновременно можно выполнять несколько операций ввода-вывода файлов.