Udostępnij za pośrednictwem


Niestandardowe partycjonatory dla PLINQ i TPL

Aby zrównoleglić operację w źródle danych, jednym z podstawowych kroków jest podzielenie źródła na wiele sekcji, do których można uzyskiwać dostęp współbieżnie przez wiele wątków. PLINQ i Biblioteka równoległa zadań (TPL) udostępniają domyślne partycjonatory, które działają w sposób przezroczysty podczas pisania zapytania równoległego lub ForEach pętli. W przypadku bardziej zaawansowanych scenariuszy możesz podłączyć własny moduł partycjonatora.

Rodzaje partycjonowania

Istnieje wiele sposobów partycjonowania źródła danych. W najbardziej wydajnych podejściach wiele wątków współpracuje, aby przetwarzać oryginalną sekwencję źródłową, zamiast fizycznie dzielić źródło na wiele podsekwencji. W przypadku tablic i innych indeksowanych źródeł, takich jak IList kolekcje, w których długość jest znana z wyprzedzeniem, partycjonowanie zakresu jest najprostszym rodzajem partycjonowania. Każdy wątek otrzymuje unikalne indeksy początkowe i końcowe, dzięki czemu może przetwarzać swój zakres źródła bez nadpisywania lub bycia nadpisywanym przez inne wątki. Jedynym obciążeniem związanym z partycjonowaniem zakresu jest początkowa praca tworzenia zakresów; po tym nie jest wymagana żadna dodatkowa synchronizacja. W związku z tym może zapewnić dobrą wydajność, o ile obciążenie jest równomiernie podzielone. Wadą partycjonowania zakresu jest to, że jeśli jeden wątek zakończy się wcześnie, nie może pomóc innym wątkom zakończyć swoją pracę.

W przypadku list połączonych lub innych kolekcji, których długość nie jest znana, można użyć podziału na fragmenty. W partycjonowaniu fragmentów każdy wątek lub zadanie w pętli równoległej lub zapytaniu zużywa pewną liczbę elementów źródłowych w jednym kawałku, przetwarza je, a następnie wraca do pobierania dodatkowych elementów. Moduł partycjonujący zapewnia, że wszystkie elementy są rozproszone i że nie ma duplikatów. Fragment może mieć dowolny rozmiar. Na przykład partycjonator pokazany w temacie Instrukcje: Implementowanie partycji dynamicznych tworzy fragmenty zawierające tylko jeden element. Dopóki fragmenty nie są zbyt duże, tego rodzaju partycjonowanie jest z natury równoważące obciążenie, ponieważ przypisanie elementów do wątków nie jest wstępnie określone. Jednak moduł partycjonujący generuje obciążenie związane z synchronizacją każdorazowo, gdy wątek musi pobrać kolejny fragment. Ilość synchronizacji poniesionej w tych przypadkach jest odwrotnie proporcjonalna do rozmiaru fragmentów.

Ogólnie rzecz biorąc partycjonowanie zakresu jest szybsze tylko wtedy, gdy czas wykonywania delegata jest niewielki do umiarkowanego, a źródło ma dużą liczbę elementów, a łączna praca każdej partycji jest w przybliżeniu równoważna. Partycjonowanie fragmentów jest zatem ogólnie szybsze w większości przypadków. W przypadku źródeł z niewielką liczbą elementów lub dłuższymi czasami wykonywania dla delegata, wydajność partycjonowania kawałków i zakresu jest podobna.

Partycjonatory TPL obsługują również dynamiczną liczbę partycji. Oznacza to, że mogą one tworzyć partycje dynamicznie, na przykład gdy pętla ForEach inicjuje nowe zadanie. Ta funkcja umożliwia narzędziu do partycjonowania skalowanie razem z pętlą. Dynamiczne partycjonatory są również z natury równoważące obciążenie. Podczas tworzenia niestandardowego partycjonatora należy obsługiwać partycjonowanie dynamiczne, aby było możliwe jego użycie w pętli ForEach.

Konfigurowanie partycjonatorów równoważenia obciążenia dla PLINQ

Niektóre przeciążenia metody Partitioner.Create umożliwiają utworzenie partycjonera dla tablicy lub źródła IList oraz określenie, czy należy spróbować zrównoważyć obciążenie między wątkami. Gdy partycjonator jest skonfigurowany do równoważenia obciążenia, partycjonowanie fragmentów jest używane, a elementy są przekazywane do każdej partycji w małych fragmentach, gdy są one żądane. Takie podejście pomaga zapewnić, że wszystkie partycje mają elementy do przetworzenia do momentu ukończenia całej pętli lub zapytania. Może być użyte dodatkowe przeciążenie, aby zapewnić partycjonowanie dla równoważenia ładowania dowolnego źródła IEnumerable.

