非同期タスクを並列で実行する
並列プログラミングは、複数のタスクを同時に実行し、アプリケーションのパフォーマンスと応答性を向上させる強力な手法です。
C# では、タスク並列ライブラリ (TPL) を使用して、並列コードを記述するプロセスを簡略化できます。 TPL は、 System.Threading および System.Threading.Tasks 名前空間のパブリック型と API のセットです。 TPL の目的は、アプリケーションに並列処理とコンカレンシーを追加するプロセスを簡略化することで、開発者の生産性を高めるものです。 TPL は、使用可能なすべてのプロセッサを最も効率的に使用するようにコンカレンシーの程度を動的にスケーリングします。 さらに、TPL は、作業のパーティション分割、ThreadPool でのスレッドのスケジュール設定、取り消しのサポート、状態管理、およびその他の低レベルの詳細を処理します。 TPL を使用すると、プログラムが実現するように設計されている作業に集中しながら、コードのパフォーマンスを最大化できます。
TPL は、次の領域でサポートを提供します。
- データの並列処理: TPL には、データ並列処理を実行するためのメソッドが用意されており、複数のデータ要素に対して同じ操作を同時に実行できます。 これは、大規模なデータセットがあり、各要素に対して個別に計算または変換を実行する場合に特に便利です。
- タスク ベースの非同期プログラミング: TPL は、非同期操作を表す
Taskクラスを提供します。asyncキーワードとawaitキーワードを使用すると、非同期コードを記述するプロセスを簡略化できます。 これにより、並列処理を利用しながら、読みやすく保守しやすいコードを記述できます。 - データフロー: TPL には、複雑なデータ処理パイプラインを作成できるデータフロー プログラミング モデルが用意されています。 このモデルは、データを非同期的に処理し、メッセージを使用して相互に通信できる "ブロック" の概念に基づいています。
Von Bedeutung
並列プログラミングとマルチスレッドは、コンカレンシーと同期について十分に理解する必要がある高度なトピックです。 TPL はマルチスレッド シナリオを簡略化しますが、TPL を効果的に使用できるように、スレッドの概念 (ロック、デッドロック、競合状態など) を基本的に理解することをお勧めします。 このトレーニングでは、TPL を使用した並列プログラミングの限定的な概要について説明します。
データ並列
データ並列処理は、複数のデータ要素に対して同じ操作を同時に実行することに重点を置いた並列プログラミングの形式です。 これは、大規模なデータセットがあり、各要素に対して個別に計算または変換を実行する場合に特に便利です。 C# では、 Parallel.For メソッドと Parallel.ForEach メソッドを使用して、データの並列処理を簡単に実現できます。 これらのメソッドを使用すると、複数のスレッドにワークロードを分散して、データのコレクションまたは範囲を並列で反復処理できます。
タスク並列ライブラリは、 System.Threading.Tasks.Parallel クラスを介したデータ並列処理をサポートします。 このクラスは、 for ループと foreach ループのメソッドベースの並列実装を提供します。 シーケンシャル ループを記述するのと同じように、 Parallel.For または Parallel.ForEach ループのループ ロジックを記述します。 TPL は、すべての低レベルの作業を自動的に処理します。
次のコード例は、単純な foreach ループとその並列の同等のループを示しています。
// Sequential version
foreach (var item in sourceCollection)
{
Process(item);
}
// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));
TPL には、 ConcurrentBag、 ConcurrentQueue、 ConcurrentDictionaryなど、同時アクセス用に最適化された一連のデータ構造も用意されています。 これらのデータ構造を使用すると、明示的なロックを必要とせずに、複数のスレッドから要素を安全に追加、削除、アクセスできます。
次のコード例では、 ConcurrentBag を使用して、並列で実行されている複数のタスクの結果を格納する方法を示します。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
static void Main()
{
var results = new ConcurrentBag<int>();
Parallel.For(0, 100, i =>
{
// Simulate some work
Task.Delay(100).Wait();
results.Add(i);
});
Console.WriteLine($"Processed {results.Count} items in parallel.");
}
}
この例では、 ConcurrentBag を使用して並列処理の結果を格納します。 各タスクは、明示的なロックを必要とせずに結果をセットに追加し、スレッドの安全性を確保します。
タスクを並列で実行するために Task.WhenAll と Task.WhenAny を使用する
Task.WhenAllメソッドとTask.WhenAny メソッドは、C# のタスク並列ライブラリの一部です。 これらのメソッドを使用すると、複数のタスクを並列で実行し、タスクが完了するまで待機できます。
Task.WhenAll は、すべてのタスクが完了するのを待ってから続行する場合に使用されます。 入力としてタスクの配列を受け取り、すべての入力タスクの完了を表す 1 つのタスクを返します。 これは、複数の API 呼び出しを行ったり、複数のファイルを同時に処理したりするなど、複数の独立したタスクを同時に実行できる場合に便利です。
Task.WhenAny は、いずれかのタスクが完了するのを待つ場合に使用されます。 入力としてタスクの配列を受け取り、完了した最初のタスクを表すタスクを返します。 これは、すべてのタスクが完了するのを待たずに、いずれかのタスクが完了したらすぐに何らかのアクションを実行する場合に便利です。
次のコード例では、 Task.WhenAll を使用して複数のタスクを並列で実行し、完了するまで待機する方法を示します。
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string>
{
"https://example.com",
"https://example.org",
"https://example.net"
};
var tasks = new List<Task<string>>();
foreach (var url in urls)
{
tasks.Add(FetchDataAsync(url));
}
// Wait for all tasks to complete
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
Console.WriteLine(result);
}
}
static async Task<string> FetchDataAsync(string url)
{
using (var client = new HttpClient())
{
return await client.GetStringAsync(url);
}
}
}
この例では、 FetchDataAsync メソッドは、 Task.WhenAllを使用して複数の URL からデータを並列にフェッチします。 結果は、すべてのタスクが完了するとコンソールに出力されます。
複数のファイル I/O 操作を同時に実行する
多くの場合、ファイルの反復処理は簡単に並列化できる操作です。
次の例では、ディレクトリを順番に反復処理しますが、ファイルは並列で処理します。 これは、ファイルとディレクトリの比率が大きい場合に最適な方法です。 ディレクトリの反復処理を並列化し、各ファイルに順番にアクセスすることもできます。 プロセッサの数が多いマシンを具体的にターゲットにしない限り、両方のループを並列化することはおそらく効率的ではありません。 ただし、すべての場合と同様に、アプリケーションを徹底的にテストして最適なアプローチを決定する必要があります。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
try
{
TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
{
// Exceptions are no-ops.
try
{
// Do nothing with the data except read it.
byte[] data = File.ReadAllBytes(f);
}
catch (FileNotFoundException) { }
catch (IOException) { }
catch (UnauthorizedAccessException) { }
catch (SecurityException) { }
// Display the filename.
Console.WriteLine(f);
});
}
catch (ArgumentException)
{
Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
}
// Keep the console window open.
Console.ReadKey();
}
public static void TraverseTreeParallelForEach(string root, Action<string> action)
{
//Count of files traversed and timer for diagnostic output
int fileCount = 0;
var sw = Stopwatch.StartNew();
// Determine whether to parallelize file processing on each folder based on processor count.
int procCount = Environment.ProcessorCount;
// Data structure to hold names of subfolders to be examined for files.
Stack<string> dirs = new Stack<string>();
if (!Directory.Exists(root))
{
throw new ArgumentException(
"The given root directory doesn't exist.", nameof(root));
}
dirs.Push(root);
while (dirs.Count > 0)
{
string currentDir = dirs.Pop();
string[] subDirs = { };
string[] files = { };
try
{
subDirs = Directory.GetDirectories(currentDir);
}
// Thrown if we do not have discovery permission on the directory.
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
// Thrown if another process has deleted the directory after we retrieved its name.
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
try
{
files = Directory.GetFiles(currentDir);
}
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (IOException e)
{
Console.WriteLine(e.Message);
continue;
}
// Execute in parallel if there are enough files in the directory.
// Otherwise, execute sequentially.Files are opened and processed
// synchronously but this could be modified to perform async I/O.
try
{
if (files.Length < procCount)
{
foreach (var file in files)
{
action(file);
fileCount++;
}
}
else
{
Parallel.ForEach(files, () => 0,
(file, loopState, localCount) =>
{
action(file);
return (int)++localCount;
},
(c) =>
{
Interlocked.Add(ref fileCount, c);
});
}
}
catch (AggregateException ae)
{
ae.Handle((ex) =>
{
if (ex is UnauthorizedAccessException)
{
// Here we just output a message and go on.
Console.WriteLine(ex.Message);
return true;
}
// Handle other exceptions here if necessary...
return false;
});
}
// Push the subdirectories onto the stack for traversal.
// This could also be done before handing the files.
foreach (string str in subDirs)
dirs.Push(str);
}
// For diagnostic purposes.
Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
}
}
この例では、ファイル I/O は同期的に実行されます。 コードが大きなファイルや低速なネットワーク接続で動作している場合は、ファイルに非同期的にアクセスすることをお勧めします。 非同期 I/O 手法と並列反復を組み合わせることができます。
この例では、ローカル fileCount 変数を使用して、処理されたファイルの合計数のカウントを維持します。 変数は複数のタスクによって同時にアクセスされる可能性があるため、 Interlocked.Add メソッドを呼び出すことによってアクセスが同期されます。
メイン スレッドで例外がスローされた場合、ForEach メソッドによって開始されるスレッドは引き続き実行される可能性があることに注意してください。 これらのスレッドを停止するには、例外ハンドラーでブール変数を設定し、並列ループの各イテレーションでその値を確認します。 値が例外がスローされたことを示す場合は、ParallelLoopState 変数を使用してループを停止または中断します。
概要
このユニットでは、並列処理とタスク並列ライブラリ (TPL) に重点を置いています。
Task.WhenAllメソッドを使用して非同期タスクを並列で実行する方法、Parallel.ForメソッドとParallel.ForEachメソッドを使用したデータ並列処理、およびConcurrentBag、ConcurrentQueue、ConcurrentDictionaryなどの同時実行データ構造の使用について説明します。 また、複数のファイル I/O 操作を同時に実行する方法についても説明します。
重要なポイント
- C# での並列プログラミングでは、複数のタスクを同時に実行できます。
- 並列プログラミングの実装には、
Taskクラス、async、およびawaitキーワードが使用されます。 -
Task.WhenAllメソッドは、続行する前に複数のタスクが完了するのを待機するために使用されます。 - データの並列処理は、
Parallel.ForメソッドとParallel.ForEachメソッドを使用して実現されます。 -
ConcurrentBag、ConcurrentQueue、ConcurrentDictionaryなどの同時実行データ構造は、同時アクセス用に最適化されています。 -
Task.WhenAllメソッドとTask.WhenAnyメソッドを使用すると、複数のタスクを並列で実行し、その完了を待機できます。 - 複数のファイル I/O 操作を同時に実行できます。