Partitioner<TSource> 类
定义
重要
一些信息与预发行产品相关,相应产品在发行之前可能会进行重大修改。 对于此处提供的信息,Microsoft 不作任何明示或暗示的担保。
表示将数据源拆分为多个分区的特定方式。
generic <typename TSource>
public ref class Partitioner abstract
public abstract class Partitioner<TSource>
type Partitioner<'Source> = class
Public MustInherit Class Partitioner(Of TSource)
类型参数
- TSource
集合中的元素的类型。
- 继承
- Object
Partitioner<TSource>
- 派生
示例
以下示例演示如何实现一次返回单个元素的分区程序:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace PartitionerDemo
{
// Simple partitioner that will extract one item at a time, in a thread-safe fashion,
// from the underlying collection. All enumerators produced by one call to
// GetPartitions() or GetDynamicPartitions() share a single source enumerator,
// so each element is handed to only one of them.
class SingleElementPartitioner<T> : Partitioner<T>
{
    // The collection being wrapped by this Partitioner
    private readonly IEnumerable<T> m_referenceEnumerable;

    // Internal class that serves as a shared enumerable for the
    // underlying collection. Every enumerator it spawns reads from the
    // same source enumerator (m_reader).
    private class InternalEnumerable : IEnumerable<T>, IDisposable
    {
        private readonly IEnumerator<T> m_reader;
        private bool m_disposed = false;

        // These two are used to implement Dispose() when static partitioning is being performed
        private int m_activeEnumerators;
        private readonly bool m_downcountEnumerators;

        // "downcountEnumerators" will be true for static partitioning, false for
        // dynamic partitioning.
        public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
        {
            m_reader = reader;
            m_activeEnumerators = 0;
            m_downcountEnumerators = downcountEnumerators;
        }

        // Returns a new enumerator that shares the source enumerator with its siblings.
        // Throws ObjectDisposedException if Dispose() has already been called.
        public IEnumerator<T> GetEnumerator()
        {
            if (m_disposed)
            {
                // The single-string constructor interprets its argument as the object
                // name, so the object name and the message are passed separately.
                throw new ObjectDisposedException(
                    "InternalEnumerable", "Can't call GetEnumerator() after disposing");
            }

            // For static partitioning, keep track of the number of active enumerators.
            if (m_downcountEnumerators)
            {
                Interlocked.Increment(ref m_activeEnumerators);
            }

            return new InternalEnumerator(m_reader, this);
        }

        // Non-generic entry point; forwards to the generic GetEnumerator().
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        // Marks this enumerable as disposed. The source enumerator is released here
        // only for dynamic partitioning; for static partitioning that is left to
        // DisposeEnumerator().
        public void Dispose()
        {
            if (!m_disposed)
            {
                // Only dispose the source enumerator if you are doing dynamic partitioning
                if (!m_downcountEnumerators)
                {
                    m_reader.Dispose();
                }

                m_disposed = true;
            }
        }

        // Called from Dispose() method of spawned InternalEnumerator. During
        // static partitioning, the source enumerator will be automatically
        // disposed once all requested InternalEnumerators have been disposed.
        public void DisposeEnumerator()
        {
            if (m_downcountEnumerators)
            {
                if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
                {
                    m_reader.Dispose();
                }
            }
        }
    }

    // Internal class that serves as a shared enumerator for
    // the underlying collection.
    private class InternalEnumerator : IEnumerator<T>
    {
        private T m_current;
        private readonly IEnumerator<T> m_source;
        private readonly InternalEnumerable m_controllingEnumerable;
        private bool m_disposed = false;

        // "source" is the enumerator shared with sibling InternalEnumerators;
        // "controllingEnumerable" is the InternalEnumerable that created this instance.
        public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable)
        {
            m_source = source;
            m_current = default(T);
            m_controllingEnumerable = controllingEnumerable;
        }

        // Last element captured by MoveNext(), boxed for the non-generic interface.
        object IEnumerator.Current
        {
            get { return m_current; }
        }

        // Last element captured by MoveNext(); default(T) before the first call
        // and after the source is exhausted.
        T IEnumerator<T>.Current
        {
            get { return m_current; }
        }

        // Always throws NotSupportedException.
        void IEnumerator.Reset()
        {
            throw new NotSupportedException("Reset() not supported");
        }

        // This method is the crux of this class. Under lock, it calls
        // MoveNext() on the underlying enumerator and grabs Current.
        bool IEnumerator.MoveNext()
        {
            bool rval = false;

            // The lock is taken on the shared source enumerator, so sibling
            // InternalEnumerators serialize their advance-and-read step.
            lock (m_source)
            {
                rval = m_source.MoveNext();
                m_current = rval ? m_source.Current : default(T);
            }

            return rval;
        }

        // Notifies the controlling enumerable, at most once per enumerator.
        void IDisposable.Dispose()
        {
            if (!m_disposed)
            {
                // Delegate to parent enumerable's DisposeEnumerator() method
                m_controllingEnumerable.DisposeEnumerator();
                m_disposed = true;
            }
        }
    }

    // Constructor just grabs the collection to wrap.
    // Throws ArgumentNullException when "enumerable" is null.
    public SingleElementPartitioner(IEnumerable<T> enumerable)
    {
        // Verify that the source IEnumerable is not null
        if (enumerable == null)
        {
            throw new ArgumentNullException("enumerable");
        }

        m_referenceEnumerable = enumerable;
    }

    // Produces a list of "numPartitions" IEnumerators that can each be
    // used to traverse the underlying collection in a thread-safe manner.
    // This will return a static number of enumerators, as opposed to
    // GetDynamicPartitions(), the result of which can be used to produce
    // any number of enumerators.
    // Throws ArgumentOutOfRangeException when "numPartitions" is less than 1.
    public override IList<IEnumerator<T>> GetPartitions(int numPartitions)
    {
        if (numPartitions < 1)
        {
            // Report the parameter under its actual (camel-cased) name.
            throw new ArgumentOutOfRangeException("numPartitions");
        }

        List<IEnumerator<T>> list = new List<IEnumerator<T>>(numPartitions);

        // Since we are doing static partitioning, create an InternalEnumerable with reference
        // counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators
        // are disposed, the shared source enumerator will be disposed.
        var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
        for (int i = 0; i < numPartitions; i++)
        {
            list.Add(dynamicPartitions.GetEnumerator());
        }

        return list;
    }

    // Returns an instance of our internal Enumerable class. GetEnumerator()
    // can then be called on that (multiple times) to produce shared enumerators.
    public override IEnumerable<T> GetDynamicPartitions()
    {
        // Since we are doing dynamic partitioning, create an InternalEnumerable with reference
        // counting of spawned InternalEnumerators turned off. This returned InternalEnumerable
        // will need to be explicitly disposed.
        return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
    }

    // Must be set to true if GetDynamicPartitions() is supported.
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}
// Console driver that exercises SingleElementPartitioner<T> three ways:
// through Parallel.ForEach, through a fixed set of static partitions, and
// through dynamic partitions.
class Program
{
    // Test our SingleElementPartitioner(T) class
    static void Main()
    {
        // Our sample collection
        string[] collection = new string[] {"red", "orange", "yellow", "green", "blue", "indigo",
            "violet", "black", "white", "grey"};

        // Instantiate a partitioner for our collection
        SingleElementPartitioner<string> myPart = new SingleElementPartitioner<string>(collection);

        //
        // Simple test with ForEach
        //
        Console.WriteLine("Testing with Parallel.ForEach");
        Parallel.ForEach(myPart, item =>
        {
            Console.WriteLine(" item = {0}, thread id = {1}", item, Thread.CurrentThread.ManagedThreadId);
        });

        //
        // Demonstrate the use of static partitioning, which really means
        // "using a static number of partitioners". The partitioners themselves
        // may still be "dynamic" in the sense that their outputs may not be
        // deterministic.
        //

        // Perform static partitioning of collection
        var staticPartitions = myPart.GetPartitions(2);
        int index = 0;
        Console.WriteLine("Static Partitioning, 2 partitions, 2 tasks:");

        // Action will consume from static partitions
        Action staticAction = () =>
        {
            int myIndex = Interlocked.Increment(ref index) - 1; // compute your index
            int id = Thread.CurrentThread.ManagedThreadId; // cache your thread id

            // Grab your static partition; "using" disposes it even if the loop throws
            using (var myItems = staticPartitions[myIndex])
            {
                // Enumerate through your static partition
                while (myItems.MoveNext())
                {
                    Thread.Sleep(50); // guarantees that multiple threads have a chance to run
                    Console.WriteLine(" item = {0}, thread id = {1}", myItems.Current, id);
                }
            }
        };

        // Spawn off 2 actions to consume 2 static partitions
        Parallel.Invoke(staticAction, staticAction);

        //
        // Demonstrate the use of dynamic partitioning
        //

        // Grab an IEnumerable which can then be used to generate multiple
        // shared IEnumerables.
        var dynamicPartitions = myPart.GetDynamicPartitions();
        try
        {
            Console.WriteLine("Dynamic Partitioning, 3 tasks:");

            // Action will consume from dynamic partitions
            Action dynamicAction = () =>
            {
                int id = Thread.CurrentThread.ManagedThreadId; // cache our thread id

                // Grab an enumerator from the dynamic partitions
                using (var enumerator = dynamicPartitions.GetEnumerator())
                {
                    // Enumerate through your dynamic enumerator
                    while (enumerator.MoveNext())
                    {
                        Thread.Sleep(50); // guarantees that multiple threads will have a chance to run
                        Console.WriteLine(" item = {0}, thread id = {1}", enumerator.Current, id);
                    }
                }
            };

            // Spawn 3 concurrent actions to consume the dynamic partitions
            Parallel.Invoke(dynamicAction, dynamicAction, dynamicAction);
        }
        finally
        {
            // Clean up: the dynamic-partitions object must be disposed explicitly
            var disposable = dynamicPartitions as IDisposable;
            if (disposable != null)
            {
                disposable.Dispose();
            }
        }
    }
}
}
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module PartitionerDemo
' Simple partitioner that will extract one item at a time, in a thread-safe fashion,
' from the underlying collection. All enumerators produced by one call to
' GetPartitions() or GetDynamicPartitions() share a single source enumerator,
' so each element is handed to only one of them.
Class SingleElementPartitioner(Of T)
    Inherits Partitioner(Of T)

    ' The collection being wrapped by this Partitioner
    Private ReadOnly m_referenceEnumerable As IEnumerable(Of T)

    ' Internal class that serves as a shared enumerable for the
    ' underlying collection. Every enumerator it spawns reads from the
    ' same source enumerator (m_reader).
    Private Class InternalEnumerable
        Implements IEnumerable(Of T)
        Implements IDisposable

        Private ReadOnly m_reader As IEnumerator(Of T)
        Private m_disposed As Boolean = False

        ' These two are used to implement Dispose() when static partitioning is being performed
        Private m_activeEnumerators As Integer
        Private ReadOnly m_downcountEnumerators As Boolean

        ' "downcountEnumerators" will be true for static partitioning, false for
        ' dynamic partitioning.
        Public Sub New(ByVal reader As IEnumerator(Of T), ByVal downcountEnumerators As Boolean)
            m_reader = reader
            m_activeEnumerators = 0
            m_downcountEnumerators = downcountEnumerators
        End Sub

        ' Returns a new enumerator that shares the source enumerator with its siblings.
        ' Throws ObjectDisposedException if Dispose() has already been called.
        Public Function GetEnumerator() As IEnumerator(Of T) Implements IEnumerable(Of T).GetEnumerator
            If m_disposed Then
                ' The single-string constructor interprets its argument as the object
                ' name, so the object name and the message are passed separately.
                Throw New ObjectDisposedException("InternalEnumerable", "Can't call GetEnumerator() after disposing")
            End If

            ' For static partitioning, keep track of the number of active enumerators.
            If m_downcountEnumerators Then
                Interlocked.Increment(m_activeEnumerators)
            End If

            Return New InternalEnumerator(m_reader, Me)
        End Function

        ' Non-generic entry point; forwards to the generic GetEnumerator().
        Private Function GetEnumerator2() As IEnumerator Implements IEnumerable.GetEnumerator
            Return DirectCast(Me, IEnumerable(Of T)).GetEnumerator()
        End Function

        ' Marks this enumerable as disposed. The source enumerator is released here
        ' only for dynamic partitioning; for static partitioning that is left to
        ' DisposeEnumerator().
        Public Sub Dispose() Implements IDisposable.Dispose
            If Not m_disposed Then
                ' Only dispose the source enumerator if you are doing dynamic partitioning
                If Not m_downcountEnumerators Then
                    m_reader.Dispose()
                End If

                m_disposed = True
            End If
        End Sub

        ' Called from Dispose() method of spawned InternalEnumerator. During
        ' static partitioning, the source enumerator will be automatically
        ' disposed once all requested InternalEnumerators have been disposed.
        Public Sub DisposeEnumerator()
            If m_downcountEnumerators Then
                If Interlocked.Decrement(m_activeEnumerators) = 0 Then
                    m_reader.Dispose()
                End If
            End If
        End Sub
    End Class

    ' Internal class that serves as a shared enumerator for
    ' the underlying collection.
    Private Class InternalEnumerator
        Implements IEnumerator(Of T)

        Private m_current As T
        Private ReadOnly m_source As IEnumerator(Of T)
        Private ReadOnly m_controllingEnumerable As InternalEnumerable
        Private m_disposed As Boolean = False

        ' "source" is the enumerator shared with sibling InternalEnumerators;
        ' "controllingEnumerable" is the InternalEnumerable that created this instance.
        Public Sub New(ByVal source As IEnumerator(Of T), ByVal controllingEnumerable As InternalEnumerable)
            m_source = source
            m_current = Nothing
            m_controllingEnumerable = controllingEnumerable
        End Sub

        ' Last element captured by MoveNext(), exposed through the non-generic interface.
        Private ReadOnly Property Current2() As Object Implements IEnumerator.Current
            Get
                Return m_current
            End Get
        End Property

        ' Last element captured by MoveNext(); the default value of T before the
        ' first call and after the source is exhausted.
        Private ReadOnly Property Current() As T Implements IEnumerator(Of T).Current
            Get
                Return m_current
            End Get
        End Property

        ' Always throws NotSupportedException.
        Private Sub Reset() Implements IEnumerator.Reset
            Throw New NotSupportedException("Reset() not supported")
        End Sub

        ' This method is the crux of this class. Under lock, it calls
        ' MoveNext() on the underlying enumerator and grabs Current.
        Private Function MoveNext() As Boolean Implements IEnumerator.MoveNext
            Dim rval As Boolean = False

            ' The lock is taken on the shared source enumerator, so sibling
            ' InternalEnumerators serialize their advance-and-read step.
            SyncLock m_source
                rval = m_source.MoveNext()
                m_current = If(rval, m_source.Current, Nothing)
            End SyncLock

            Return rval
        End Function

        ' Notifies the controlling enumerable, at most once per enumerator.
        Private Sub Dispose() Implements IDisposable.Dispose
            If Not m_disposed Then
                ' Delegate to parent enumerable's DisposeEnumerator() method
                m_controllingEnumerable.DisposeEnumerator()
                m_disposed = True
            End If
        End Sub
    End Class

    ' Constructor just grabs the collection to wrap.
    ' Throws ArgumentNullException when "enumerable" is Nothing.
    Public Sub New(ByVal enumerable As IEnumerable(Of T))
        ' Verify that the source IEnumerable is not null
        If enumerable Is Nothing Then
            Throw New ArgumentNullException("enumerable")
        End If

        m_referenceEnumerable = enumerable
    End Sub

    ' Produces a list of "numPartitions" IEnumerators that can each be
    ' used to traverse the underlying collection in a thread-safe manner.
    ' This will return a static number of enumerators, as opposed to
    ' GetDynamicPartitions(), the result of which can be used to produce
    ' any number of enumerators.
    ' Throws ArgumentOutOfRangeException when "numPartitions" is less than 1.
    Public Overloads Overrides Function GetPartitions(ByVal numPartitions As Integer) As IList(Of IEnumerator(Of T))
        If numPartitions < 1 Then
            ' Report the parameter under its actual (camel-cased) name.
            Throw New ArgumentOutOfRangeException("numPartitions")
        End If

        Dim list As New List(Of IEnumerator(Of T))(numPartitions)

        ' Since we are doing static partitioning, create an InternalEnumerable with reference
        ' counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators
        ' are disposed, the shared source enumerator will be disposed.
        Dim dynamicPartitions = New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), True)
        For i As Integer = 0 To numPartitions - 1
            list.Add(dynamicPartitions.GetEnumerator())
        Next

        Return list
    End Function

    ' Returns an instance of our internal Enumerable class. GetEnumerator()
    ' can then be called on that (multiple times) to produce shared enumerators.
    Public Overloads Overrides Function GetDynamicPartitions() As IEnumerable(Of T)
        ' Since we are doing dynamic partitioning, create an InternalEnumerable with reference
        ' counting of spawned InternalEnumerators turned off. This returned InternalEnumerable
        ' will need to be explicitly disposed.
        Return New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), False)
    End Function

    ' Must be set to true if GetDynamicPartitions() is supported.
    Public Overloads Overrides ReadOnly Property SupportsDynamicPartitions() As Boolean
        Get
            Return True
        End Get
    End Property
