Paralelní spouštění asynchronních úloh

Dokončeno

Paralelní programování je výkonná technika, která umožňuje současně spouštět více úloh, což zlepšuje výkon a rychlost odezvy aplikací.

V jazyce C# můžete pomocí knihovny TPL (Task Parallel Library) zjednodušit proces psaní paralelního kódu. TPL je sada veřejných typů a rozhraní API v oborech názvů System.Threading a System.Threading.Tasks. Účelem TPL je zvýšit produktivitu vývojářů zjednodušením procesu přidávání paralelismu a souběžnosti do aplikací. TPL dynamicky škáluje stupeň souběžnosti tak, aby používal všechny dostupné procesory nejefektivněji. TPL navíc zpracovává dělení práce, plánování vláken ve fondu vláken, podporu zrušení, správu stavu a další podrobnosti nízké úrovně. Pomocí TPL můžete maximalizovat výkon kódu a soustředit se na práci, kterou má program dosáhnout.

TPL poskytuje podporu v následujících oblastech:

  • Datový paralelismus: TPL poskytuje metody pro provádění paralelismu dat, což umožňuje provádět stejnou operaci na více datových prvcích současně. To je užitečné zejména v případě, že máte velké datové sady a chcete provádět výpočty nebo transformace u každého prvku nezávisle.
  • Asynchronní programování založené na úlohách: TPL poskytuje Task třídu, která představuje asynchronní operaci. Pomocí klíčových slov async a await můžete zjednodušit proces psaní asynchronního kódu. Díky tomu můžete psát kód, který se snadněji čte a udržuje a zároveň využívá paralelismus.
  • Tok dat: TPL poskytuje programovací model toku dat, který umožňuje vytvářet komplexní kanály pro zpracování dat. Tento model je založen na konceptu "bloků", které můžou zpracovávat data asynchronně a komunikovat mezi sebou pomocí zpráv.

Důležité

Paralelní programování a multithreading jsou pokročilá témata, která vyžadují dobré porozumění souběžnosti a synchronizaci. I když TPL zjednodušuje scénáře s více vlákny, doporučujeme, abyste měli základní znalosti o konceptech vláken, například zámky, zablokování a podmínky časování, abyste mohli efektivně používat TPL. Toto školení poskytuje omezený úvod do paralelního programování pomocí TPL.

Datový paralelismus

Datový paralelismus je forma paralelního programování, která se zaměřuje na provádění stejné operace u více datových prvků současně. To je užitečné zejména v případě, že máte velké datové sady a chcete provádět výpočty nebo transformace u každého prvku nezávisle. V jazyce C# můžete pomocí Parallel.For metod a Parallel.ForEach dat snadno dosáhnout paralelismu. Tyto metody umožňují paralelně iterovat kolekce nebo rozsahy dat a distribuovat úlohy napříč několika vlákny.

Knihovna Task Parallel Library podporuje datový paralelismus prostřednictvím System.Threading.Tasks.Parallel třídy. Tato třída poskytuje paralelní implementace smyček for a foreach založené na metodách. Logiku cyklu pro Parallel.For nebo Parallel.ForEach cyklus napíšete podobně jako byste psali obyčejný cyklus. TPL zpracovává veškerou práci na nízké úrovni za vás.

Následující příklad kódu ukazuje jednoduchou smyčku foreach a jeho paralelní ekvivalent.


// Sequential version
foreach (var item in sourceCollection)
{
    Process(item);
}

// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));

TPL také poskytuje sadu datových struktur, které jsou optimalizované pro souběžný přístup, například ConcurrentBag, ConcurrentQueuea ConcurrentDictionary. Tyto datové struktury umožňují bezpečně přidávat, odebírat a přistupovat k prvkům z více vláken bez nutnosti explicitního uzamčení.

Následující příklad kódu ukazuje, jak použít ConcurrentBag k ukládání výsledků z více úloh spuštěných paralelně:


using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var results = new ConcurrentBag<int>();
        Parallel.For(0, 100, i =>
        {
            // Simulate some work
            Task.Delay(100).Wait();
            results.Add(i);
        });

        Console.WriteLine($"Processed {results.Count} items in parallel.");
    }
}

V tomto příkladu ConcurrentBag se používá k uložení výsledků paralelního zpracování. Každý úkol přidá svůj výsledek do tašky bez nutnosti explicitních zámků a zajišťuje bezpečnost vláken.

Použijte Task.WhenAll a Task.WhenAny pro spuštění úloh paralelně