Ogólnie rzecz biorąc, równoważenie obciążenia wymaga, aby partycje żądały elementów stosunkowo często z partycjonatora. Natomiast partycjonator, który wykonuje partycjonowanie statyczne, może przypisać elementy do każdego partycjonatora jednocześnie przy użyciu partycjonowania zakresu lub fragmentu. Wymaga to mniejszego obciążenia niż równoważenie obciążenia, ale wykonanie jednego wątku może potrwać dłużej, jeśli jeden wątek kończy się znacznie większą pracą niż inne. Domyślnie przy przekazywaniu listy IList lub tablicy, system PLINQ zawsze używa partycjonowania zakresowego bez balansowania obciążenia. Aby włączyć load balancing dla PLINQ, użyj metody Partitioner.Create, jak pokazano w poniższym przykładzie.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

Najlepszym sposobem określenia, czy w danym scenariuszu ma być użycie równoważenia obciążenia, jest eksperymentowanie i mierzenie czasu wykonywania operacji w ramach reprezentatywnych obciążeń i konfiguracji komputerów. Na przykład partycjonowanie statyczne może zapewnić znaczną szybkość pracy na komputerze wielordzeniowym, który ma tylko kilka rdzeni, ale może to spowodować spowolnienie na komputerach, które mają stosunkowo wiele rdzeni.

W poniższej tabeli wymieniono dostępne przeciążenia Create metody. Te partycjonatory nie są ograniczone wyłącznie do użycia z PLINQ lub Task. Mogą być również używane z dowolną niestandardową konstrukcją równoległą.

Przeciążenie Używa równoważenia obciążenia
Create<TSource>(IEnumerable<TSource>) Zawsze
Create<TSource>(TSource[], Boolean) Gdy argument logiczny jest określony jako true
Create<TSource>(IList<TSource>, Boolean) Gdy argument logiczny jest określony jako true
Create(Int32, Int32) Nigdy
Create(Int32, Int32, Int32) Nigdy
Create(Int64, Int64) Nigdy
Create(Int64, Int64, Int64) Nigdy

Konfigurowanie partycjonerów statycznych zakresów w Parallel.ForEach

W pętli For blok kodu pętli jest dostarczany do metody jako delegat. Koszt wywoływania tego delegata jest z grubsza taki sam jak wywołanie metody wirtualnej. W niektórych scenariuszach treść pętli równoległej może być na tyle mała, że koszt wywołania delegacji przy każdej iteracji pętli staje się istotny. W takich sytuacjach można użyć jednego z Create przeciążeń, aby utworzyć IEnumerable<T> zakresowe partycje na elementach źródłowych. Następnie można przekazać tę kolekcję zakresów do ForEach metody, której treść składa się z pętli regularnej for . Zaletą tego podejścia jest to, że koszt wywołania delegata jest naliczany tylko raz na zakres, a nie raz na element. W poniższym przykładzie przedstawiono podstawowy wzorzec.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Każdy wątek w pętli otrzymuje własny Tuple<T1,T2>, który zawiera wartości indeksu - początkowego oraz końcowego dla określonego podzakresu. Pętla wewnętrzna for wykorzystuje wartości fromInclusive oraz toExclusive do iteracji po tablicy lub bezpośrednio po IList.

Create Jedno z przeciążeń umożliwia określenie rozmiaru partycji i liczby partycji. To przeciążenie może być używane w scenariuszach, w których ilość pracy przypadającej na każdy element jest tak mała, że nawet jedno wywołanie metody wirtualnej na każdy element ma zauważalny wpływ na wydajność.

Niestandardowe partycjonatory

W niektórych scenariuszach warto, a nawet może być konieczne, zaimplementować własny partycjonator. Na przykład możesz mieć niestandardową klasę kolekcji, którą można partycjonować wydajniej niż domyślne partycjonatory, na podstawie wiedzy o wewnętrznej strukturze klasy. Możesz też utworzyć partycje zakresu o różnych rozmiarach na podstawie wiedzy o tym, jak długo potrwa przetwarzanie elementów w różnych lokalizacjach w kolekcji źródłowej.

Aby utworzyć podstawowy niestandardowy partycjonator, wyprowadź klasę z System.Collections.Concurrent.Partitioner<TSource> i nadpisz metody wirtualne, zgodnie z opisem w poniższej tabeli.

Metoda Opis
GetPartitions Ta metoda jest wywoływana raz przez główny wątek i zwraca element IList(IEnumerator(TSource)). Każdy wątek roboczy w pętli lub zapytaniu może wywołać GetEnumerator na liście, aby pobrać IEnumerator<T> z odrębnej partycji.
SupportsDynamicPartitions Wróć true, jeśli zaimplementujesz GetDynamicPartitions, w przeciwnym razie false.
GetDynamicPartitions Jeśli SupportsDynamicPartitions jest trueto , można opcjonalnie wywołać tę metodę zamiast GetPartitions.

Jeśli wyniki muszą być sortowalne lub jest wymagany indeksowany dostęp do elementów, należy dziedziczyć z System.Collections.Concurrent.OrderablePartitioner<TSource> i nadpisywać jego metody wirtualne zgodnie z opisem System.Collections.Concurrent.OrderablePartitioner<TSource> w poniższej tabeli.

