OrderablePartitioner<TSource> Klasse
Definition
Wichtig
Einige Informationen beziehen sich auf Vorabversionen, die vor dem Release ggf. grundlegend überarbeitet werden. Microsoft übernimmt hinsichtlich der hier bereitgestellten Informationen keine Gewährleistungen, seien sie ausdrücklich oder konkludent.
Stellt eine bestimmte Methode dar, mit der eine sortierbare Datenquelle in mehrere Partitionen aufgeteilt wird.
generic <typename TSource>
public ref class OrderablePartitioner abstract : System::Collections::Concurrent::Partitioner<TSource>
public abstract class OrderablePartitioner<TSource> : System.Collections.Concurrent.Partitioner<TSource>
type OrderablePartitioner<'Source> = class
inherit Partitioner<'Source>
Public MustInherit Class OrderablePartitioner(Of TSource)
Inherits Partitioner(Of TSource)
Typparameter
- TSource
Typ der Elemente in der Auflistung.
- Vererbung
Beispiele
Das folgende Beispiel zeigt, wie Sie einen sortierbaren Partitionierer implementieren, der jeweils ein Element zurückgibt:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace OrderablePartitionerDemo
{
// Simple partitioner that will extract one (index,item) pair at a time,
// in a thread-safe fashion, from the underlying collection.
// Simple partitioner that extracts one (index, item) pair at a time, in a
// thread-safe fashion, from the underlying collection. Every partition it
// hands out reads from one shared source enumerator and one shared running
// index, so each source element is delivered exactly once across all partitions.
class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
{
    // The collection being wrapped by this partitioner.
    readonly IEnumerable<T> m_referenceEnumerable;

    // Mutable box around a value, used to share the running index between an
    // InternalEnumerable and the InternalEnumerators it spawns.
    private class Shared<U>
    {
        internal U Value;

        public Shared(U item)
        {
            Value = item;
        }
    }

    // Shared enumerable over the underlying collection. Every enumerator it
    // returns wraps the same source enumerator and the same index box.
    private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        readonly IEnumerator<T> m_reader;
        readonly Shared<long> m_index;
        bool m_disposed = false;

        // These two implement disposal of the source enumerator when static
        // partitioning is being performed.
        int m_activeEnumerators;
        readonly bool m_downcountEnumerators;

        // "downcountEnumerators" is true for static partitioning and false for
        // dynamic partitioning.
        public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
        {
            m_reader = reader;
            m_index = new Shared<long>(0);
            m_activeEnumerators = 0;
            m_downcountEnumerators = downcountEnumerators;
        }

        // Returns a new enumerator that shares the source enumerator and index.
        // Throws ObjectDisposedException once Dispose() has been called.
        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (m_disposed)
            {
                // Two-argument overload: the first argument is the object name and
                // the second is the message. The single-argument overload would
                // treat the whole text as the object name.
                throw new ObjectDisposedException("InternalEnumerable", "Can't call GetEnumerator() after disposing");
            }

            // For static partitioning, keep track of the number of active enumerators.
            if (m_downcountEnumerators)
            {
                Interlocked.Increment(ref m_activeEnumerators);
            }

            return new InternalEnumerator(m_reader, this, m_index);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<KeyValuePair<long, T>>)this).GetEnumerator();
        }

        // Marks this enumerable as disposed. The source enumerator is disposed
        // here only for dynamic partitioning; for static partitioning that is
        // left to DisposeEnumerator().
        public void Dispose()
        {
            if (m_disposed)
            {
                return;
            }

            if (!m_downcountEnumerators)
            {
                m_reader.Dispose();
            }
            m_disposed = true;
        }

        // Called from Dispose() of a spawned InternalEnumerator. During static
        // partitioning the source enumerator is disposed as soon as the count of
        // active enumerators drops to zero. Does nothing for dynamic partitioning.
        public void DisposeEnumerator()
        {
            if (m_downcountEnumerators && Interlocked.Decrement(ref m_activeEnumerators) == 0)
            {
                m_reader.Dispose();
            }
        }
    }

    // Enumerator handed out by InternalEnumerable. It advances the shared
    // source enumerator and pairs each element with the shared running index.
    private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
    {
        KeyValuePair<long, T> m_current;
        readonly IEnumerator<T> m_source;
        readonly InternalEnumerable m_controllingEnumerable;
        readonly Shared<long> m_index;
        bool m_disposed = false;

        public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
        {
            m_source = source;
            m_current = default(KeyValuePair<long, T>);
            m_controllingEnumerable = controllingEnumerable;
            m_index = index;
        }

        object IEnumerator.Current
        {
            get { return m_current; }
        }

        KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
        {
            get { return m_current; }
        }

        void IEnumerator.Reset()
        {
            throw new NotSupportedException("Reset() not supported");
        }

        // This method is the crux of this class. Under a lock on the shared
        // source enumerator it calls MoveNext() on that enumerator, captures
        // Current together with the shared index, and increments the index.
        // When the source is exhausted, Current is reset to its default value.
        bool IEnumerator.MoveNext()
        {
            bool advanced;
            lock (m_source)
            {
                advanced = m_source.MoveNext();
                if (advanced)
                {
                    m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
                    m_index.Value = m_index.Value + 1;
                }
                else
                {
                    m_current = default(KeyValuePair<long, T>);
                }
            }
            return advanced;
        }

        // Notifies the parent enumerable once; repeated calls are ignored.
        void IDisposable.Dispose()
        {
            if (!m_disposed)
            {
                m_controllingEnumerable.DisposeEnumerator();
                m_disposed = true;
            }
        }
    }

    // Stores the collection to wrap. Throws ArgumentNullException when
    // "enumerable" is null.
    // NOTE(review): all three key-ordering flags are passed as true, but the
    // partitions interleave elements of one shared source — confirm that the
    // "ordered across partitions" guarantee is really intended.
    public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
        : base(true, true, true)
    {
        if (enumerable == null)
        {
            throw new ArgumentNullException("enumerable");
        }
        m_referenceEnumerable = enumerable;
    }

    // Produces a list of "numPartitions" enumerators that can each be used to
    // traverse the underlying collection in a thread-safe manner. This returns
    // a fixed number of enumerators, as opposed to GetOrderableDynamicPartitions(),
    // whose result can be used to produce any number of enumerators.
    // Throws ArgumentOutOfRangeException when numPartitions is less than 1.
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
    {
        if (numPartitions < 1)
        {
            // The reported name must match the parameter exactly.
            throw new ArgumentOutOfRangeException("numPartitions");
        }

        List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

        // Static partitioning: create an InternalEnumerable that counts the
        // enumerators it spawns. Once all of them have been disposed, the
        // shared source enumerator is disposed.
        var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
        for (int i = 0; i < numPartitions; i++)
        {
            list.Add(dynamicPartitions.GetEnumerator());
        }
        return list;
    }

    // Returns an instance of the internal enumerable class. GetEnumerator()
    // can then be called on it (multiple times) to produce shared enumerators.
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        // Dynamic partitioning: enumerator counting is turned off, so the
        // returned InternalEnumerable has to be disposed explicitly to release
        // the source enumerator.
        return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
    }

    // Must return true because dynamic partitions are supported
    // (see GetOrderableDynamicPartitions()).
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}
// Console demo that exercises SingleElementOrderablePartitioner in three ways:
// through Parallel.ForEach, through two statically created partitions, and
// through a larger Parallel.ForEach run that checks every index against its item.
class Program
{
    static void Main()
    {
        // 1) A fairly simple visual test: print every item with its index.
        string[] words = { "four", "score", "and", "twenty", "years", "ago" };
        SingleElementOrderablePartitioner<string> wordPartitioner =
            new SingleElementOrderablePartitioner<string>(words);

        Parallel.ForEach(wordPartitioner, (word, loopState, position) =>
        {
            int threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", word, position, threadId);
        });

        // 2) Static partitioning with 2 partitions and 2 tasks. Each invocation
        //    of the action claims the next unused slot in the partition list.
        IList<IEnumerator<KeyValuePair<long, string>>> partitions = wordPartitioner.GetOrderablePartitions(2);
        int nextSlot = 0;

        Action drainOnePartition = () =>
        {
            int slot = Interlocked.Increment(ref nextSlot) - 1;
            IEnumerator<KeyValuePair<long, string>> cursor = partitions[slot];

            while (cursor.MoveNext())
            {
                KeyValuePair<long, string> pair = cursor.Current;
                Console.WriteLine("Static partitioning: item = {0}, index = {1}, thread id = {2}",
                    pair.Value, pair.Key, Thread.CurrentThread.ManagedThreadId);
            }

            cursor.Dispose();
        };

        // Launch two of them.
        Parallel.Invoke(drainOnePartition, drainOnePartition);

        // 3) A more rigorous test of dynamic partitioning (used by Parallel.ForEach):
        //    the source is 0..99999, so every item must equal its index.
        Console.WriteLine("OrderablePartitioner test: testing for index mismatches");

        List<int> numbers = Enumerable.Range(0, 100000).ToList();
        var numberPartitioner = new SingleElementOrderablePartitioner<int>(numbers);
        int processed = 0;
        bool sawMismatch = false;

        Parallel.ForEach(numberPartitioner, (value, loopState, position) =>
        {
            if (value != position)
            {
                sawMismatch = true;
            }
            Interlocked.Increment(ref processed);
        });

        if (sawMismatch)
        {
            Console.WriteLine("OrderablePartitioner Test: index mismatch detected");
        }
        Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", processed);
    }
}
}
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module OrderablePartitionerDemo
' Simple partitioner that will extract one (index,item) pair at a time,
' in a thread-safe fashion, from the underlying collection.
' Simple partitioner that extracts one (index, item) pair at a time, in a
' thread-safe fashion, from the underlying collection. Every partition it
' hands out reads from one shared source enumerator and one shared running
' index, so each source element is delivered exactly once across all partitions.
Class SingleElementOrderablePartitioner(Of T)
    Inherits OrderablePartitioner(Of T)

    ' The collection being wrapped by this partitioner.
    Private ReadOnly m_referenceEnumerable As IEnumerable(Of T)

    ' Mutable box around a value, used to share the running index between an
    ' InternalEnumerable and the InternalEnumerators it spawns.
    Private Class [Shared](Of U)
        Friend Value As U

        Public Sub New(ByVal item As U)
            Value = item
        End Sub
    End Class

    ' Shared enumerable over the underlying collection. Every enumerator it
    ' returns wraps the same source enumerator and the same index box.
    Private Class InternalEnumerable
        Implements IEnumerable(Of KeyValuePair(Of Long, T))
        Implements IDisposable

        Private ReadOnly m_reader As IEnumerator(Of T)
        Private ReadOnly m_index As [Shared](Of Long)
        Private m_disposed As Boolean = False

        ' These two implement disposal of the source enumerator when static
        ' partitioning is being performed.
        Private m_activeEnumerators As Integer
        Private ReadOnly m_downcountEnumerators As Boolean

        ' "downcountEnumerators" is True for static partitioning and False for
        ' dynamic partitioning.
        Public Sub New(ByVal reader As IEnumerator(Of T), ByVal downcountEnumerators As Boolean)
            m_reader = reader
            m_index = New [Shared](Of Long)(0)
            m_activeEnumerators = 0
            m_downcountEnumerators = downcountEnumerators
        End Sub

        ' Returns a new enumerator that shares the source enumerator and index.
        ' Throws ObjectDisposedException once Dispose() has been called.
        Public Function GetEnumerator() As IEnumerator(Of KeyValuePair(Of Long, T)) _
            Implements IEnumerable(Of System.Collections.Generic.KeyValuePair(Of Long, T)).GetEnumerator
            If m_disposed Then
                ' Two-argument overload: the first argument is the object name and
                ' the second is the message. The single-argument overload would
                ' treat the whole text as the object name.
                Throw New ObjectDisposedException("InternalEnumerable", "Can't call GetEnumerator() after disposing")
            End If

            ' For static partitioning, keep track of the number of active enumerators.
            If m_downcountEnumerators Then
                Interlocked.Increment(m_activeEnumerators)
            End If

            Return New InternalEnumerator(m_reader, Me, m_index)
        End Function

        Private Function GetEnumerator2() As IEnumerator Implements IEnumerable.GetEnumerator
            Return DirectCast(Me, IEnumerable(Of KeyValuePair(Of Long, T))).GetEnumerator()
        End Function

        ' Marks this enumerable as disposed. The source enumerator is disposed
        ' here only for dynamic partitioning; for static partitioning that is
        ' left to DisposeEnumerator().
        Public Sub Dispose() Implements IDisposable.Dispose
            If m_disposed Then
                Return
            End If

            If Not m_downcountEnumerators Then
                m_reader.Dispose()
            End If
            m_disposed = True
        End Sub

        ' Called from Dispose() of a spawned InternalEnumerator. During static
        ' partitioning the source enumerator is disposed as soon as the count of
        ' active enumerators drops to zero. Does nothing for dynamic partitioning.
        Public Sub DisposeEnumerator()
            If m_downcountEnumerators AndAlso Interlocked.Decrement(m_activeEnumerators) = 0 Then
                m_reader.Dispose()
            End If
        End Sub
    End Class

    ' Enumerator handed out by InternalEnumerable. It advances the shared
    ' source enumerator and pairs each element with the shared running index.
    Private Class InternalEnumerator
        Implements IEnumerator(Of KeyValuePair(Of Long, T))

        Private m_current As KeyValuePair(Of Long, T)
        Private ReadOnly m_source As IEnumerator(Of T)
        Private ReadOnly m_controllingEnumerable As InternalEnumerable
        Private ReadOnly m_index As [Shared](Of Long)
        Private m_disposed As Boolean = False

        Public Sub New(ByVal source As IEnumerator(Of T), ByVal controllingEnumerable As InternalEnumerable, ByVal index As [Shared](Of Long))
            m_source = source
            m_current = Nothing
            m_controllingEnumerable = controllingEnumerable
            m_index = index
        End Sub

        Private ReadOnly Property Current2() As Object Implements IEnumerator.Current
            Get
                Return m_current
            End Get
        End Property

        Private ReadOnly Property Current() As KeyValuePair(Of Long, T) Implements IEnumerator(Of KeyValuePair(Of Long, T)).Current
            Get
                Return m_current
            End Get
        End Property

        Private Sub Reset() Implements IEnumerator.Reset
            Throw New NotSupportedException("Reset() not supported")
        End Sub

        ' This method is the crux of this class. Under a lock on the shared
        ' source enumerator it calls MoveNext() on that enumerator, captures
        ' Current together with the shared index, and increments the index.
        ' When the source is exhausted, Current is reset to its default value.
        Private Function MoveNext() As Boolean Implements IEnumerator.MoveNext
            Dim advanced As Boolean = False
            SyncLock m_source
                advanced = m_source.MoveNext()
                If advanced Then
                    m_current = New KeyValuePair(Of Long, T)(m_index.Value, m_source.Current)
                    m_index.Value = m_index.Value + 1
                Else
                    m_current = Nothing
                End If
            End SyncLock
            Return advanced
        End Function

        ' Notifies the parent enumerable once; repeated calls are ignored.
        Private Sub Dispose() Implements IDisposable.Dispose
            If Not m_disposed Then
                m_controllingEnumerable.DisposeEnumerator()
                m_disposed = True
            End If
        End Sub
    End Class

    ' Stores the collection to wrap. Throws ArgumentNullException when
    ' "enumerable" is Nothing.
    ' NOTE(review): all three key-ordering flags are passed as True, but the
    ' partitions interleave elements of one shared source - confirm that the
    ' "ordered across partitions" guarantee is really intended.
    Public Sub New(ByVal enumerable As IEnumerable(Of T))
        MyBase.New(True, True, True)
        If enumerable Is Nothing Then
            Throw New ArgumentNullException("enumerable")
        End If
        m_referenceEnumerable = enumerable
    End Sub

    ' Produces a list of "numPartitions" enumerators that can each be used to
    ' traverse the underlying collection in a thread-safe manner. This returns
    ' a fixed number of enumerators, as opposed to GetOrderableDynamicPartitions(),
    ' whose result can be used to produce any number of enumerators.
    ' Throws ArgumentOutOfRangeException when numPartitions is less than 1.
    Public Overloads Overrides Function GetOrderablePartitions(ByVal numPartitions As Integer) As IList(Of IEnumerator(Of KeyValuePair(Of Long, T)))
        If numPartitions < 1 Then
            ' The reported name must match the parameter exactly.
            Throw New ArgumentOutOfRangeException("numPartitions")
        End If

        Dim list As New List(Of IEnumerator(Of KeyValuePair(Of Long, T)))(numPartitions)

        ' Static partitioning: create an InternalEnumerable that counts the
        ' enumerators it spawns. Once all of them have been disposed, the
        ' shared source enumerator is disposed.
        Dim dynamicPartitions = New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), True)
        For i As Integer = 0 To numPartitions - 1
            list.Add(dynamicPartitions.GetEnumerator())
        Next
        Return list
    End Function

    ' Returns an instance of the internal enumerable class. GetEnumerator()
    ' can then be called on it (multiple times) to produce shared enumerators.
    Public Overloads Overrides Function GetOrderableDynamicPartitions() As IEnumerable(Of KeyValuePair(Of Long, T))
        ' Dynamic partitioning: enumerator counting is turned off, so the
        ' returned InternalEnumerable has to be disposed explicitly to release
        ' the source enumerator.
        Return New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), False)
    End Function

    ' Must return True because dynamic partitions are supported
    ' (see GetOrderableDynamicPartitions()).
    Public Overloads Overrides ReadOnly Property SupportsDynamicPartitions() As Boolean
        Get
            Return True
        End Get
    End Property
