Delen via


Aangepaste partitioneerfuncties voor PLINQ en TPL

Als u een bewerking op een gegevensbron wilt parallelliseren, is een van de essentiële stappen om de bron te partitioneren in meerdere secties die gelijktijdig kunnen worden geopend door meerdere threads. PLINQ en de TPL (Task Parallel Library) bieden standaardpartitioneerfuncties die transparant werken wanneer u een parallelle query of ForEach lus schrijft. Voor meer geavanceerde scenario's kunt u uw eigen partitioner aansluiten.

Soorten partitionering

Er zijn veel manieren om een gegevensbron te partitioneren. In de meest efficiënte benaderingen werken meerdere threads samen om de oorspronkelijke bronreeks te verwerken, in plaats van de bron fysiek te scheiden in meerdere subsequences. Voor matrices en andere geïndexeerde bronnen, zoals IList verzamelingen waarvan de lengte van tevoren bekend is, is bereikpartitionering het eenvoudigste type partitionering. Elke thread ontvangt unieke begin- en eindindexen, zodat het bereik van de bron kan worden verwerkt zonder dat deze wordt overschreven of overschreven door een andere thread. De enige overhead die betrokken is bij het partitioneren van bereiken is het eerste werk van het maken van de bereiken; daarna is geen extra synchronisatie vereist. Het kan daarom goede prestaties bieden zolang de workload gelijkmatig is verdeeld. Een nadeel van het partitioneren van bereiken is dat als één thread vroeg wordt voltooid, de andere threads niet kunnen helpen hun werk te voltooien.

Voor gekoppelde lijsten of andere verzamelingen waarvan de lengte niet bekend is, kunt u segmentpartitionering gebruiken. Bij segmentpartitionering verbruikt elke thread of taak in een parallelle lus of query een aantal bronelementen in één segment, verwerkt ze en wordt vervolgens teruggezet om extra elementen op te halen. De partitioner zorgt ervoor dat alle elementen worden gedistribueerd en dat er geen duplicaten zijn. Een segment kan elke grootte hebben. De partitioner die wordt gedemonstreerd in Procedure: Dynamische partities implementeren, maakt bijvoorbeeld segmenten die slechts één element bevatten. Zolang de segmenten niet te groot zijn, is dit soort partitionering inherent taakverdeling omdat de toewijzing van elementen aan threads niet vooraf is bepaald. De partitioner brengt echter steeds de synchronisatie-overhead met zich mee wanneer de thread een ander segment moet ophalen. De hoeveelheid synchronisatie die in deze gevallen wordt gemaakt, is omgekeerd evenredig met de grootte van de segmenten.

Over het algemeen is het partitioneren van bereiken alleen sneller wanneer de uitvoeringstijd van de gemachtigde klein tot gemiddeld is en de bron een groot aantal elementen heeft en het totale werk van elke partitie ongeveer gelijk is. Segmentpartitionering is daarom meestal sneller in de meeste gevallen. Op bronnen met een klein aantal elementen of langere uitvoeringstijden voor de gemachtigde is de prestaties van segment- en bereikpartitionering ongeveer gelijk.

De TPL-partitioners ondersteunen ook een dynamisch aantal partities. Dit betekent dat ze partities on-the-fly kunnen maken, bijvoorbeeld wanneer de ForEach lus een nieuwe taak voortsleept. Met deze functie kan de partitioner samen met de lus zelf worden geschaald. Dynamische partities zijn ook inherent taakverdeling. Wanneer u een aangepaste partitioner maakt, moet u dynamische partitionering ondersteunen om te kunnen worden gebruikt vanuit een ForEach lus.

Load Balancing Partitioners configureren voor PLINQ

Bij sommige overbelastingen van de Partitioner.Create methode kunt u een partitioneerfunctie voor een matrix of IList bron maken en opgeven of de workload tussen de threads moet worden verdeeld. Wanneer de partitioner is geconfigureerd voor taakverdeling, wordt segmentpartitionering gebruikt en worden de elementen aan elke partitie in kleine segmenten overgedragen zoals ze worden aangevraagd. Deze aanpak zorgt ervoor dat alle partities elementen bevatten die moeten worden verwerkt totdat de hele lus of query is voltooid. Een extra overbelasting kan worden gebruikt om taakverdelingspartitionering van elke IEnumerable bron te bieden.

Over het algemeen vereist taakverdeling dat de partities elementen relatief vaak van de partitioner aanvragen. Een partitioner die statische partitionering doet, kan daarentegen de elementen tegelijk toewijzen aan elke partitioner met behulp van bereik- of segmentpartitionering. Dit vereist minder overhead dan taakverdeling, maar het kan langer duren om uit te voeren als één thread aanzienlijk meer werk heeft dan de andere threads. Wanneer deze wordt doorgegeven aan een IList of een matrix, gebruikt PLINQ altijd bereikpartitionering zonder taakverdeling. Als u taakverdeling voor PLINQ wilt inschakelen, gebruikt u de Partitioner.Create methode, zoals wordt weergegeven in het volgende voorbeeld.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