Metoda Opis
GetPartitions Ta metoda jest wywoływana raz przez główny wątek i zwraca wartość IList(IEnumerator(TSource)). Każdy wątek roboczy w pętli lub zapytaniu może wywołać GetEnumerator na liście, aby pobrać IEnumerator<T> z odrębnej partycji.
SupportsDynamicPartitions Zwróć true, jeśli zaimplementujesz GetDynamicPartitions; w przeciwnym razie zwróć false.
GetDynamicPartitions Zazwyczaj po prostu wywołuje GetOrderableDynamicPartitions.
GetOrderableDynamicPartitions Jeśli SupportsDynamicPartitions jest trueto , można opcjonalnie wywołać tę metodę zamiast GetPartitions.

Poniższa tabela zawiera dodatkowe szczegóły dotyczące sposobu, w jaki trzy rodzaje partycjonatorów równoważenia obciążenia implementują klasę OrderablePartitioner<TSource>.

Metoda/Właściwość Lista IList/Tablica bez równoważenia obciążenia Lista IList/Tablica z równoważeniem obciążenia IEnumerable (interfejs programistyczny)
GetOrderablePartitions Używa partycjonowania zakresu Używa partycjonowania fragmentów zoptymalizowanego pod kątem list dla określonej liczby partycji. Używa partycjonowania fragmentów przez utworzenie statycznej liczby partycji.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Rzuca wyjątek związany z brakiem wsparcia Używa podziału na fragmenty, zoptymalizowanego pod listy i partycje dynamiczne Używa partycjonowania fragmentów przez utworzenie dynamicznej liczby partycji.
KeysOrderedInEachPartition Zwraca true Zwraca true Zwraca true
KeysOrderedAcrossPartitions Zwraca true Zwraca false Zwraca false
KeysNormalized Zwraca true Zwraca true Zwraca true
SupportsDynamicPartitions Zwraca false Zwraca true Zwraca true

Partycje dynamiczne

Jeśli zamierzasz używać partycjonatora w metodzie ForEach , musisz mieć możliwość zwrócenia dynamicznej liczby partycji. Oznacza to, że partycjonator może dostarczyć moduł wyliczający dla nowej partycji na żądanie w dowolnym momencie podczas wykonywania pętli. Zasadniczo za każdym razem, gdy pętla dodaje nowe zadanie równoległe, żąda nowej partycji dla tego zadania. Jeśli chcesz, aby dane można było uporządkować, należy je uzyskać System.Collections.Concurrent.OrderablePartitioner<TSource> , aby każdy element w każdej partycji był przypisany unikatowy indeks.

Aby uzyskać więcej informacji i przykład, zobacz How to: Implement Dynamic Partitions (Instrukcje: implementowanie partycji dynamicznych).

Kontrakt dla rozgraniczających

Podczas implementowania niestandardowego partycjonatora postępuj zgodnie z tymi wytycznymi, aby zapewnić poprawną interakcję z PLINQ i ForEach w TPL:

  • Jeśli GetPartitions jest wywoływany z argumentem zero lub mniej dla partitionsCount, wyrzuć ArgumentOutOfRangeException. Chociaż PLINQ i TPL nigdy nie przekażą partitionCount równego 0, zalecamy jednak zabezpieczenie się przed taką możliwością.

  • GetPartitions i GetOrderablePartitions zawsze powinny zwracać partitionsCount liczbę partycji. Jeśli partycjonator zabraknie danych i nie może utworzyć tak wielu partycji, jak zażądano, metoda powinna zwrócić pusty moduł wyliczający dla każdej z pozostałych partycji. W przeciwnym razie zarówno PLINQ, jak i TPL będą zgłaszać wartość InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions i GetOrderableDynamicPartitions nigdy nie powinny zwracać null (Nothing w Visual Basic). Jeśli to zrobią, PLINQ / TPL zgłosi błąd InvalidOperationException.

  • Metody zwracające partycje powinny zawsze zwracać partycje, które mogą w pełni i jednoznacznie wyliczać źródło danych. W źródle danych nie powinno być duplikowane ani pomijane elementy, chyba że jest to wymagane specjalnie zgodnie z projektem partycjonatora. Jeśli ta reguła nie jest przestrzegana, kolejność danych wyjściowych może być zaburzona.

  • Następujące gettery logiczne muszą zawsze dokładnie zwracać następujące wartości, aby kolejność danych wyjściowych nie była pomieszana.

    • KeysOrderedInEachPartition: Każda partycja zwraca elementy z rosnącymi indeksami kluczy.

    • KeysOrderedAcrossPartitions: Dla wszystkich zwracanych partycji indeksy kluczy w partycji i są wyższe niż indeksy kluczy w partycji i-1.

    • KeysNormalized: Wszystkie kluczowe indeksy są monotonicznie rosnące bez przerw, począwszy od zera.

  • Wszystkie indeksy muszą być unikatowe. Indeksy nie mogą być zduplikowane. Jeśli ta reguła nie jest przestrzegana, kolejność danych wyjściowych może być zaburzona.

  • Wszystkie indeksy muszą być nienegacyjne. Jeśli ta reguła nie jest przestrzegana, PLINQ/TPL może zgłaszać wyjątki.

Zobacz także