병렬로 비동기 작업 실행

완료됨

병렬 프로그래밍은 여러 작업을 동시에 실행하여 애플리케이션의 성능과 응답성을 향상시킬 수 있는 강력한 기술입니다.

C#에서는 TPL(작업 병렬 라이브러리)을 사용하여 병렬 코드 작성 프로세스를 간소화할 수 있습니다. TPL은 System.ThreadingSystem.Threading.Tasks 네임스페이스의 공용 타입과 API 집합입니다. TPL의 목적은 애플리케이션에 병렬 처리 및 동시성을 추가하는 프로세스를 간소화하여 개발자의 생산성을 높이는 것입니다. TPL은 동시성 수준을 동적으로 조정하여 사용 가능한 모든 프로세서를 가장 효율적으로 사용합니다. 또한 TPL은 작업의 분할, ThreadPool의 스레드 예약, 취소 지원, 상태 관리 및 기타 하위 수준 세부 정보를 처리합니다. TPL을 사용하면 프로그램이 수행하도록 설계된 작업에 집중하면서 코드의 성능을 최대화할 수 있습니다.

TPL은 다음 영역에서 지원을 제공합니다.

  • 데이터 병렬 처리: TPL은 여러 데이터 요소에 대해 동일한 작업을 동시에 수행할 수 있도록 데이터 병렬 처리를 수행하는 메서드를 제공합니다. 이는 큰 데이터 세트가 있고 각 요소에 대해 독립적으로 계산 또는 변환을 수행하려는 경우에 특히 유용합니다.
  • 작업 기반 비동기 프로그래밍: TPL은 비동기 작업을 나타내는 클래스를 제공합니다 Task . asyncawait 키워드를 사용하여 비동기 코드를 작성하는 과정을 간소화할 수 있습니다. 이를 통해 병렬 처리를 활용하면서 읽기 및 유지 관리가 더 쉬운 코드를 작성할 수 있습니다.
  • 데이터 흐름: TPL은 복잡한 데이터 처리 파이프라인을 만들 수 있는 데이터 흐름 프로그래밍 모델을 제공합니다. 이 모델은 비동기적으로 데이터를 처리하고 메시지를 사용하여 서로 통신할 수 있는 "블록"의 개념을 기반으로 합니다.

중요합니다

병렬 프로그래밍 및 다중 스레딩은 동시성 및 동기화를 잘 이해해야 하는 고급 항목입니다. TPL은 다중 스레드 시나리오를 간소화하지만 TPL을 효과적으로 사용할 수 있도록 스레딩 개념(예: 잠금, 교착 상태 및 경합 조건)을 기본적으로 이해하는 것이 좋습니다. 이 교육에서는 TPL을 사용하는 병렬 프로그래밍에 대한 제한된 소개를 제공합니다.

데이터 병렬 처리

데이터 병렬 처리는 여러 데이터 요소에서 동일한 작업을 동시에 수행하는 데 중점을 둔 병렬 프로그래밍의 한 형태입니다. 이는 큰 데이터 세트가 있고 각 요소에 대해 독립적으로 계산 또는 변환을 수행하려는 경우에 특히 유용합니다. C#에서는 데이터 병렬 처리를 쉽게 달성하기 위해 Parallel.For 메서드와 Parallel.ForEach 메서드를 사용할 수 있습니다. 이러한 메서드를 사용하면 여러 스레드에 워크로드를 분산하여 컬렉션 또는 데이터 범위를 병렬로 반복할 수 있습니다.

작업 병렬 라이브러리는 클래스를 통한 System.Threading.Tasks.Parallel 데이터 병렬 처리를 지원합니다. 이 클래스는 메서드 기반 병렬 구현 및 forforeach 루프를 제공합니다. Parallel.For 또는 Parallel.ForEach 루프에 대한 루프 논리는 순차 루프를 작성하는 것처럼 작성합니다. TPL은 모든 하위 수준 작업을 처리합니다.

다음 코드 예제에서는 간단한 foreach 루프 및 해당 병렬 동등을 보여 집니다.


// Sequential version
foreach (var item in sourceCollection)
{
    Process(item);
}

// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));

또한 TPL은 동시 액세스(예: ConcurrentBag, ConcurrentQueueConcurrentDictionary)에 최적화된 데이터 구조 집합을 제공합니다. 이러한 데이터 구조를 사용하면 명시적 잠금 없이도 여러 스레드에서 요소를 안전하게 추가, 제거 및 액세스할 수 있습니다.

다음 코드 예제에서는 병렬로 실행되는 여러 작업의 결과를 ConcurrentBag에 저장하는 방법을 보여 줍니다.


using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var results = new ConcurrentBag<int>();
        Parallel.For(0, 100, i =>
        {
            // Simulate some work
            Task.Delay(100).Wait();
            results.Add(i);
        });

        Console.WriteLine($"Processed {results.Count} items in parallel.");
    }
}

이 예제에서는 ConcurrentBag 병렬 처리 결과를 저장하는 데 사용됩니다. 각 작업은 명시적 잠금 없이 결과를 백에 추가하여 스레드 안전성을 보장합니다.

병렬로 작업을 실행하기 위해 Task.WhenAllTask.WhenAny를 사용합니다.