Task.WhenAll A Task.WhenAny metody jsou součástí paralelní knihovny úloh v jazyce C#. Tyto metody umožňují paralelně spouštět více úloh a čekat na jejich dokončení.

Task.WhenAll se používá, když chcete před pokračováním počkat na dokončení všech úkolů. Jako vstup vezme pole úkolů a vrátí jeden úkol, který představuje dokončení všech vstupních úkolů. To je užitečné v případě, že máte několik nezávislých úloh, které je možné spustit souběžně, například provádění více volání rozhraní API nebo zpracování více souborů najednou.

Task.WhenAny se používá, když chcete počkat na dokončení některého z úkolů. Jako vstup vezme pole úkolů a vrátí úkol, který představuje první dokončený úkol. To je užitečné, když chcete provést nějakou akci hned po dokončení některého z úkolů, aniž byste čekali na dokončení všech úkolů.

Následující příklad kódu ukazuje použití Task.WhenAll ke spuštění více úloh paralelně a čekání na dokončení:


using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var urls = new List<string>
        {
            "https://example.com",
            "https://example.org",
            "https://example.net"
        };

        var tasks = new List<Task<string>>();

        foreach (var url in urls)
        {
            tasks.Add(FetchDataAsync(url));
        }

        // Wait for all tasks to complete
        var results = await Task.WhenAll(tasks);

        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    static async Task<string> FetchDataAsync(string url)
    {
        using (var client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}

V tomto příkladu FetchDataAsync metoda načte data z více adres URL paralelně pomocí Task.WhenAll. Po dokončení všech úkolů se výsledky vytisknou do konzoly.

Souběžné provádění několika vstupně-výstupních operací souborů

V mnoha případech je iterace souboru operací, kterou lze snadno paralelizovat.

Následující příklad iteruje adresáře postupně, ale zpracovává soubory paralelně. Tento přístup je pravděpodobně nejlepší, pokud máte velký poměr souborů k adresáři. Je také možné paralelizovat iteraci adresáře a postupně přistupovat k jednotlivým souborům. Pravděpodobně není efektivní paralelizovat obě smyčky, pokud necílíte konkrétně na počítač s velkým množstvím procesorů. Nicméně, stejně jako ve všech případech, byste měli svou aplikaci důkladně otestovat, abyste určili nejlepší přístup.


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
    }
}

V tomto příkladu se vstupně-výstupní operace souboru provádí synchronně. Pokud váš kód pracuje s velkými soubory nebo pomalými síťovými připojeními, může být vhodnější přistupovat k souborům asynchronně. Asynchronní vstupně-výstupní techniky můžete kombinovat s paralelní iterací.

Příklad používá místní proměnnou fileCount k zachování počtu celkového počtu zpracovaných souborů. Vzhledem k tomu, že k proměnné může přistupovat souběžně více úloh, je přístup synchronizován voláním Interlocked.Add metody.

Všimněte si, že pokud je výjimka vyvolána v hlavním vlákně, vlákna, která jsou spuštěna metodou ForEach, mohou pokračovat v běhu. Pokud chcete tato vlákna zastavit, můžete v obslužných rutinách výjimek nastavit logickou proměnnou a zkontrolovat její hodnotu pro každou iteraci paralelní smyčky. Pokud hodnota označuje, že došlo k vyvolání výjimky, pomocí proměnné ParallelLoopState zastavte nebo přerušte smyčku.

Shrnutí

Tato lekce se zaměřila na paralelismus a knihovnu TPL (Task Parallel Library). Popisuje, jak paralelně spouštět asynchronní úlohy pomocí Task.WhenAll metody, paralelismu dat s metodami Parallel.For a Parallel.ForEach metod a použití souběžných datových struktur, jako ConcurrentBagje , ConcurrentQueuea ConcurrentDictionary. Obsah také ukazuje, jak souběžně provádět více vstupně-výstupních operací se soubory.

Klíčové body

  • Paralelní programování v jazyce C# umožňuje souběžné spouštění více úloh.
  • Třída Taskasync a await klíčová slova se používají k implementaci paralelního programování.
  • Task.WhenAll Metoda se používá k čekání na dokončení více úkolů před pokračováním.
  • Paralelismus dat se dosahuje pomocí Parallel.For a Parallel.ForEach metod.
  • Souběžné datové struktury, jako je ConcurrentBag, ConcurrentQueuea ConcurrentDictionary jsou optimalizované pro souběžný přístup.
  • Task.WhenAll a Task.WhenAny metody umožňují paralelně spouštět více úloh a čekat na jejich dokončení.
  • Souběžně lze provádět více vstupně-výstupních operací souborů.