End Class
' Console driver that exercises SingleElementPartitioner(Of T) three ways:
' through Parallel.ForEach, through a fixed set of static partitions, and
' through dynamic partitions.
Class Program
    ' Test our SingleElementPartitioner(T) class
    Shared Sub Main()
        ' Our sample collection
        Dim collection As String() = New String() {"red", "orange", "yellow", "green", "blue", "indigo", _
            "violet", "black", "white", "grey"}

        ' Instantiate a partitioner for our collection
        Dim myPart As New SingleElementPartitioner(Of String)(collection)

        '
        ' Simple test with ForEach
        '
        Console.WriteLine("Testing with Parallel.ForEach")
        Parallel.ForEach(myPart,
            Sub(item)
                Console.WriteLine(" item = {0}, thread id = {1}", item, Thread.CurrentThread.ManagedThreadId)
            End Sub)

        '
        ' Demonstrate the use of static partitioning, which really means
        ' "using a static number of partitioners". The partitioners themselves
        ' may still be "dynamic" in the sense that their outputs may not be
        ' deterministic.
        '

        ' Perform static partitioning of collection
        Dim staticPartitions = myPart.GetPartitions(2)
        Dim index As Integer = 0
        Console.WriteLine("Static Partitioning, 2 partitions, 2 tasks:")

        ' Action will consume from static partitions
        Dim staticAction As Action =
            Sub()
                ' compute your index
                Dim myIndex As Integer = Interlocked.Increment(index) - 1
                ' cache your thread id
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId

                ' Grab your static partition; Using disposes it even if the loop throws
                Using myItems As IEnumerator(Of String) = staticPartitions(myIndex)
                    ' Enumerate through your static partition
                    While myItems.MoveNext()
                        ' guarantees that multiple threads have a chance to run
                        Thread.Sleep(50)
                        Console.WriteLine(" item = {0}, thread id = {1}", myItems.Current, id)
                    End While
                End Using
            End Sub

        ' Spawn off 2 actions to consume 2 static partitions
        Parallel.Invoke(staticAction, staticAction)

        '
        ' Demonstrate the use of dynamic partitioning
        '

        ' Grab an IEnumerable which can then be used to generate multiple
        ' shared IEnumerables.
        Dim dynamicPartitions = myPart.GetDynamicPartitions()
        Try
            Console.WriteLine("Dynamic Partitioning, 3 tasks:")

            ' Action will consume from dynamic partitions
            Dim dynamicAction As Action =
                Sub()
                    ' cache our thread id
                    Dim id As Integer = Thread.CurrentThread.ManagedThreadId

                    ' Grab an enumerator from the dynamic partitioner
                    Using enumerator As IEnumerator(Of String) = dynamicPartitions.GetEnumerator()
                        ' Enumerate through your dynamic enumerator
                        While enumerator.MoveNext()
                            ' guarantees that multiple threads will have a chance to run
                            Thread.Sleep(50)
                            Console.WriteLine(" item = {0}, thread id = {1}", enumerator.Current, id)
                        End While
                    End Using
                End Sub

            ' Spawn 3 concurrent actions to consume the dynamic partitions
            Parallel.Invoke(dynamicAction, dynamicAction, dynamicAction)
        Finally
            ' Clean up: the dynamic-partitions object must be disposed explicitly
            Dim disposable As IDisposable = TryCast(dynamicPartitions, IDisposable)
            If disposable IsNot Nothing Then
                disposable.Dispose()
            End If
        End Try
    End Sub
