Практическое руководство. Перебор каталогов с файлами с помощью параллельного класса
Во многих случаях итерацию файла можно легко параллелизовать. В статье Практическое руководство. Перебор каталогов с файлами с помощью PLINQ демонстрируется самый простой способ, позволяющий выполнить эту задачу во многих сценариях. Но с ним могут возникнуть трудности, если код должен поддерживать много разных исключений, которые могут возникнуть при доступе к файловой системе. В примере ниже представлен один из подходов к решению этой проблемы. Он применяет итерацию на основе стека для просмотра всех файлов и папок в указанном каталоге и позволяет перехватывать и обрабатывать разные исключения в пользовательском коде. Разумеется, за вами остается конкретный механизм обработки исключений.
Пример
В следующем примере директории обрабатываются последовательно, а файлы — параллельно. Возможно, это самый лучший подход, когда файлов намного больше, чем каталогов. Также вы можете параллелизовать итерацию каталогов, а доступ к файлам осуществлять последовательно. Параллельное выполнение обоих циклов обычно работает эффективно только на компьютерах с большим числом процессоров. Впрочем, как и во всех остальных случаях, наилучший подход можно найти только при тщательном тестировании конкретного приложения.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
try
{
TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
{
// Exceptions are no-ops.
try
{
// Do nothing with the data except read it.
byte[] data = File.ReadAllBytes(f);
}
catch (FileNotFoundException) { }
catch (IOException) { }
catch (UnauthorizedAccessException) { }
catch (SecurityException) { }
// Display the filename.
Console.WriteLine(f);
});
}
catch (ArgumentException)
{
Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
}
// Keep the console window open.
Console.ReadKey();
}
public static void TraverseTreeParallelForEach(string root, Action<string> action)
{
//Count of files traversed and timer for diagnostic output
int fileCount = 0;
var sw = Stopwatch.StartNew();
// Determine whether to parallelize file processing on each folder based on processor count.
int procCount = Environment.ProcessorCount;
// Data structure to hold names of subfolders to be examined for files.
Stack<string> dirs = new Stack<string>();
if (!Directory.Exists(root))
{
throw new ArgumentException(
"The given root directory doesn't exist.", nameof(root));
}
dirs.Push(root);
while (dirs.Count > 0)
{
string currentDir = dirs.Pop();
string[] subDirs = { };
string[] files = { };
try
{
subDirs = Directory.GetDirectories(currentDir);
}
// Thrown if we do not have discovery permission on the directory.
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
// Thrown if another process has deleted the directory after we retrieved its name.
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
try
{
files = Directory.GetFiles(currentDir);
}
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (IOException e)
{
Console.WriteLine(e.Message);
continue;
}
// Execute in parallel if there are enough files in the directory.
// Otherwise, execute sequentially.Files are opened and processed
// synchronously but this could be modified to perform async I/O.
try
{
if (files.Length < procCount)
{
foreach (var file in files)
{
action(file);
fileCount++;
}
}
else
{
Parallel.ForEach(files, () => 0,
(file, loopState, localCount) =>
{
action(file);
return (int)++localCount;
},
(c) =>
{
Interlocked.Add(ref fileCount, c);
});
}
}
catch (AggregateException ae)
{
ae.Handle((ex) =>
{
if (ex is UnauthorizedAccessException)
{
// Here we just output a message and go on.
Console.WriteLine(ex.Message);
return true;
}
// Handle other exceptions here if necessary...
return false;
});
}
// Push the subdirectories onto the stack for traversal.
// This could also be done before handing the files.
foreach (string str in subDirs)
dirs.Push(str);
}
// For diagnostic purposes.
Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds);
}
}
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.IO
Imports System.Security
Imports System.Threading
Imports System.Threading.Tasks
Module Example
Sub Main()
Try
TraverseTreeParallelForEach("C:\Program Files",
Sub(f)
' Exceptions are No-ops.
Try
' Do nothing with the data except read it.
Dim data() As Byte = File.ReadAllBytes(f)
' In the event the file has been deleted.
Catch e As FileNotFoundException
' General I/O exception, especially if the file is in use.
Catch e As IOException
' Lack of adequate permissions.
Catch e As UnauthorizedAccessException
' Lack of adequate permissions.
Catch e As SecurityException
End Try
' Display the filename.
Console.WriteLine(f)
End Sub)
Catch e As ArgumentException
Console.WriteLine("The directory 'C:\Program Files' does not exist.")
End Try
' Keep the console window open.
Console.ReadKey()
End Sub
Public Sub TraverseTreeParallelForEach(ByVal root As String, ByVal action As Action(Of String))
'Count of files traversed and timer for diagnostic output
Dim fileCount As Integer = 0
Dim sw As Stopwatch = Stopwatch.StartNew()
' Determine whether to parallelize file processing on each folder based on processor count.
Dim procCount As Integer = System.Environment.ProcessorCount
' Data structure to hold names of subfolders to be examined for files.
Dim dirs As New Stack(Of String)
If Not Directory.Exists(root) Then Throw New ArgumentException(
"The given root directory doesn't exist.", NameOf(root))
dirs.Push(root)
While (dirs.Count > 0)
Dim currentDir As String = dirs.Pop()
Dim subDirs() As String = Nothing
Dim files() As String = Nothing
Try
subDirs = Directory.GetDirectories(currentDir)
' Thrown if we do not have discovery permission on the directory.
Catch e As UnauthorizedAccessException
Console.WriteLine(e.Message)
Continue While
' Thrown if another process has deleted the directory after we retrieved its name.
Catch e As DirectoryNotFoundException
Console.WriteLine(e.Message)
Continue While
End Try
Try
files = Directory.GetFiles(currentDir)
Catch e As UnauthorizedAccessException
Console.WriteLine(e.Message)
Continue While
Catch e As DirectoryNotFoundException
Console.WriteLine(e.Message)
Continue While
Catch e As IOException
Console.WriteLine(e.Message)
Continue While
End Try
' Execute in parallel if there are enough files in the directory.
' Otherwise, execute sequentially.Files are opened and processed
' synchronously but this could be modified to perform async I/O.
Try
If files.Length < procCount Then
For Each file In files
action(file)
fileCount += 1
Next
Else
Parallel.ForEach(files, Function() 0, Function(file, loopState, localCount)
action(file)
localCount = localCount + 1
Return localCount
End Function,
Sub(c)
Interlocked.Add(fileCount, c)
End Sub)
End If
Catch ae As AggregateException
ae.Handle(Function(ex)
If TypeOf (ex) Is UnauthorizedAccessException Then
' Here we just output a message and go on.
Console.WriteLine(ex.Message)
Return True
End If
' Handle other exceptions here if necessary...
Return False
End Function)
End Try
' Push the subdirectories onto the stack for traversal.
' This could also be done before handing the files.
For Each str As String In subDirs
dirs.Push(str)
Next
' For diagnostic purposes.
Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds)
End While
End Sub
End Module
В этом примере файловый ввод-вывод выполняется синхронно. При работе с большими файлами или медленном сетевом подключении асинхронный доступ к файлам может оказаться предпочтительнее. Вы можете сочетать методы асинхронного ввода-вывода с параллельным выполнением итерации. Дополнительные сведения см. в статье TPL и традиционное асинхронное программирование .NET.
В этом примере используется локальная переменная fileCount
для учета общего количества обработанных файлов. Поскольку доступ к этой переменной могут получать одновременно несколько задач, доступ к ней синхронизируется путем вызова метода Interlocked.Add.
Обратите внимание, что создание исключений в основном потоке не всегда приводит к остановке потоков, запущенных из метода ForEach. Чтобы остановить эти потоки, создайте в обработчиках исключений логическую переменную и проверяйте ее значение при каждой итерации параллельного цикла. Если значение указывает на созданное исключение, остановите или прервите выполнение цикла с помощью переменной ParallelLoopState. Подробнее см. статью How to: Stop or Break from a Parallel.For Loop (Практическое руководство. Остановка цикла Parallel.For или выход из этого цикла).
См. также
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по