Udostępnij za pośrednictwem


Niestandardowe partycjonery dla PLINQ i TPL

Aby zrównoleglić operację w źródle danych, jednym z podstawowych kroków jest podzielenie źródła na wiele sekcji, do których można uzyskiwać dostęp współbieżnie przez wiele wątków. PlINQ i Biblioteka równoległa zadań (TPL) udostępniają domyślne partycjonatory, które działają w sposób niewidoczny podczas pisania zapytania równoległego lub ForEach pętli. W przypadku bardziej zaawansowanych scenariuszy możesz podłączyć własny moduł partycjonatora.

Rodzaje partycjonowania

Istnieje wiele sposobów partycjonowania źródła danych. W najbardziej wydajnych podejściach wiele wątków współpracuje z procesem oryginalnej sekwencji źródłowej, a nie fizycznie oddzielając źródło do wielu podsekwencjonowania. W przypadku tablic i innych indeksowanych źródeł, takich jak IList kolekcje, w których długość jest znana z wyprzedzeniem, partycjonowanie zakresu jest najprostszym rodzajem partycjonowania. Każdy wątek otrzymuje unikatowe indeksy początkowe i końcowe, dzięki czemu może przetwarzać zakres źródła bez zastępowania lub zastępowania przez inny wątek. Jedynym obciążeniem związanym z partycjonowaniem zakresu jest początkowa praca tworzenia zakresów; po tym nie jest wymagana żadna dodatkowa synchronizacja. W związku z tym może zapewnić dobrą wydajność, o ile obciążenie jest równomiernie podzielone. Wadą partycjonowania zakresu jest to, że jeśli jeden wątek zakończy się wcześnie, nie może pomóc innym wątkom zakończyć swoją pracę.

W przypadku list połączonych lub innych kolekcji, których długość nie jest znana, można użyć partycjonowania fragmentów. W partycjonowaniu fragmentów każdy wątek lub zadanie w pętli równoległej lub zapytaniu zużywa pewną liczbę elementów źródłowych w jednym kawałku, przetwarza je, a następnie wraca do pobierania dodatkowych elementów. Moduł partycjonujący zapewnia, że wszystkie elementy są rozproszone i że nie ma duplikatów. Fragment może mieć dowolny rozmiar. Na przykład partycjonator pokazany w temacie Instrukcje: Implementowanie partycji dynamicznych tworzy fragmenty zawierające tylko jeden element. Tak długo, jak fragmenty nie są zbyt duże, tego rodzaju partycjonowanie jest z natury równoważenia obciążenia, ponieważ przypisanie elementów do wątków nie jest wstępnie określone. Jednak partycjonator generuje obciążenie synchronizacji za każdym razem, gdy wątek musi uzyskać inny fragment. Ilość synchronizacji poniesionej w tych przypadkach jest odwrotnie proporcjonalna do rozmiaru fragmentów.

Ogólnie rzecz biorąc partycjonowanie zakresu jest szybsze tylko wtedy, gdy czas wykonywania delegata jest niewielki do umiarkowanego, a źródło ma dużą liczbę elementów, a łączna praca każdej partycji jest w przybliżeniu równoważna. Partycjonowanie fragmentów jest zatem ogólnie szybsze w większości przypadków. W przypadku źródeł z niewielką liczbą elementów lub dłuższych czasów wykonywania dla delegata wydajność partycjonowania fragmentów i zakresu jest równa.

Partycjonatory TPL obsługują również dynamiczną liczbę partycji. Oznacza to, że mogą one tworzyć partycje na bieżąco, na przykład gdy pętla ForEach zduplikuje nowe zadanie. Ta funkcja umożliwia partycjonatorowi skalowanie razem z samą pętlą. Dynamiczne partycjonatory są również z natury równoważenia obciążenia. Podczas tworzenia niestandardowego partycjonatora należy obsługiwać partycjonowanie dynamiczne, aby można je było eksploatować z ForEach pętli.

Konfigurowanie partycjonatorów równoważenia obciążenia dla PLINQ

Niektóre przeciążenia Partitioner.Create metody umożliwiają utworzenie partycjonatora dla tablicy lub IList źródła i określenie, czy należy spróbować zrównoważyć obciążenie między wątkami. Gdy partycjonator jest skonfigurowany do równoważenia obciążenia, partycjonowanie fragmentów jest używane, a elementy są przekazywane do każdej partycji w małych fragmentach, gdy są one żądane. Takie podejście pomaga zapewnić, że wszystkie partycje mają elementy do przetworzenia do momentu ukończenia całej pętli lub zapytania. Dodatkowe przeciążenie może służyć do zapewnienia partycjonowania równoważenia obciążenia dowolnego IEnumerable źródła.