De beste manier om te bepalen of u taakverdeling in een bepaald scenario wilt gebruiken, is experimenteren en meten hoe lang het duurt voordat bewerkingen zijn voltooid onder representatieve belastingen en computerconfiguraties. Statische partitionering kan bijvoorbeeld een aanzienlijke versnelling bieden op een computer met meerdere kernen met slechts een paar kernen, maar dit kan leiden tot vertragingen op computers met relatief veel kernen.

De volgende tabel bevat de beschikbare overbelastingen van de Create methode. Deze partities zijn niet beperkt tot gebruik alleen met PLINQ of Task. Ze kunnen ook worden gebruikt met elke aangepaste parallelle constructie.

Overbelasting Maakt gebruik van taakverdeling
Create<TSource>(IEnumerable<TSource>) Altijd
Create<TSource>(TSource[], Boolean) Wanneer het Booleaanse argument is opgegeven als waar
Create<TSource>(IList<TSource>, Boolean) Wanneer het Booleaanse argument is opgegeven als waar
Create(Int32, Int32) Nooit
Create(Int32, Int32, Int32) Nooit
Create(Int64, Int64) Nooit
Create(Int64, Int64, Int64) Nooit

Static Range Partitioners configureren voor Parallel.ForEach

In een For lus wordt de hoofdtekst van de lus als gemachtigde aan de methode verstrekt. De kosten voor het aanroepen van die gemachtigde zijn ongeveer hetzelfde als een aanroep van een virtuele methode. In sommige scenario's kan de hoofdtekst van een parallelle lus klein genoeg zijn dat de kosten van de aanroep van de gemachtigde voor elke lusiteratie aanzienlijk worden. In dergelijke situaties kunt u een van de Create overbelastingen gebruiken om een IEnumerable<T> reeks bereikpartities te maken over de bronelementen. Vervolgens kunt u deze verzameling bereiken doorgeven aan een ForEach methode waarvan het lichaam uit een reguliere for lus bestaat. Het voordeel van deze benadering is dat de kosten voor het aanroepen van gemachtigden slechts eenmaal per bereik worden gemaakt, in plaats van één keer per element. In het volgende voorbeeld ziet u het basispatroon.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Elke thread in de lus ontvangt een eigen Tuple<T1,T2> thread die de begin- en eindindexwaarden in het opgegeven subbereik bevat. De binnenste for lus gebruikt de fromInclusive en toExclusive waarden om de matrix of de IList rechtstreeks te doorlopen.

Met een van de Create overbelastingen kunt u de grootte van de partities en het aantal partities opgeven. Deze overbelasting kan worden gebruikt in scenario's waarbij het werk per element zo laag is dat zelfs één aanroep van een virtuele methode per element een merkbare invloed heeft op de prestaties.

Aangepaste partitioneerfuncties

In sommige scenario's is het misschien de moeite waard of zelfs vereist om uw eigen partitioner te implementeren. U hebt bijvoorbeeld een aangepaste verzamelingsklasse die u efficiënter kunt partitioneren dan de standaardpartitioneerfuncties kunnen, op basis van uw kennis van de interne structuur van de klasse. U kunt ook bereikpartities van verschillende grootten maken op basis van uw kennis van hoe lang het duurt om elementen op verschillende locaties in de bronverzameling te verwerken.

Als u een standaard aangepaste partitioner wilt maken, moet u een klasse afleiden van System.Collections.Concurrent.Partitioner<TSource> de virtuele methoden en deze overschrijven, zoals beschreven in de volgende tabel.

Wijze Description
GetPartitions Deze methode wordt eenmaal aangeroepen door de hoofdthread en retourneert een IList(IEnumerator(TSource)). Elke werkrolthread in de lus of query kan in de lijst worden aangeroepen GetEnumerator om een IEnumerator<T> afzonderlijke partitie op te halen.
SupportsDynamicPartitions Retourneer true als u implementeertGetDynamicPartitions, anders. false
GetDynamicPartitions Als SupportsDynamicPartitions dat het is true, kan deze methode desgewenst worden aangeroepen in plaats van GetPartitions.

Als de resultaten sorteerbaar moeten zijn of als u geïndexeerde toegang tot de elementen nodig hebt, moet u de virtuele methoden ervan afleiden System.Collections.Concurrent.OrderablePartitioner<TSource> en overschrijven, zoals beschreven in de volgende tabel.

