Optymalizacja przetwarzania danych dla platformy Apache Spark

W tym artykule omówiono sposób optymalizacji konfiguracji klastra Apache Spark w celu uzyskania najlepszej wydajności w usłudze Azure HDInsight.

Omówienie

Jeśli masz wolne zadania w sprzężeniu lub mieszania, przyczyną jest prawdopodobnie niesymetryczność danych. Niesymetryczność danych to asymetria danych zadania. Na przykład zadanie mapy może potrwać 20 sekund. Jednak uruchomienie zadania, w którym dane są przyłączone lub przetasowane, trwa kilka godzin. Aby naprawić niesymetryczność danych, należy skonsolić cały klucz lub użyć izolowanej soli tylko dla niektórych podzestawów kluczy. Jeśli używasz izolowanej soli, należy dodatkowo filtrować, aby odizolować podzestaw kluczy z solą w sprzężeniach mapy. Inną opcją jest wprowadzenie kolumny zasobnika i wstępne agregowanie w zasobnikach.

Innym czynnikiem powodującym powolne sprzężenia może być typ sprzężenia. Domyślnie platforma Spark używa SortMerge typu sprzężenia. Ten typ sprzężenia najlepiej nadaje się do dużych zestawów danych. Jednak w przeciwnym razie jest kosztowna obliczeniowo, ponieważ musi najpierw posortować lewe i prawe strony danych przed ich scaleniem.

Sprzężenia Broadcast najlepiej nadaje się do mniejszych zestawów danych lub gdy jedna strona sprzężenia jest znacznie mniejsza niż druga strona. Ten typ sprzężenia emituje jedną stronę do wszystkich funkcji wykonawczych, dlatego wymaga więcej pamięci dla emisji w ogóle.

Możesz zmienić typ sprzężenia w konfiguracji, ustawiając spark.sql.autoBroadcastJoinThresholdwartość , lub możesz ustawić wskazówkę sprzężenia przy użyciu interfejsów API ramki danych (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Jeśli używasz tabel zasobnikowych, masz trzeci typ sprzężenia, sprzężenia Merge . Poprawnie podzielony na partycje i wstępnie posortowany zestaw danych pominie kosztowną fazę sortowania SortMerge z sprzężenia.

Kolejność sprzężeń ma znaczenie, szczególnie w bardziej złożonych zapytaniach. Zacznij od najbardziej selektywnych sprzężeń. Ponadto przenoszenie sprzężeń, które zwiększają liczbę wierszy po agregacji, gdy jest to możliwe.

Aby zarządzać równoległością sprzężeń kartezjańskich, możesz dodać zagnieżdżone struktury, okna i być może pominąć co najmniej jeden krok w zadaniu platformy Spark.

Optymalizowanie wykonywania zadań

  • Pamięć podręczna w razie potrzeby, jeśli używasz danych dwa razy, a następnie buforuj je.
  • Emisja zmiennych do wszystkich funkcji wykonawczych. Zmienne są serializowane tylko raz, co powoduje szybsze wyszukiwanie.
  • Użyj puli wątków w sterowniku, co powoduje szybszą operację dla wielu zadań.

Regularnie monitoruj uruchomione zadania pod kątem problemów z wydajnością. Jeśli potrzebujesz więcej informacji na temat niektórych problemów, rozważ jedną z następujących narzędzi profilowania wydajności:

Kluczem do wydajności zapytań platformy Spark 2.x jest aparat Tungsten, który zależy od generowania kodu na całym etapie. W niektórych przypadkach generowanie kodu na całym etapie może być wyłączone. Jeśli na przykład w wyrażeniu SortAggregate agregacji zostanie użyty typ niezmienialny (string), pojawi się zamiast HashAggregate. Na przykład w celu uzyskania lepszej wydajności spróbuj wykonać następujące czynności, a następnie ponownie włącz generowanie kodu:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Następne kroki