Ogólnie rzecz biorąc, równoważenie obciążenia wymaga, aby partycje żądały elementów stosunkowo często z partycjonatora. Natomiast partycjonator, który wykonuje partycjonowanie statyczne, może przypisać elementy do każdego partycjonatora jednocześnie przy użyciu partycjonowania zakresu lub fragmentu. Wymaga to mniejszego obciążenia niż równoważenie obciążenia, ale wykonanie jednego wątku może potrwać dłużej, jeśli jeden wątek kończy się znacznie większą pracą niż inne. Domyślnie po przekazaniu listy IList lub tablicy funkcja PLINQ zawsze używa partycjonowania zakresu bez równoważenia obciążenia. Aby włączyć równoważenie obciążenia dla PLINQ, użyj Partitioner.Create metody , jak pokazano w poniższym przykładzie.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

Najlepszym sposobem określenia, czy w danym scenariuszu ma być użycie równoważenia obciążenia, jest eksperymentowanie i mierzenie czasu wykonywania operacji w ramach reprezentatywnych obciążeń i konfiguracji komputerów. Na przykład partycjonowanie statyczne może zapewnić znaczną szybkość pracy na komputerze wielordzeniowym, który ma tylko kilka rdzeni, ale może to spowodować spowolnienie na komputerach, które mają stosunkowo wiele rdzeni.

W poniższej tabeli wymieniono dostępne przeciążenia Create metody. Te partycjonatory nie są ograniczone tylko do używania z PLINQ lub Task. Mogą być również używane z dowolną niestandardową konstrukcją równoległą.

Przeciążenie Używa równoważenia obciążenia
Create<TSource>(IEnumerable<TSource>) Zawsze
Create<TSource>(TSource[], Boolean) Gdy argument logiczny jest określony jako true
Create<TSource>(IList<TSource>, Boolean) Gdy argument logiczny jest określony jako true
Create(Int32, Int32) Nigdy
Create(Int32, Int32, Int32) Nigdy
Create(Int64, Int64) Nigdy
Create(Int64, Int64, Int64) Nigdy

Konfigurowanie partycji zakresu statycznego dla elementu Parallel.ForEach

For W pętli treść pętli jest dostarczana do metody jako delegata. Koszt wywoływania tego delegata jest taki sam jak wywołanie metody wirtualnej. W niektórych scenariuszach treść pętli równoległej może być wystarczająco mała, że koszt wywołania delegata w każdej iteracji pętli staje się znaczący. W takich sytuacjach można użyć jednego z Create przeciążeń, aby utworzyć IEnumerable<T> partycje zakresu na elementach źródłowych. Następnie można przekazać tę kolekcję zakresów do ForEach metody, której treść składa się z pętli regularnej for . Zaletą tego podejścia jest to, że koszt wywołania delegata jest naliczany tylko raz na zakres, a nie raz na element. W poniższym przykładzie przedstawiono podstawowy wzorzec.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Każdy wątek w pętli otrzymuje własny Tuple<T1,T2> , który zawiera wartości indeksu początkowego i końcowego w określonym podzakresie. Pętla wewnętrzna for używa fromInclusive wartości i toExclusive do pętli na tablicy lub IList bezpośrednio.

Create Jedno z przeciążeń umożliwia określenie rozmiaru partycji i liczby partycji. To przeciążenie może być używane w scenariuszach, w których praca na element jest tak niska, że nawet jedno wywołanie metody wirtualnej na element ma zauważalny wpływ na wydajność.

Niestandardowe partycjonatory

W niektórych scenariuszach warto zaimplementować własny partycjonator. Na przykład możesz mieć niestandardową klasę kolekcji, którą można partycjonować wydajniej niż domyślne partycjonatory, na podstawie wiedzy o wewnętrznej strukturze klasy. Możesz też utworzyć partycje zakresu o różnych rozmiarach na podstawie wiedzy o tym, jak długo potrwa przetwarzanie elementów w różnych lokalizacjach w kolekcji źródłowej.

Aby utworzyć podstawowy niestandardowy partycjonator, utwórz klasę na podstawie System.Collections.Concurrent.Partitioner<TSource> metod wirtualnych i przesłoń ją zgodnie z opisem w poniższej tabeli.

Metoda opis
GetPartitions Ta metoda jest wywoływana raz przez główny wątek i zwraca element IList(IEnumerator(TSource)). Każdy wątek procesu roboczego w pętli lub zapytaniu może wywołać GetEnumerator na liście, aby pobrać IEnumerator<T> obiekt na odrębnej partycji.
SupportsDynamicPartitions Wróć true , jeśli zaimplementujesz GetDynamicPartitionswartość , w przeciwnym razie false.
GetDynamicPartitions Jeśli SupportsDynamicPartitions jest trueto , można opcjonalnie wywołać tę metodę zamiast GetPartitions.

Jeśli wyniki muszą być sortowalne lub wymagane jest zaindeksowany dostęp do elementów, należy utworzyć i zastąpić jego metody wirtualne zgodnie z System.Collections.Concurrent.OrderablePartitioner<TSource> opisem w poniższej tabeli.