Task.WhenAll 메서드는 Task.WhenAny C#의 작업 병렬 라이브러리에 속합니다. 이러한 메서드를 사용하면 여러 작업을 병렬로 실행하고 완료될 때까지 기다릴 수 있습니다.

Task.WhenAll 는 계속하기 전에 모든 작업이 완료되기를 기다리려는 경우에 사용됩니다. 태스크 배열을 입력으로 사용하고 모든 입력 작업의 완료를 나타내는 단일 작업을 반환합니다. 여러 API 호출을 수행하거나 동시에 여러 파일을 처리하는 등 동시에 실행할 수 있는 여러 독립 작업이 있는 경우에 유용합니다.

Task.WhenAny 는 작업이 완료되기를 기다리는 경우에 사용됩니다. 태스크 배열을 입력으로 사용하고 완료된 첫 번째 작업을 나타내는 작업을 반환합니다. 이 기능은 모든 작업이 완료되기를 기다리지 않고 작업이 완료되는 즉시 일부 작업을 수행하려는 경우에 유용합니다.

다음 코드 예제에서는 여러 작업을 병렬로 Task.WhenAll 실행하고 완료될 때까지 기다리는 방법을 보여 줍니다.


using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var urls = new List<string>
        {
            "https://example.com",
            "https://example.org",
            "https://example.net"
        };

        var tasks = new List<Task<string>>();

        foreach (var url in urls)
        {
            tasks.Add(FetchDataAsync(url));
        }

        // Wait for all tasks to complete
        var results = await Task.WhenAll(tasks);

        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    static async Task<string> FetchDataAsync(string url)
    {
        using (var client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}

이 예제에서는 FetchDataAsync 메서드를 사용하여 Task.WhenAll로 여러 URL에서 데이터를 병렬로 가져옵니다. 모든 작업이 완료되면 결과가 콘솔에 출력됩니다.

여러 파일 I/O 작업을 동시에 수행

대부분의 경우 파일 반복은 쉽게 병렬 처리할 수 있는 작업입니다.

다음 예제에서는 디렉터리를 순차적으로 반복하지만 파일을 병렬로 처리합니다. 이는 파일 대 디렉터리 비율이 큰 경우 가장 좋은 방법일 것입니다. 디렉터리 반복을 병렬 처리하고 각 파일에 순차적으로 액세스할 수도 있습니다. 프로세서 수가 많은 컴퓨터를 구체적으로 대상으로 지정하지 않는 한 두 루프를 병렬화하는 것은 효율적이지 않을 수 있습니다. 그러나 모든 경우와 마찬가지로 애플리케이션을 철저히 테스트하여 최상의 방법을 결정해야 합니다.


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
    }
}

이 예제에서는 파일 I/O가 동기적으로 수행됩니다. 코드가 대용량 파일 또는 느린 네트워크 연결로 작업하는 경우 파일에 비동기적으로 액세스하는 것이 좋습니다. 비동기 I/O 기술을 병렬 반복과 결합할 수 있습니다.

이 예제에서는 로컬 fileCount 변수를 사용하여 처리된 총 파일 수의 수를 유지 관리합니다. 변수는 여러 태스크에서 동시에 액세스할 수 있으므로 메서드를 호출하여 액세스가 Interlocked.Add 동기화됩니다.

주 스레드에서 예외가 발생하면 ForEach 메서드에서 시작된 스레드가 계속 실행될 수 있습니다. 이러한 스레드를 중지하려면 예외 처리기에서 부울 변수를 설정하고 병렬 루프의 각 반복에서 해당 값을 확인할 수 있습니다. 값이 예외가 throw되었음을 나타내는 경우 ParallelLoopState 변수를 사용하여 루프를 중지하거나 중단합니다.

요약

이 단원에서는 병렬 처리 및 TPL(작업 병렬 라이브러리)에 집중했습니다. 메서드를 사용하여 Task.WhenAll 비동기 작업을 병렬로 실행하는 방법, 메서드와 함께 Parallel.For 데이터 병렬 처리, Parallel.ForEach 동시 데이터 구조(예 ConcurrentBag: , ConcurrentQueueConcurrentDictionary)를 사용하는 방법을 설명합니다. 또한 콘텐츠는 여러 파일 I/O 작업을 동시에 수행하는 방법을 보여 줍니다.

핵심 사항

  • C#에서 병렬 프로그래밍을 사용하면 여러 작업을 동시에 실행할 수 있습니다.
  • Task 클래스 asyncawait 키워드는 병렬 프로그래밍을 구현하는 데 사용됩니다.
  • Task.WhenAll 메서드는 계속하기 전에 여러 작업이 완료되기를 기다리는 데 사용됩니다.
  • 데이터 병렬 처리는 Parallel.ForParallel.ForEach 메서드를 사용하여 달성됩니다.
  • 와 같은 ConcurrentBagConcurrentQueue동시 데이터 구조는 ConcurrentDictionary 동시 액세스에 최적화되어 있습니다.
  • Task.WhenAllTask.WhenAny 메서드를 사용하면 여러 작업을 병렬로 실행하고 완료될 때까지 대기할 수 있습니다.
  • 여러 파일 I/O 작업을 동시에 수행할 수 있습니다.