End Class
' Console demo that exercises SingleElementOrderablePartitioner in three ways:
' through Parallel.ForEach, through two statically created partitions, and
' through a larger Parallel.ForEach run that checks every index against its item.
Class Program
    Shared Sub Main()
        ' 1) A fairly simple visual test: print every item with its index.
        Dim words As String() = {"four", "score", "and", "twenty", "years", "ago"}
        Dim wordPartitioner As New SingleElementOrderablePartitioner(Of String)(words)

        Parallel.ForEach(wordPartitioner,
                         Sub(word, loopState, position)
                             Dim threadId As Integer = Thread.CurrentThread.ManagedThreadId
                             Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", word, position, threadId)
                         End Sub)

        ' 2) Static partitioning with 2 partitions and 2 tasks. Each invocation
        '    of the action claims the next unused slot in the partition list.
        Dim partitions As IList(Of IEnumerator(Of KeyValuePair(Of Long, String))) =
            wordPartitioner.GetOrderablePartitions(2)
        Dim nextSlot As Integer = 0

        Dim drainOnePartition As Action =
            Sub()
                Dim slot As Integer = Interlocked.Increment(nextSlot) - 1
                Dim cursor As IEnumerator(Of KeyValuePair(Of Long, String)) = partitions(slot)

                Do While cursor.MoveNext()
                    Dim pair As KeyValuePair(Of Long, String) = cursor.Current
                    Console.WriteLine("Static partitioning: item = {0}, index = {1}, thread id = {2}", pair.Value, pair.Key, Thread.CurrentThread.ManagedThreadId)
                Loop

                cursor.Dispose()
            End Sub

        ' Launch two of them.
        Parallel.Invoke(drainOnePartition, drainOnePartition)

        ' 3) A more rigorous test of dynamic partitioning (used by Parallel.ForEach):
        '    the source is 0..99999, so every item must equal its index.
        Console.WriteLine("OrderablePartitioner test: testing for index mismatches")

        Dim numbers As List(Of Integer) = Enumerable.Range(0, 100000).ToList()
        Dim numberPartitioner As New SingleElementOrderablePartitioner(Of Integer)(numbers)
        Dim processed As Integer = 0
        Dim sawMismatch As Boolean = False

        Parallel.ForEach(numberPartitioner,
                         Sub(value, loopState, position)
                             If value <> position Then sawMismatch = True
                             Interlocked.Increment(processed)
                         End Sub)

        If sawMismatch Then Console.WriteLine("OrderablePartitioner Test: index mismatch detected")
        Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", processed)
    End Sub