End Class
End Module
注解
有关详细信息,请参阅 PLINQ 和 TPL 的自定义分区程序。
构造函数
Partitioner<TSource>() |
创建新的分区程序实例。 |
属性
SupportsDynamicPartitions |
获取一个值,该值指示是否可以动态创建附加分区。 |
方法
Equals(Object) |
确定指定对象是否等于当前对象。 (继承自 Object) |
GetDynamicPartitions() |
创建一个可将基础集合分区成可变数目的分区的对象。 |
GetHashCode() |
作为默认哈希函数。 (继承自 Object) |
GetPartitions(Int32) |
将基础集合分区成给定数目的分区。 |
GetType() |
获取当前实例的 Type。 (继承自 Object) |
MemberwiseClone() |
创建当前 Object 的浅表副本。 (继承自 Object) |
ToString() |
返回表示当前对象的字符串。 (继承自 Object) |
扩展方法
AsParallel<TSource>(Partitioner<TSource>) |
启用查询的并行化,查询的数据源由负责将输入序列拆分为各个分区的自定义分区程序提供。 |
适用于
线程安全性
Partitioner<TSource> 上的静态方法都是线程安全的,可以从多个线程并发使用。 但是,在使用已创建的分区程序期间,无论是从使用该分区程序的同一线程还是从其他线程,都不应修改基础数据源。