Metoda opis
GetPartitions Ta metoda jest wywoływana raz przez główny wątek i zwraca wartość IList(IEnumerator(TSource)). Każdy wątek procesu roboczego w pętli lub zapytaniu może wywołać GetEnumerator na liście, aby pobrać IEnumerator<T> obiekt na odrębnej partycji.
SupportsDynamicPartitions Zwracaj true wartość w przypadku implementacji GetDynamicPartitions; w przeciwnym razie, false.
GetDynamicPartitions Zazwyczaj wywołuje to polecenie GetOrderableDynamicPartitions.
GetOrderableDynamicPartitions Jeśli SupportsDynamicPartitions jest trueto , można opcjonalnie wywołać tę metodę zamiast GetPartitions.

Poniższa tabela zawiera dodatkowe szczegóły dotyczące sposobu implementowania klasy przez trzy rodzaje partycjonatorów równoważenia OrderablePartitioner<TSource> obciążenia.

Method/Property Lista IList/Tablica bez równoważenia obciążenia Lista IList/Tablica z równoważeniem obciążenia Ienumerable
GetOrderablePartitions Używa partycjonowania zakresu Używa partycjonowania fragmentów zoptymalizowanego pod kątem list dla określonej partycjiCount Używa partycjonowania fragmentów przez utworzenie statycznej liczby partycji.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Zgłasza nieobsługiwany wyjątek Używa partycjonowania fragmentów zoptymalizowanego pod kątem list i partycji dynamicznych Używa partycjonowania fragmentów przez utworzenie dynamicznej liczby partycji.
KeysOrderedInEachPartition Zwraca true Zwraca true Zwraca true
KeysOrderedAcrossPartitions Zwraca true Zwraca false Zwraca false
KeysNormalized Zwraca true Zwraca true Zwraca true
SupportsDynamicPartitions Zwraca false Zwraca true Zwraca true

Partycje dynamiczne

Jeśli zamierzasz używać partycjonatora w metodzie ForEach , musisz mieć możliwość zwrócenia dynamicznej liczby partycji. Oznacza to, że partycjonator może dostarczyć moduł wyliczający dla nowej partycji na żądanie w dowolnym momencie podczas wykonywania pętli. Zasadniczo za każdym razem, gdy pętla dodaje nowe zadanie równoległe, żąda nowej partycji dla tego zadania. Jeśli chcesz, aby dane można było uporządkować, należy je uzyskać System.Collections.Concurrent.OrderablePartitioner<TSource> , aby każdy element w każdej partycji był przypisany unikatowy indeks.

Aby uzyskać więcej informacji i przykład, zobacz How to: Implement Dynamic Partitions (Instrukcje: implementowanie partycji dynamicznych).

Kontrakt dla partycjonatorów

Podczas implementowania niestandardowego partycjonatora postępuj zgodnie z tymi wytycznymi, aby zapewnić poprawną interakcję z PLINQ i ForEach w TPL:

  • Jeśli GetPartitions jest wywoływany z argumentem zero lub mniej dla partitionsCount, wyrzuć ArgumentOutOfRangeException. Mimo że PLINQ i TPL nigdy nie przejdą partitionCount równe 0, zalecamy jednak ochronę przed możliwością.

  • GetPartitions i GetOrderablePartitions zawsze powinna zwracać partitionsCount liczbę partycji. Jeśli partycjonator zabraknie danych i nie może utworzyć tak wielu partycji, jak zażądano, metoda powinna zwrócić pusty moduł wyliczający dla każdej z pozostałych partycji. W przeciwnym razie zarówno PLINQ, jak i TPL będą zgłaszać wartość InvalidOperationException.

  • GetPartitions, , GetDynamicPartitionsGetOrderablePartitionsi GetOrderableDynamicPartitions nigdy nie należy zwracać null (Nothing w Visual Basic). Jeśli to zrobią, PLINQ / TPL zgłosi błąd InvalidOperationException.

  • Metody zwracające partycje powinny zawsze zwracać partycje, które mogą w pełni i jednoznacznie wyliczać źródło danych. W źródle danych nie powinno być duplikowane ani pomijane elementy, chyba że jest to wymagane specjalnie zgodnie z projektem partycjonatora. Jeśli ta reguła nie jest przestrzegana, kolejność danych wyjściowych może być zakodowana.

  • Następujące metody pobierania warunkowego muszą zawsze dokładnie zwracać następujące wartości, aby kolejność danych wyjściowych nie była szyfrowana:

    • KeysOrderedInEachPartition: Każda partycja zwraca elementy z rosnącymi indeksami kluczy.

    • KeysOrderedAcrossPartitions: Dla wszystkich zwracanych partycji indeksy kluczy w partycji i są wyższe niż indeksy kluczy w partycji i-1.

    • KeysNormalized: Wszystkie kluczowe indeksy są monotonicznie rosnące bez przerw, począwszy od zera.

  • Wszystkie indeksy muszą być unikatowe. Indeksy mogą nie być zduplikowane. Jeśli ta reguła nie jest przestrzegana, kolejność danych wyjściowych może być zakodowana.

  • Wszystkie indeksy muszą być nienegacyjne. Jeśli ta reguła nie jest przestrzegana, może wystąpić wyjątek PLINQ/TPL.

Zobacz też