Procedimiento para implementar particiones dinámicas
En el ejemplo siguiente se muestra cómo implementar una clase System.Collections.Concurrent.OrderablePartitioner<TSource> personalizada que implementa la partición dinámica y que puede usarse desde determinadas sobrecargas ForEach y desde PLINQ.
Ejemplo
Cada vez que se llama a una partición MoveNext en el enumerador, este proporciona la partición con un elemento de lista. En el caso de PLINQ y ForEach, la partición es una instancia Task. Dado que las solicitudes se producen simultáneamente en varios subprocesos, se sincroniza el acceso al índice actual.
//
// An orderable dynamic partitioner for lists
//
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;
using System.Numerics;
class OrderableListPartitioner<TSource> : OrderablePartitioner<TSource>
{
private readonly IList<TSource> m_input;
// Must override to return true.
public override bool SupportsDynamicPartitions => true;
public OrderableListPartitioner(IList<TSource> input) : base(true, false, true) =>
m_input = input;
public override IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
{
var dynamicPartitions = GetOrderableDynamicPartitions();
var partitions =
new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = dynamicPartitions.GetEnumerator();
}
return partitions;
}
public override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() =>
new ListDynamicPartitions(m_input);
private class ListDynamicPartitions : IEnumerable<KeyValuePair<long, TSource>>
{
private IList<TSource> m_input;
private int m_pos = 0;
internal ListDynamicPartitions(IList<TSource> input) =>
m_input = input;
public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
{
while (true)
{
// Each task gets the next item in the list. The index is
// incremented in a thread-safe manner to avoid races.
int elemIndex = Interlocked.Increment(ref m_pos) - 1;
if (elemIndex >= m_input.Count)
{
yield break;
}
yield return new KeyValuePair<long, TSource>(
elemIndex, m_input[elemIndex]);
}
}
IEnumerator IEnumerable.GetEnumerator() =>
((IEnumerable<KeyValuePair<long, TSource>>)this).GetEnumerator();
}
}
class ConsumerClass
{
static void Main()
{
var nums = Enumerable.Range(0, 10000).ToArray();
OrderableListPartitioner<int> partitioner = new OrderableListPartitioner<int>(nums);
// Use with Parallel.ForEach
Parallel.ForEach(partitioner, (i) => Console.WriteLine(i));
// Use with PLINQ
var query = from num in partitioner.AsParallel()
where num % 2 == 0
select num;
foreach (var v in query)
Console.WriteLine(v);
}
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module Module1
Public Class OrderableListPartitioner(Of TSource)
Inherits OrderablePartitioner(Of TSource)
Private ReadOnly m_input As IList(Of TSource)
Public Sub New(ByVal input As IList(Of TSource))
MyBase.New(True, False, True)
m_input = input
End Sub
' Must override to return true.
Public Overrides ReadOnly Property SupportsDynamicPartitions As Boolean
Get
Return True
End Get
End Property
Public Overrides Function GetOrderablePartitions(ByVal partitionCount As Integer) As IList(Of IEnumerator(Of KeyValuePair(Of Long, TSource)))
Dim dynamicPartitions = GetOrderableDynamicPartitions()
Dim partitions(partitionCount - 1) As IEnumerator(Of KeyValuePair(Of Long, TSource))
For i = 0 To partitionCount - 1
partitions(i) = dynamicPartitions.GetEnumerator()
Next
Return partitions
End Function
Public Overrides Function GetOrderableDynamicPartitions() As IEnumerable(Of KeyValuePair(Of Long, TSource))
Return New ListDynamicPartitions(m_input)
End Function
Private Class ListDynamicPartitions
Implements IEnumerable(Of KeyValuePair(Of Long, TSource))
Private m_input As IList(Of TSource)
Friend Sub New(ByVal input As IList(Of TSource))
m_input = input
End Sub
Public Function GetEnumerator() As IEnumerator(Of KeyValuePair(Of Long, TSource)) Implements IEnumerable(Of KeyValuePair(Of Long, TSource)).GetEnumerator
Return New ListDynamicPartitionsEnumerator(m_input)
End Function
Public Function GetEnumerator1() As IEnumerator Implements IEnumerable.GetEnumerator
Return CType(Me, IEnumerable).GetEnumerator()
End Function
End Class
Private Class ListDynamicPartitionsEnumerator
Implements IEnumerator(Of KeyValuePair(Of Long, TSource))
Private m_input As IList(Of TSource)
Shared m_pos As Integer = 0
Private m_current As KeyValuePair(Of Long, TSource)
Public Sub New(ByVal input As IList(Of TSource))
m_input = input
m_pos = 0
Me.disposedValue = False
End Sub
Public ReadOnly Property Current As KeyValuePair(Of Long, TSource) Implements IEnumerator(Of KeyValuePair(Of Long, TSource)).Current
Get
Return m_current
End Get
End Property
Public ReadOnly Property Current1 As Object Implements IEnumerator.Current
Get
Return Me.Current
End Get
End Property
Public Function MoveNext() As Boolean Implements IEnumerator.MoveNext
Dim elemIndex = Interlocked.Increment(m_pos) - 1
If elemIndex >= m_input.Count Then
Return False
End If
m_current = New KeyValuePair(Of Long, TSource)(elemIndex, m_input(elemIndex))
Return True
End Function
Public Sub Reset() Implements IEnumerator.Reset
m_pos = 0
End Sub
Private disposedValue As Boolean ' To detect redundant calls
Protected Overridable Sub Dispose(ByVal disposing As Boolean)
If Not Me.disposedValue Then
m_input = Nothing
m_current = Nothing
End If
Me.disposedValue = True
End Sub
Public Sub Dispose() Implements IDisposable.Dispose
Dispose(True)
GC.SuppressFinalize(Me)
End Sub
End Class
End Class
Class ConsumerClass
Shared Sub Main()
Console.BufferHeight = 20000
Dim nums = Enumerable.Range(0, 2000).ToArray()
Dim partitioner = New OrderableListPartitioner(Of Integer)(nums)
' Use with Parallel.ForEach
Parallel.ForEach(partitioner, Sub(i) Console.Write("{0}:{1} ", i, Thread.CurrentThread.ManagedThreadId))
Console.WriteLine("PLINQ -----------------------------------")
' create a new partitioner, since Enumerators are not reusable.
Dim partitioner2 = New OrderableListPartitioner(Of Integer)(nums)
' Use with PLINQ
Dim query = From num In partitioner2.AsParallel()
Where num Mod 8 = 0
Select num
For Each v In query
Console.Write("{0} ", v)
Next
Console.WriteLine("press any key")
Console.ReadKey()
End Sub
End Class
End Module
Este es un ejemplo de creación de particiones por fragmentos, donde cada fragmento consta de un elemento. Proporcionando más elementos a la vez, podría reducir la contención sobre el bloqueo y teóricamente lograr un rendimiento más rápido. Sin embargo, en algún momento, los fragmentos más grandes podrían requerir lógica de equilibrio de carga adicional con el fin de mantener todos los subprocesos ocupados hasta que se completa todo el trabajo.