HOW TO:建立會限制並行程度的工作排程器
在極少數情節中,您可以透過下列方式達到效能加速目標:建立衍生自 System.Threading.Tasks.TaskScheduler 類別的自訂工作排程器, 然後使用 System.Threading.Tasks.ParallelOptions 列舉,在 For 方法或 ForEach() 方法中指定這個排程器。 當您直接使用 Task 物件時,可以使用以 TaskScheduler 做為輸入參數的 TaskFactory 建構函式,或使用 StartNew() 之類的其他方法,來指定自訂排程器。
您也可以使用自訂排程器來實現預設排程器未提供的功能,例如嚴格的先進先出 (FIFO) 執行順序。 下列程式碼範例示範如何建立自訂工作排程器。 這個排程器可讓您指定並行程度。
範例
下列範例取自 MSDN Code Gallery 網站上的平行擴充功能範例 (英文)。
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Imports System.Threading.Tasks
Module Module2
Sub Main()
' Create a scheduler that uses only one thread.
Dim lcts As New LimitedConcurrencyLevelTaskScheduler(1)
' Create a TaskFactory and pass it our custom scheduler.
Dim factory As New TaskFactory(lcts)
Dim cts As New CancellationTokenSource()
' Use our factory to run a task.
Dim t As Task = factory.StartNew(Sub()
For i As Integer = 1 To 50000
Console.Write("{0} on thread {1}. ", i, Thread.CurrentThread.ManagedThreadId)
Next
End Sub,
cts.Token)
Console.WriteLine("Press any key to exit.")
Console.ReadKey()
End Sub
''' <summary>
''' Provides a task scheduler that ensures a maximum concurrency level While
''' running on top of the ThreadPool.
''' </summary>
Public Class LimitedConcurrencyLevelTaskScheduler
Inherits TaskScheduler
''' <summary>Whether the current thread is processing work items.</summary>
<ThreadStatic()>
Private Shared _currentThreadIsProcessingItems As Boolean
''' <summary>The list of tasks to be executed.</summary>
Private ReadOnly _tasks As LinkedList(Of Task) = New LinkedList(Of Task)() ' protected by lock(_tasks)
''' <summary>The maximum concurrency level allowed by this scheduler.</summary>
Private ReadOnly _maxDegreeOfParallelism As Integer
''' <summary>Whether the scheduler is currently processing work items.</summary>
Private _delegatesQueuedOrRunning As Integer = 0 ' protected by lock(_tasks)
''' <summary>
''' Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
''' specified degree of parallelism.
''' </summary>
''' <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
Public Sub New(ByVal maxDegreeOfParallelism As Integer)
If (maxDegreeOfParallelism < 1) Then
Throw New ArgumentOutOfRangeException("maxDegreeOfParallelism")
End If
_maxDegreeOfParallelism = maxDegreeOfParallelism
End Sub
''' <summary>Queues a task to the scheduler.</summary>
''' <param name="t">The task to be queued.</param>
Protected Overrides Sub QueueTask(ByVal t As Task)
' Add the task to the list of tasks to be processed. If there aren't enough
' delegates currently queued or running to process tasks, schedule another.
SyncLock (_tasks)
_tasks.AddLast(t)
If (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) Then
_delegatesQueuedOrRunning = _delegatesQueuedOrRunning + 1
NotifyThreadPoolOfPendingWork()
End If
End SyncLock
End Sub
''' <summary>
''' Informs the ThreadPool that there's work to be executed for this scheduler.
''' </summary>
Private Sub NotifyThreadPoolOfPendingWork()
ThreadPool.UnsafeQueueUserWorkItem(Sub()
' Note that the current thread is now processing work items.
' This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = True
Try
' Process all available items in the queue.
While (True)
Dim item As Task
SyncLock (_tasks)
' When there are no more items to be processed,
' note that we're done processing, and get out.
If (_tasks.Count = 0) Then
_delegatesQueuedOrRunning = _delegatesQueuedOrRunning - 1
Exit While
End If
' Get the next item from the queue
item = _tasks.First.Value
_tasks.RemoveFirst()
End SyncLock
' Execute the task we pulled out of the queue
MyBase.TryExecuteTask(item)
End While
' We're done processing items on the current thread
Finally
_currentThreadIsProcessingItems = False
End Try
End Sub,
Nothing)
End Sub
''' <summary>Attempts to execute the specified task on the current thread.</summary>
''' <param name="task">The task to be executed.</param>
''' <param name="taskWasPreviouslyQueued"></param>
''' <returns>Whether the task could be executed on the current thread.</returns>
Protected Overrides Function TryExecuteTaskInline(ByVal t As Task, ByVal taskWasPreviouslyQueued As Boolean) As Boolean
' If this thread isn't already processing a task, we don't support inlining
If (Not _currentThreadIsProcessingItems) Then
Return False
End If
' If the task was previously queued, remove it from the queue
If (taskWasPreviouslyQueued) Then
TryDequeue(t)
End If
' Try to run the task.
Return MyBase.TryExecuteTask(t)
End Function
''' <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
''' <param name="t">The task to be removed.</param>
''' <returns>Whether the task could be found and removed.</returns>
Protected Overrides Function TryDequeue(ByVal t As Task) As Boolean
SyncLock (_tasks)
Return _tasks.Remove(t)
End SyncLock
End Function
''' <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
Public Overrides ReadOnly Property MaximumConcurrencyLevel As Integer
Get
Return _maxDegreeOfParallelism
End Get
End Property
''' <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
''' <returns>An enumerable of the tasks currently scheduled.</returns>
Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)
Dim lockTaken As Boolean = False
Try
Monitor.TryEnter(_tasks, lockTaken)
If (lockTaken) Then
Return _tasks.ToArray()
Else
Throw New NotSupportedException()
End If
Finally
If (lockTaken) Then
Monitor.Exit(_tasks)
End If
End Try
End Function
End Class
End Module
namespace System.Threading.Tasks.Schedulers
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
class Program
{
static void Main()
{
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);
factory.StartNew(()=>
{
for (int i = 0; i < 500; i++)
{
Console.Write("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
}
}
);
Console.ReadKey();
}
}
/// <summary>
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
/// <summary>Whether the current thread is processing work items.</summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
/// <summary>The list of tasks to be executed.</summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
private readonly int _maxDegreeOfParallelism;
/// <summary>Whether the scheduler is currently processing work items.</summary>
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified degree of parallelism.
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
/// <summary>
/// Informs the ThreadPool that there's work to be executed for this scheduler.
/// </summary>
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
/// <summary>Attempts to execute the specified task on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns>Whether the task could be executed on the current thread.</returns>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued) TryDequeue(task);
// Try to run the task.
return base.TryExecuteTask(task);
}
/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
/// <param name="task">The task to be removed.</param>
/// <returns>Whether the task could be found and removed.</returns>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
/// <returns>An enumerable of the tasks currently scheduled.</returns>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
}