Menjalankan tugas asinkron secara paralel

Selesai

Pemrograman paralel adalah teknik canggih yang memungkinkan Anda menjalankan beberapa tugas secara bersamaan, meningkatkan performa dan respons aplikasi Anda.

Di C#, Anda dapat menggunakan Pustaka Paralel Tugas (TPL) untuk menyederhanakan proses penulisan kode paralel. TPL adalah sekumpulan jenis publik dan API di namespace System.Threading dan System.Threading.Tasks. Tujuan dari TPL adalah untuk membuat pengembang lebih produktif dengan menyederhanakan proses penambahan paralelisme dan konkurensi ke aplikasi. TPL secara dinamis menskalakan tingkat konkurensi untuk menggunakan semua prosesor yang tersedia secara paling efisien. Selain itu, TPL menangani pembagian tugas, pengaturan utas pada ThreadPool, dukungan pembatalan, manajemen status, dan detail tingkat rendah lainnya. Dengan menggunakan TPL, Anda dapat memaksimalkan performa kode sambil berfokus pada pekerjaan yang dirancang untuk dicapai program Anda.

TPL menyediakan dukungan di area berikut:

  • Paralelisme data: TPL menyediakan metode untuk melakukan paralelisme data, memungkinkan Anda melakukan operasi yang sama pada beberapa elemen data secara bersamaan. Ini sangat berguna ketika Anda memiliki himpunan data besar dan ingin melakukan komputasi atau transformasi pada setiap elemen secara independen.
  • Pemrograman asinkron berbasis tugas: TPL menyediakan Task kelas , yang mewakili operasi asinkron. Anda dapat menggunakan async kata kunci dan await untuk menyederhanakan proses penulisan kode asinkron. Ini memungkinkan Anda untuk menulis kode yang lebih mudah dibaca dan dikelola sambil tetap memanfaatkan paralelisme.
  • Aliran data: TPL menyediakan model pemrograman aliran data yang memungkinkan Anda membuat alur pemrosesan data yang kompleks. Model ini didasarkan pada konsep "blok" yang dapat memproses data secara asinkron dan berkomunikasi satu sama lain menggunakan pesan.

Penting

Pemrograman paralel dan multithreading adalah topik lanjutan yang memerlukan pemahaman yang baik tentang konkurensi dan sinkronisasi. Meskipun TPL menyederhanakan skenario multithreaded, kami sarankan Anda memiliki pemahaman dasar tentang konsep utas, misalnya, kunci, kebuntuan, dan kondisi balapan, sehingga Anda dapat menggunakan TPL secara efektif. Pelatihan ini memberikan pengenalan terbatas tentang pemrograman paralel menggunakan TPL.

Paralelisme data

Paralelisme data adalah bentuk pemrograman paralel yang berfokus pada melakukan operasi yang sama pada beberapa elemen data secara bersamaan. Ini sangat berguna ketika Anda memiliki himpunan data besar dan ingin melakukan komputasi atau transformasi pada setiap elemen secara independen. Di C#, Anda dapat menggunakan Parallel.For metode dan Parallel.ForEach untuk mencapai paralelisme data dengan mudah. Metode ini memungkinkan Anda melakukan iterasi atas koleksi atau rentang data secara paralel, mendistribusikan beban kerja di beberapa utas.

Pustaka Tugas Paralel mendukung paralelisme data melalui kelas System.Threading.Tasks.Parallel. Kelas ini menyediakan implementasi paralel berbasis metode untuk for dan loop foreach. Anda menulis logika perulangan untuk perulangan Parallel.For atau Parallel.ForEach dengan cara yang sama seperti Anda menulis perulangan berurutan. TPL menangani semua pekerjaan tingkat rendah untuk Anda.

Contoh kode berikut menunjukkan perulangan foreach sederhana dan setara paralelnya.


// Sequential version
foreach (var item in sourceCollection)
{
    Process(item);
}

// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));

TPL juga menyediakan sekumpulan struktur data yang dioptimalkan untuk akses bersamaan, seperti ConcurrentBag, , ConcurrentQueuedan ConcurrentDictionary. Struktur data ini memungkinkan Anda menambahkan, menghapus, dan mengakses elemen dengan aman pada beberapa utas tanpa perlu penguncian eksplisit.

Contoh kode berikut menunjukkan cara menggunakan ConcurrentBag untuk menyimpan hasil dari beberapa tugas yang berjalan secara paralel:


using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var results = new ConcurrentBag<int>();
        Parallel.For(0, 100, i =>
        {
            // Simulate some work
            Task.Delay(100).Wait();
            results.Add(i);
        });

        Console.WriteLine($"Processed {results.Count} items in parallel.");
    }
}

Dalam contoh ini, ConcurrentBag digunakan untuk menyimpan hasil pemrosesan paralel. Setiap tugas menambahkan hasilnya ke wadah tanpa memerlukan penguncian eksplisit, memastikan keamanan utas.

Gunakan Task.WhenAll dan Task.WhenAny untuk menjalankan tugas secara paralel

