Поделиться через


Практическое руководство. Перебор каталогов с файлами с помощью параллельного класса

Во многих случаях итерацию файла можно легко параллелизовать. В статье Практическое руководство. Перебор каталогов с файлами с помощью PLINQ демонстрируется самый простой способ, позволяющий выполнить эту задачу во многих сценариях. Но с ним могут возникнуть трудности, если код должен поддерживать много разных исключений, которые могут возникнуть при доступе к файловой системе. В примере ниже представлен один из подходов к решению этой проблемы. Он применяет итерацию на основе стека для просмотра всех файлов и папок в указанном каталоге и позволяет перехватывать и обрабатывать разные исключения в пользовательском коде. Разумеется, за вами остается конкретный механизм обработки исключений.

Пример

В следующем примере директории обрабатываются последовательно, а файлы — параллельно. Возможно, это самый лучший подход, когда файлов намного больше, чем каталогов. Также вы можете параллелизовать итерацию каталогов, а доступ к файлам осуществлять последовательно. Параллельное выполнение обоих циклов обычно работает эффективно только на компьютерах с большим числом процессоров. Впрочем, как и во всех остальных случаях, наилучший подход можно найти только при тщательном тестировании конкретного приложения.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds);
    }
}
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.IO
Imports System.Security
Imports System.Threading
Imports System.Threading.Tasks

Module Example
    Sub Main()
        Try
            TraverseTreeParallelForEach("C:\Program Files",
                                        Sub(f)
                                            ' Exceptions are No-ops.         
                                            Try
                                                ' Do nothing with the data except read it.
                                                Dim data() As Byte = File.ReadAllBytes(f)
                                                ' In the event the file has been deleted.
                                            Catch e As FileNotFoundException

                                                ' General I/O exception, especially if the file is in use.
                                            Catch e As IOException

                                                ' Lack of adequate permissions.
                                            Catch e As UnauthorizedAccessException

                                                ' Lack of adequate permissions.
                                            Catch e As SecurityException

                                            End Try
                                            ' Display the filename.
                                            Console.WriteLine(f)
                                        End Sub)
        Catch e As ArgumentException
            Console.WriteLine("The directory 'C:\Program Files' does not exist.")
        End Try
        ' Keep the console window open.
        Console.ReadKey()
    End Sub

    Public Sub TraverseTreeParallelForEach(ByVal root As String, ByVal action As Action(Of String))
        'Count of files traversed and timer for diagnostic output
        Dim fileCount As Integer = 0
        Dim sw As Stopwatch = Stopwatch.StartNew()

        ' Determine whether to parallelize file processing on each folder based on processor count.
        Dim procCount As Integer = System.Environment.ProcessorCount

        ' Data structure to hold names of subfolders to be examined for files.
        Dim dirs As New Stack(Of String)

        If Not Directory.Exists(root) Then Throw New ArgumentException(
            "The given root directory doesn't exist.", NameOf(root))

        dirs.Push(root)

        While (dirs.Count > 0)
            Dim currentDir As String = dirs.Pop()
            Dim subDirs() As String = Nothing
            Dim files() As String = Nothing

            Try
                subDirs = Directory.GetDirectories(currentDir)
                ' Thrown if we do not have discovery permission on the directory.
            Catch e As UnauthorizedAccessException
                Console.WriteLine(e.Message)
                Continue While
                ' Thrown if another process has deleted the directory after we retrieved its name.
            Catch e As DirectoryNotFoundException
                Console.WriteLine(e.Message)
                Continue While
            End Try

            Try
                files = Directory.GetFiles(currentDir)
            Catch e As UnauthorizedAccessException
                Console.WriteLine(e.Message)
                Continue While
            Catch e As DirectoryNotFoundException
                Console.WriteLine(e.Message)
                Continue While
            Catch e As IOException
                Console.WriteLine(e.Message)
                Continue While
            End Try

            ' Execute in parallel if there are enough files in the directory.
            ' Otherwise, execute sequentially.Files are opened and processed
            ' synchronously but this could be modified to perform async I/O.
            Try
                If files.Length < procCount Then
                    For Each file In files
                        action(file)
                        fileCount += 1
                    Next
                Else
                    Parallel.ForEach(files, Function() 0, Function(file, loopState, localCount)
                                                              action(file)
                                                              localCount = localCount + 1
                                                              Return localCount
                                                          End Function,
                                     Sub(c)
                                         Interlocked.Add(fileCount, c)
                                     End Sub)
                End If
            Catch ae As AggregateException
                ae.Handle(Function(ex)

                              If TypeOf (ex) Is UnauthorizedAccessException Then

                                  ' Here we just output a message and go on.
                                  Console.WriteLine(ex.Message)
                                  Return True
                              End If
                              ' Handle other exceptions here if necessary...

                              Return False
                          End Function)
            End Try
            ' Push the subdirectories onto the stack for traversal.
            ' This could also be done before handing the files.
            For Each str As String In subDirs
                dirs.Push(str)
            Next

            ' For diagnostic purposes.
            Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds)
        End While
    End Sub
End Module

В этом примере файловый ввод-вывод выполняется синхронно. При работе с большими файлами или медленном сетевом подключении асинхронный доступ к файлам может оказаться предпочтительнее. Вы можете сочетать методы асинхронного ввода-вывода с параллельным выполнением итерации. Дополнительные сведения см. в статье TPL и традиционное асинхронное программирование .NET.

В этом примере используется локальная переменная fileCount для учета общего количества обработанных файлов. Поскольку доступ к этой переменной могут получать одновременно несколько задач, доступ к ней синхронизируется путем вызова метода Interlocked.Add.

Обратите внимание, что создание исключений в основном потоке не всегда приводит к остановке потоков, запущенных из метода ForEach. Чтобы остановить эти потоки, создайте в обработчиках исключений логическую переменную и проверяйте ее значение при каждой итерации параллельного цикла. Если значение указывает на созданное исключение, остановите или прервите выполнение цикла с помощью переменной ParallelLoopState. Подробнее см. статью How to: Stop or Break from a Parallel.For Loop (Практическое руководство. Остановка цикла Parallel.For или выход из этого цикла).

См. также