End Class
End Module
Hinweise
Die Implementierung der abgeleiteten Klasse ist dafür verantwortlich, die Elemente in den Schlüssel-Wert-Paaren auf die jeweils geeignete Weise anzuordnen. Weitere Informationen finden Sie unter Custom Partitioners for PLINQ and TPL (Benutzerdefinierte Partitionierer für PLINQ und TPL).
Konstruktoren
OrderablePartitioner<TSource>(Boolean, Boolean, Boolean) |
Wird von Konstruktoren in abgeleiteten Klassen aufgerufen, um die OrderablePartitioner<TSource>-Klasse mit den angegebenen Einschränkungen für die Indexschlüssel zu initialisieren. |
Eigenschaften
KeysNormalized |
Ruft ab, ob Sortierschlüssel normalisiert sind. |
KeysOrderedAcrossPartitions |
Ruft ab, ob Elemente in einer früheren Partition immer vor Elementen in einer späteren Partition vorkommen. |
KeysOrderedInEachPartition |
Ruft ab, ob Elemente in jeder Partition in der Reihenfolge zunehmender Schlüssel zurückgegeben werden. |
SupportsDynamicPartitions |
Ruft ab, ob zusätzliche Partitionen dynamisch erstellt werden können. (Geerbt von Partitioner<TSource>) |
Methoden
Equals(Object) |
Bestimmt, ob das angegebene Objekt gleich dem aktuellen Objekt ist. (Geerbt von Object) |
GetDynamicPartitions() |
Erstellt ein Objekt, das die zugrunde liegende Auflistung in eine variable Anzahl von Partitionen partitionieren kann. |
GetHashCode() |
Fungiert als Standardhashfunktion. (Geerbt von Object) |
GetOrderableDynamicPartitions() |
Erstellt ein Objekt, das die zugrunde liegende Auflistung in eine variable Anzahl von Partitionen partitionieren kann. |
GetOrderablePartitions(Int32) |
Partitioniert die zugrunde liegende Auflistung in die angegebene Anzahl sortierbarer Partitionen. |
GetPartitions(Int32) |
Partitioniert die zugrunde liegende Auflistung in die angegebene Anzahl geordneter Partitionen. |
GetType() |
Ruft den Type der aktuellen Instanz ab. (Geerbt von Object) |
MemberwiseClone() |
Erstellt eine flache Kopie des aktuellen Object. (Geerbt von Object) |
ToString() |
Gibt eine Zeichenfolge zurück, die das aktuelle Objekt darstellt. (Geerbt von Object) |
Erweiterungsmethoden
AsParallel<TSource>(Partitioner<TSource>) |
Ermöglicht die Parallelisierung einer Abfrage mit einem benutzerdefinierten Partitionierer, der für die Aufteilung der Eingabesequenz in Partitionen zuständig ist, als Quelle. |
Gilt für:
Threadsicherheit
Alle öffentlichen Member von OrderablePartitioner<TSource> sind threadsicher und können von mehreren Threads gleichzeitig aufgerufen werden.