Metode Task.WhenAll dan Task.WhenAny adalah bagian dari Pustaka Paralel Tugas di C#. Metode ini memungkinkan Anda menjalankan beberapa tugas secara paralel dan menunggu penyelesaiannya.

Task.WhenAll digunakan ketika Anda ingin menunggu semua tugas selesai sebelum melanjutkan. Dibutuhkan array tugas sebagai input dan mengembalikan satu tugas yang mewakili penyelesaian semua tugas input. Ini berguna ketika Anda memiliki beberapa tugas independen yang dapat dijalankan secara bersamaan, seperti melakukan beberapa panggilan API atau memproses beberapa file secara bersamaan.

Task.WhenAny digunakan ketika Anda ingin menunggu salah satu tugas selesai. Memerlukan array tugas sebagai input dan mengembalikan tugas yang merepresentasikan tugas pertama yang selesai. Ini berguna ketika Anda ingin melakukan beberapa tindakan segera setelah salah satu tugas selesai, tanpa menunggu semuanya selesai.

Contoh kode berikut menunjukkan cara menggunakan Task.WhenAll untuk menjalankan beberapa tugas secara paralel dan menunggu penyelesaiannya:


using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var urls = new List<string>
        {
            "https://example.com",
            "https://example.org",
            "https://example.net"
        };

        var tasks = new List<Task<string>>();

        foreach (var url in urls)
        {
            tasks.Add(FetchDataAsync(url));
        }

        // Wait for all tasks to complete
        var results = await Task.WhenAll(tasks);

        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    static async Task<string> FetchDataAsync(string url)
    {
        using (var client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}

Dalam contoh ini, metode FetchDataAsync mengumpulkan data dari beberapa URL secara paralel dengan menggunakan Task.WhenAll. Hasilnya dicetak ke konsol setelah semua tugas selesai.

Melakukan beberapa operasi I/O file secara bersamaan

Dalam banyak kasus, iterasi file adalah operasi yang dapat dengan mudah diparalelkan.

Contoh berikut berulang direktori secara berurutan, tetapi memproses file secara paralel. Ini mungkin pendekatan terbaik ketika Anda memiliki rasio file-ke-direktori yang besar. Dimungkinkan juga untuk paralelisasi iterasi direktori, dan mengakses setiap file secara berurutan. Mungkin tidak efisien untuk memproses kedua perulangan secara paralel kecuali Anda secara khusus menargetkan mesin dengan jumlah prosesor yang besar. Namun, seperti dalam semua kasus, Anda harus menguji aplikasi Anda secara menyeluruh untuk menentukan pendekatan terbaik.


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
    }
}

Dalam contoh ini, I/O file dilakukan secara sinkron. Ketika kode Anda bekerja dengan file besar atau koneksi jaringan yang lambat, mungkin lebih baik mengakses file secara asinkron. Anda dapat menggabungkan teknik I/O asinkron dengan iterasi paralel.

Contoh menggunakan variabel fileCount lokal untuk mempertahankan hitungan jumlah total file yang diproses. Karena variabel mungkin diakses secara bersamaan oleh beberapa tugas, akses disinkronkan dengan memanggil Interlocked.Add metode .

Perhatikan bahwa jika terjadi pengecualian di thread utama, thread yang dimulai oleh metode ForEach mungkin akan berlanjut. Untuk menghentikan utas ini, Anda dapat mengatur variabel Boolean di handler pengecualian Anda dan memeriksa nilainya pada setiap iterasi perulangan paralel. Jika nilai menunjukkan bahwa pengecualian telah dilemparkan, gunakan variabel ParallelLoopState untuk menghentikan atau memisahkan dari perulangan.

Ringkasan

Unit ini berfokus pada paralelisme dan Pustaka Paralel Tugas (TPL). Ini mencakup cara menjalankan tugas asinkron secara paralel menggunakan Task.WhenAll metode, paralelisme data dengan Parallel.For metode dan Parallel.ForEach , dan penggunaan struktur data bersamaan seperti ConcurrentBag, , ConcurrentQueuedan ConcurrentDictionary. Konten juga menunjukkan cara melakukan beberapa operasi I/O file secara bersamaan.

Poin-poin penting

  • Pemrograman paralel dalam C# memungkinkan eksekusi beberapa tugas secara bersamaan.
  • Kelas Task , async dan await kata kunci digunakan untuk menerapkan pemrograman paralel.
  • Task.WhenAll metode digunakan untuk menunggu beberapa tugas selesai sebelum melanjutkan.
  • Paralelisme data dicapai menggunakan Parallel.For metode dan Parallel.ForEach .
  • Struktur data bersamaan seperti ConcurrentBag, ConcurrentQueue, dan ConcurrentDictionary dioptimalkan untuk akses bersamaan.
  • Task.WhenAll metode dan Task.WhenAny memungkinkan menjalankan beberapa tugas secara paralel dan menunggu penyelesaiannya.
  • Beberapa operasi I/O file dapat dilakukan secara bersamaan.