Поделиться через


Практическое руководство. Создание планировщика заданий, ограничивающего степень параллелизма

В некоторых сценариях, создание пользовательского планировщика заданий, который является производным от класса System.Threading.Tasks.TaskScheduler, может повысить производительность. Этот планировщик можно указать в методе For или ForEach() с помощью перечисления System.Threading.Tasks.ParallelOptions. При непосредственном использовании объектов Task, пользовательский планировщик можно указать с помощью конструктора TaskFactory, который принимает в качестве входного параметра значение TaskScheduler или с помощью некоторых других средств, например, StartNew().

Пользовательский планировщик можно использовать для получения доступа к функциональности, которая не доступна в планировщике по умолчанию, например, строгий порядок выполнения "первым пришел - первым обслужен" (FIFO). В следующем примере демонстрируется создание пользовательского планировщика заданий. Этот планировщик позволяет указывать степень параллелизма.

Пример

Следующий пример взят из раздела Parallel Extensions Samples на веб-сайте коллекции кодов MSDN.

Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Imports System.Threading.Tasks
Module Module2


    Sub Main()

        ' Create a scheduler that uses only one thread.
        Dim lcts As New LimitedConcurrencyLevelTaskScheduler(1)

        ' Create a TaskFactory and pass it our custom scheduler.
        Dim factory As New TaskFactory(lcts)
        Dim cts As New CancellationTokenSource()

        ' Use our factory to run a task.
        Dim t As Task = factory.StartNew(Sub()
                                             For i As Integer = 1 To 50000
                                                 Console.Write("{0} on thread {1}.   ", i, Thread.CurrentThread.ManagedThreadId)
                                             Next
                                         End Sub,
                                cts.Token)

        Console.WriteLine("Press any key to exit.")
        Console.ReadKey()

    End Sub


    ''' <summary>
    ''' Provides a task scheduler that ensures a maximum concurrency level While
    ''' running on top of the ThreadPool.
    ''' </summary>
    Public Class LimitedConcurrencyLevelTaskScheduler
        Inherits TaskScheduler

        ''' <summary>Whether the current thread is processing work items.</summary>
        <ThreadStatic()>
        Private Shared _currentThreadIsProcessingItems As Boolean
        ''' <summary>The list of tasks to be executed.</summary>
        Private ReadOnly _tasks As LinkedList(Of Task) = New LinkedList(Of Task)() ' protected by lock(_tasks)
        ''' <summary>The maximum concurrency level allowed by this scheduler.</summary>
        Private ReadOnly _maxDegreeOfParallelism As Integer
        ''' <summary>Whether the scheduler is currently processing work items.</summary>
        Private _delegatesQueuedOrRunning As Integer = 0 ' protected by lock(_tasks)

        ''' <summary>
        ''' Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        ''' specified degree of parallelism.
        ''' </summary>
        ''' <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        Public Sub New(ByVal maxDegreeOfParallelism As Integer)

            If (maxDegreeOfParallelism < 1) Then
                Throw New ArgumentOutOfRangeException("maxDegreeOfParallelism")
            End If
            _maxDegreeOfParallelism = maxDegreeOfParallelism

        End Sub
        ''' <summary>Queues a task to the scheduler.</summary>
        ''' <param name="t">The task to be queued.</param>
        Protected Overrides Sub QueueTask(ByVal t As Task)

            ' Add the task to the list of tasks to be processed.  If there aren't enough
            ' delegates currently queued or running to process tasks, schedule another.
            SyncLock (_tasks)

                _tasks.AddLast(t)
                If (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) Then

                    _delegatesQueuedOrRunning = _delegatesQueuedOrRunning + 1
                    NotifyThreadPoolOfPendingWork()
                End If
            End SyncLock
        End Sub

        ''' <summary>
        ''' Informs the ThreadPool that there's work to be executed for this scheduler.
        ''' </summary>
        Private Sub NotifyThreadPoolOfPendingWork()

            ThreadPool.UnsafeQueueUserWorkItem(Sub()
                                                   ' Note that the current thread is now processing work items.
                                                   ' This is necessary to enable inlining of tasks into this thread.
                                                   _currentThreadIsProcessingItems = True
                                                   Try

                                                       ' Process all available items in the queue.
                                                       While (True)
                                                           Dim item As Task
                                                           SyncLock (_tasks)
                                                               ' When there are no more items to be processed,
                                                               ' note that we're done processing, and get out.
                                                               If (_tasks.Count = 0) Then
                                                                   _delegatesQueuedOrRunning = _delegatesQueuedOrRunning - 1
                                                                   Exit While
                                                               End If

                                                               ' Get the next item from the queue
                                                               item = _tasks.First.Value
                                                               _tasks.RemoveFirst()
                                                           End SyncLock

                                                           ' Execute the task we pulled out of the queue
                                                           MyBase.TryExecuteTask(item)

                                                       End While
                                                       ' We're done processing items on the current thread
                                                   Finally
                                                       _currentThreadIsProcessingItems = False
                                                   End Try
                                               End Sub,
                                          Nothing)
        End Sub

        ''' <summary>Attempts to execute the specified task on the current thread.</summary>
        ''' <param name="task">The task to be executed.</param>
        ''' <param name="taskWasPreviouslyQueued"></param>
        ''' <returns>Whether the task could be executed on the current thread.</returns>
        Protected Overrides Function TryExecuteTaskInline(ByVal t As Task, ByVal taskWasPreviouslyQueued As Boolean) As Boolean

            ' If this thread isn't already processing a task, we don't support inlining
            If (Not _currentThreadIsProcessingItems) Then
                Return False
            End If

            ' If the task was previously queued, remove it from the queue
            If (taskWasPreviouslyQueued) Then
                TryDequeue(t)
            End If
            ' Try to run the task.
            Return MyBase.TryExecuteTask(t)
        End Function

        ''' <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        ''' <param name="t">The task to be removed.</param>
        ''' <returns>Whether the task could be found and removed.</returns>
        Protected Overrides Function TryDequeue(ByVal t As Task) As Boolean

            SyncLock (_tasks)
                Return _tasks.Remove(t)
            End SyncLock

        End Function

        ''' <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        Public Overrides ReadOnly Property MaximumConcurrencyLevel As Integer

            Get
                Return _maxDegreeOfParallelism
            End Get
        End Property

        ''' <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        ''' <returns>An enumerable of the tasks currently scheduled.</returns>
        Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)

            Dim lockTaken As Boolean = False
            Try

                Monitor.TryEnter(_tasks, lockTaken)
                If (lockTaken) Then
                    Return _tasks.ToArray()
                Else
                    Throw New NotSupportedException()
                End If
            Finally

                If (lockTaken) Then
                    Monitor.Exit(_tasks)
                End If
            End Try
        End Function
    End Class
End Module
namespace System.Threading.Tasks.Schedulers
{

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    class Program
    {
        static void Main()
        {
            LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
            TaskFactory factory = new TaskFactory(lcts);

            factory.StartNew(()=> 
                {
                    for (int i = 0; i < 500; i++)
                    {
                        Console.Write("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                    }
                }
            );

            Console.ReadKey();
        }
    }

    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>Whether the current thread is processing work items.</summary>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;
        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;
        /// <summary>Whether the scheduler is currently processing work items.</summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue
                        base.TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued"></param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!_currentThreadIsProcessingItems) return false;

            // If the task was previously queued, remove it from the queue
            if (taskWasPreviouslyQueued) TryDequeue(task);

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}

См. также

Основные понятия

Планировщики заданий