Wijze Description
GetPartitions Deze methode wordt eenmaal aangeroepen door de hoofdthread en retourneert een IList(IEnumerator(TSource)). Elke werkrolthread in de lus of query kan in de lijst worden aangeroepen GetEnumerator om een IEnumerator<T> afzonderlijke partitie op te halen.
SupportsDynamicPartitions Retourneer true als u implementeert GetDynamicPartitions; anders onwaar.
GetDynamicPartitions Normaal gesproken roept GetOrderableDynamicPartitionsdit gewoon aan.
GetOrderableDynamicPartitions Als SupportsDynamicPartitions dat het is true, kan deze methode desgewenst worden aangeroepen in plaats van GetPartitions.

De volgende tabel bevat aanvullende informatie over hoe de drie soorten taakverdelingspartitioners de OrderablePartitioner<TSource> klasse implementeren.

Methode/eigenschap IList/matrix zonder taakverdeling IList/Matrix met taakverdeling IEnumerable
GetOrderablePartitions Maakt gebruik van bereikpartitionering Maakt gebruik van segmentpartitionering die is geoptimaliseerd voor Lijsten voor de opgegeven partitionCount Maakt gebruik van segmentpartitionering door een statisch aantal partities te maken.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Genereert niet-ondersteunde uitzondering Maakt gebruik van segmentpartitionering die is geoptimaliseerd voor lijsten en dynamische partities Maakt gebruik van segmentpartitionering door een dynamisch aantal partities te maken.
KeysOrderedInEachPartition Retourneert true Retourneert true Retourneert true
KeysOrderedAcrossPartitions Retourneert true Retourneert false Retourneert false
KeysNormalized Retourneert true Retourneert true Retourneert true
SupportsDynamicPartitions Retourneert false Retourneert true Retourneert true

Dynamische partities

Als u van plan bent de partitioner te gebruiken in een ForEach methode, moet u een dynamisch aantal partities kunnen retourneren. Dit betekent dat de partitioner een enumerator kan leveren voor een nieuwe partitie op aanvraag op elk moment tijdens het uitvoeren van een lus. Wanneer de lus een nieuwe parallelle taak toevoegt, wordt een nieuwe partitie voor die taak aangevraagd. Als u wilt dat de gegevens kunnen worden besteld, moet u deze afleiden zodat System.Collections.Concurrent.OrderablePartitioner<TSource> aan elk item in elke partitie een unieke index wordt toegewezen.

Zie Voor meer informatie en een voorbeeld: Dynamische partities implementeren.

Contract voor partitioneerfuncties

Wanneer u een aangepaste partitioner implementeert, volgt u deze richtlijnen om de juiste interactie met PLINQ en ForEach in de TPL te garanderen:

  • Als GetPartitions wordt aangeroepen met een argument van nul of minder voor partitionsCount, gooien ArgumentOutOfRangeException. Hoewel PLINQ en TPL nooit gelijk partitionCount aan 0 zullen doorgeven, raden we u echter aan om u te beschermen tegen de mogelijkheid.

  • GetPartitions en GetOrderablePartitions moet altijd het aantal partities retourneren partitionsCount . Als de partitioner onvoldoende gegevens bevat en er niet zoveel partities kunnen worden gemaakt als aangevraagd, moet de methode een lege opsommingsfunctie retourneren voor elk van de resterende partities. Anders gooien zowel PLINQ als TPL een InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitionsen GetOrderableDynamicPartitions mag nooit retourneren null (Nothing in Visual Basic). Als ze dat doen, zal PLINQ / TPL een InvalidOperationException.

  • Methoden die partities retourneren, moeten altijd partities retourneren die de gegevensbron volledig en uniek kunnen inventariseren. Er mag geen duplicatie zijn in de gegevensbron of overgeslagen items, tenzij dit specifiek vereist is voor het ontwerp van de partitioner. Als deze regel niet wordt gevolgd, is de uitvoervolgorde mogelijk gecrambleerd.

  • De volgende Booleaanse getters moeten altijd de volgende waarden nauwkeurig retourneren, zodat de uitvoervolgorde niet wordt gecrambleerd:

    • KeysOrderedInEachPartition: Elke partitie retourneert elementen met toenemende sleutelindexen.

    • KeysOrderedAcrossPartitions: Voor alle partities die worden geretourneerd, zijn de sleutelindexen in partitie i hoger dan de sleutelindexen in partitie i-1.

    • KeysNormalized: Alle sleutelindexen worden monotonisch verhoogd zonder hiaten, beginnend vanaf nul.

  • Alle indexen moeten uniek zijn. Er zijn mogelijk geen dubbele indexen. Als deze regel niet wordt gevolgd, is de uitvoervolgorde mogelijk gecrambleerd.

  • Alle indexen moeten niet-negatief zijn. Als deze regel niet wordt gevolgd, kan PLINQ/TPL uitzonderingen genereren.

Zie ook