Optymalizacja przetwarzania danych dla platformy Apache Spark
W tym artykule omówiono sposób optymalizacji konfiguracji klastra Apache Spark w celu uzyskania najlepszej wydajności w usłudze Azure HDInsight.
Omówienie
Jeśli masz wolne zadania w sprzężeniu lub mieszania, przyczyną jest prawdopodobnie niesymetryczność danych. Niesymetryczność danych to asymetria danych zadania. Na przykład zadanie mapy może potrwać 20 sekund. Jednak uruchomienie zadania, w którym dane są przyłączone lub przetasowane, trwa kilka godzin. Aby naprawić niesymetryczność danych, należy skonsolić cały klucz lub użyć izolowanej soli tylko dla niektórych podzestawów kluczy. Jeśli używasz izolowanej soli, należy dodatkowo filtrować, aby odizolować podzestaw kluczy z solą w sprzężeniach mapy. Inną opcją jest wprowadzenie kolumny zasobnika i wstępne agregowanie w zasobnikach.
Innym czynnikiem powodującym powolne sprzężenia może być typ sprzężenia. Domyślnie platforma Spark używa SortMerge
typu sprzężenia. Ten typ sprzężenia najlepiej nadaje się do dużych zestawów danych. Jednak w przeciwnym razie jest kosztowna obliczeniowo, ponieważ musi najpierw posortować lewe i prawe strony danych przed ich scaleniem.
Sprzężenia Broadcast
najlepiej nadaje się do mniejszych zestawów danych lub gdy jedna strona sprzężenia jest znacznie mniejsza niż druga strona. Ten typ sprzężenia emituje jedną stronę do wszystkich funkcji wykonawczych, dlatego wymaga więcej pamięci dla emisji w ogóle.
Możesz zmienić typ sprzężenia w konfiguracji, ustawiając spark.sql.autoBroadcastJoinThreshold
wartość , lub możesz ustawić wskazówkę sprzężenia przy użyciu interfejsów API ramki danych (dataframe.join(broadcast(df2))
).
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Jeśli używasz tabel zasobnikowych, masz trzeci typ sprzężenia, sprzężenia Merge
. Poprawnie podzielony na partycje i wstępnie posortowany zestaw danych pominie kosztowną fazę sortowania SortMerge
z sprzężenia.
Kolejność sprzężeń ma znaczenie, szczególnie w bardziej złożonych zapytaniach. Zacznij od najbardziej selektywnych sprzężeń. Ponadto przenoszenie sprzężeń, które zwiększają liczbę wierszy po agregacji, gdy jest to możliwe.
Aby zarządzać równoległością sprzężeń kartezjańskich, możesz dodać zagnieżdżone struktury, okna i być może pominąć co najmniej jeden krok w zadaniu platformy Spark.
Optymalizowanie wykonywania zadań
- Pamięć podręczna w razie potrzeby, jeśli używasz danych dwa razy, a następnie buforuj je.
- Emisja zmiennych do wszystkich funkcji wykonawczych. Zmienne są serializowane tylko raz, co powoduje szybsze wyszukiwanie.
- Użyj puli wątków w sterowniku, co powoduje szybszą operację dla wielu zadań.
Regularnie monitoruj uruchomione zadania pod kątem problemów z wydajnością. Jeśli potrzebujesz więcej informacji na temat niektórych problemów, rozważ jedną z następujących narzędzi profilowania wydajności:
- Narzędzie Intel PAL monitoruje użycie procesora CPU, magazynu i przepustowości sieci.
- Oracle Java 8 Mission Control profile Spark i kod wykonawcy.
Kluczem do wydajności zapytań platformy Spark 2.x jest aparat Tungsten, który zależy od generowania kodu na całym etapie. W niektórych przypadkach generowanie kodu na całym etapie może być wyłączone. Jeśli na przykład w wyrażeniu SortAggregate
agregacji zostanie użyty typ niezmienialny (string
), pojawi się zamiast HashAggregate
. Na przykład w celu uzyskania lepszej wydajności spróbuj wykonać następujące czynności, a następnie ponownie włącz generowanie kodu:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))