Optymalizowanie zadań platformy Apache Spark w usłudze Azure Synapse Analytics

Dowiedz się, jak zoptymalizować konfigurację klastra Apache Spark dla określonego obciążenia. Najczęstszym wyzwaniem jest wykorzystanie pamięci spowodowane niewłaściwą konfiguracją (zwłaszcza funkcji wykonawczych o nieprawidłowym rozmiarze), długotrwałymi operacjami i zadaniami, których wynikiem są działania kartezjańskie. Możesz przyspieszyć zadania przy użyciu odpowiedniego buforowania i umożliwić niesymetryczność danych. Aby uzyskać najlepszą wydajność, monitoruj i przejrzyj długotrwałe i zużywające zasoby wykonania zadań platformy Spark.

W poniższych sekcjach opisano typowe optymalizacje i zalecenia dotyczące zadań platformy Spark.

Wybieranie abstrakcji danych

Starsze wersje platformy Spark używają RDD do abstrakcji danych, platformy Spark 1.3 i wersji 1.6, odpowiednio, wprowadzają ramki danych i zestawy danych. Rozważ następujące względne zalety:

  • Ramki danych
    • Najlepszy wybór w większości sytuacji.
    • Zapewnia optymalizację zapytań za pośrednictwem katalizatora.
    • Generowanie kodu na całym etapie.
    • Bezpośredni dostęp do pamięci.
    • Niskie obciążenie odzyskiwania pamięci (GC).
    • Nie tak przyjazny dla deweloperów, jak zestawy danych, ponieważ nie ma kontroli czasu kompilacji ani programowania obiektów domeny.
  • Zestawach danych
    • Dobry w złożonych potokach ETL, w których wpływ na wydajność jest akceptowalny.
    • Nie ma dobrej wydajności w agregacjach, w których wpływ na wydajność może być znaczny.
    • Zapewnia optymalizację zapytań za pośrednictwem katalizatora.
    • Przyjazny dla deweloperów przez zapewnienie programowania obiektów domeny i sprawdzania czasu kompilacji.
    • Dodaje obciążenie serializacji/deserializacji.
    • Wysokie obciążenie GC.
    • Przerywa generowanie kodu na całym etapie.
  • RDD
    • Nie trzeba używać RDD, chyba że trzeba utworzyć nowy niestandardowy RDD.
    • Brak optymalizacji zapytań za pośrednictwem katalizatora.
    • Brak generowania całego kodu etapowego.
    • Wysokie obciążenie GC.
    • Musi używać starszych interfejsów API platformy Spark 1.x.

Korzystanie z optymalnego formatu danych

Platforma Spark obsługuje wiele formatów, takich jak csv, json, xml, parquet, orc i avro. Platformę Spark można rozszerzyć, aby obsługiwać wiele formatów z zewnętrznymi źródłami danych — aby uzyskać więcej informacji, zobacz Pakiety platformy Apache Spark.

Najlepszym formatem wydajności jest parquet z kompresją snappy, która jest domyślna w spark 2.x. Parquet przechowuje dane w formacie kolumnowym i jest wysoce zoptymalizowany na platformie Spark. Ponadto kompresja snappy może spowodować większe pliki niż kompresja gzip. Ze względu na charakter rozdzielany tych plików będzie dekompresowany szybciej.

Korzystanie z pamięci podręcznej

Platforma Spark udostępnia własne natywne mechanizmy buforowania, których można używać za pomocą różnych metod, takich jak .persist(), .cache()i CACHE TABLE. Ta natywna pamięć podręczna jest efektywna w przypadku małych zestawów danych, a także w potokach ETL, w których należy buforować wyniki pośrednie. Jednak buforowanie natywne platformy Spark nie działa obecnie w przypadku partycjonowania, ponieważ buforowana tabela nie przechowuje danych partycjonowania.

Wydajne korzystanie z pamięci

Platforma Spark działa przez umieszczenie danych w pamięci, dlatego zarządzanie zasobami pamięci jest kluczowym aspektem optymalizacji wykonywania zadań platformy Spark. Istnieje kilka technik, które można zastosować, aby efektywnie korzystać z pamięci klastra.

  • Preferuj mniejsze partycje danych i konto dla rozmiaru danych, typów i dystrybucji w strategii partycjonowania.
  • Rozważmy nowszą, wydajniejszą serializacji danych Kryo, a nie domyślną serializacji Języka Java.
  • Monitorowanie i dostrajanie ustawień konfiguracji platformy Spark.

Na potrzeby dokumentacji struktura pamięci platformy Spark i niektóre parametry pamięci wykonawczej klucza są wyświetlane na następnej ilustracji.

Zagadnienia dotyczące pamięci platformy Spark

Platforma Apache Spark w Azure Synapse używa usługi YARN Apache Hadoop YARN, usługa YARN kontroluje maksymalną sumę pamięci używanej przez wszystkie kontenery w każdym węźle platformy Spark. Na poniższym diagramie przedstawiono kluczowe obiekty i ich relacje.

Zarządzanie pamięcią platformy Spark usługi YARN

Aby rozwiązać problem z komunikatami braku pamięci, spróbuj:

  • Przejrzyj shuffles zarządzania DAG. Zmniejszanie danych źródłowych po stronie mapy, wstępne partycjonowanie (lub zasobniki) danych źródłowych, maksymalizowanie pojedynczych tasów i zmniejszanie ilości wysyłanych danych.
  • Preferuj ReduceByKey ze stałym limitem pamięci do GroupByKey, który zapewnia agregacje, okna i inne funkcje, ale ma limit pamięci bez ograniczeń.
  • Preferuj TreeReducemetodę , która wykonuje więcej pracy z funkcjami wykonawczych lub partycjami, do Reduce, co wykonuje całą pracę nad sterownikiem.
  • Korzystaj z ramek danych, a nie obiektów RDD niższego poziomu.
  • Utwórz typy complexType, które hermetyzują akcje, takie jak "Top N", różne agregacje lub operacje okien.

Optymalizowanie serializacji danych

Zadania platformy Spark są dystrybuowane, dlatego odpowiednia serializacja danych jest ważna dla najlepszej wydajności. Istnieją dwie opcje serializacji dla platformy Spark:

  • Serializacja w języku Java jest domyślna.
  • Serializacja Kryo jest nowszym formatem i może spowodować szybszą i bardziej kompaktową serializacji niż Java. Kryo wymaga zarejestrowania klas w programie i nie obsługuje jeszcze wszystkich typów możliwych do serializacji.

Korzystanie z zasobników

Zasobniki są podobne do partycjonowania danych, ale każdy zasobnik może przechowywać zestaw wartości kolumn, a nie tylko jeden. Zasobniki dobrze sprawdzają się w przypadku partycjonowania na dużych (w milionach lub większej liczbie) wartości, takich jak identyfikatory produktów. Zasobnik jest określany przez utworzenie skrótu klucza zasobnika wiersza. Tabele z zasobnikami oferują unikatowe optymalizacje, ponieważ przechowują metadane dotyczące sposobu ich zasobnika i sortowania.

Oto niektóre zaawansowane funkcje zasobnika:

  • Optymalizacja zapytań oparta na zasobnikach metadanych.
  • Zoptymalizowane agregacje.
  • Zoptymalizowane sprzężenia.

Można używać partycjonowania i zasobnika w tym samym czasie.

Optymalizowanie sprzężeń i odczytów losowych

Jeśli masz wolne zadania w funkcji Join lub Shuffle, przyczyną jest prawdopodobnie niesymetryczność danych, która jest asymetrią w danych zadania. Na przykład zadanie mapy może potrwać 20 sekund, ale uruchomienie zadania, w którym dane są przyłączone lub tasowane, trwa kilka godzin. Aby naprawić niesymetryczność danych, należy skonsolić cały klucz lub użyć izolowanej soli tylko dla niektórych podzestawów kluczy. Jeśli używasz soli izolowanej, należy dodatkowo filtrować, aby odizolować podzbiór solonych kluczy w sprzężeniach mapy. Inną opcją jest wprowadzenie kolumny zasobnika i wstępne agregowanie w zasobnikach.

Innym czynnikiem powodującym powolne sprzężenia może być typ sprzężenia. Domyślnie platforma Spark używa SortMerge typu sprzężenia. Ten typ sprzężenia najlepiej nadaje się do dużych zestawów danych, ale w przeciwnym razie jest kosztowny obliczeniowo, ponieważ musi najpierw posortować lewe i prawe strony danych przed ich scaleniem.

Broadcast Sprzężenia najlepiej nadaje się do mniejszych zestawów danych lub gdy jedna strona sprzężenia jest znacznie mniejsza niż druga. Ten typ sprzężenia emituje jedną stronę do wszystkich funkcji wykonawczych, dlatego wymaga więcej pamięci dla emisji w ogóle.

Typ sprzężenia można zmienić w konfiguracji, ustawiając spark.sql.autoBroadcastJoinThresholdwartość , lub możesz ustawić wskazówkę sprzężenia przy użyciu interfejsów API ramki danych (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Jeśli używasz tabel z zasobnikami, masz trzeci typ sprzężenia, sprzężenia Merge . Poprawnie partycjonowany i wstępnie posortowany zestaw danych pomija kosztowną fazę sortowania ze sprzężenia SortMerge .

Kolejność sprzężeń ma znaczenie, szczególnie w bardziej złożonych zapytaniach. Zacznij od najbardziej selektywnych sprzężeń. Ponadto przenieś sprzężenia, które zwiększają liczbę wierszy po agregacjach, gdy jest to możliwe.

Aby zarządzać równoległością sprzężeń kartezjańskich, możesz dodać zagnieżdżone struktury, okna i być może pominąć co najmniej jeden krok w zadaniu platformy Spark.

Wybieranie prawidłowego rozmiaru funkcji wykonawczej

Podczas podejmowania decyzji o konfiguracji funkcji wykonawczej należy wziąć pod uwagę obciążenie odzyskiwania pamięci java (GC).

  • Czynniki zmniejszające rozmiar funkcji wykonawczej:

    • Zmniejsz rozmiar sterty poniżej 32 GB, aby utrzymać obciążenie < GC o 10%.
    • Zmniejsz liczbę rdzeni, aby utrzymać obciążenie < GC na 10%.
  • Czynniki zwiększające rozmiar funkcji wykonawczej:

    • Zmniejsz obciążenie komunikacji między funkcjami wykonawczej.
    • Zmniejsz liczbę otwartych połączeń między funkcjami wykonawczej (N2) w większych klastrach (>100 funkcji wykonawczych).
    • Zwiększ rozmiar sterty w celu uwzględnienia zadań intensywnie korzystających z pamięci.
    • Opcjonalnie: Zmniejsz obciążenie pamięci na funkcję wykonawczej.
    • Opcjonalnie: Zwiększ wykorzystanie i współbieżność przez nadmierne subskrybowanie procesora CPU.

Ogólnie rzecz biorąc, podczas wybierania rozmiaru funkcji wykonawczej:

  • Zacznij od 30 GB na funkcję wykonawcza i dystrybuuj dostępne rdzenie maszyn.
  • Zwiększ liczbę rdzeni funkcji wykonawczej dla większych klastrów (> 100 funkcji wykonawczych).
  • Zmodyfikuj rozmiar na podstawie przebiegów próbnych i na poprzednich czynnikach, takich jak obciążenie GC.

Podczas uruchamiania współbieżnych zapytań należy wziąć pod uwagę następujące kwestie:

  • Zacznij od 30 GB na funkcję wykonawcza i wszystkich rdzeni komputera.
  • Utwórz wiele równoległych aplikacji Spark przez nadmierne subskrybowanie procesora CPU (około 30% poprawy opóźnienia).
  • Dystrybuuj zapytania w aplikacjach równoległych.
  • Zmodyfikuj rozmiar na podstawie przebiegów próbnych i na poprzednich czynnikach, takich jak obciążenie GC.

Monitoruj wydajność zapytań pod kątem odstających lub innych problemów z wydajnością, przeglądając widok osi czasu, wykres SQL, statystyki zadań itd. Czasami jeden lub kilka funkcji wykonawczych jest wolniejszych niż inne, a wykonywanie zadań trwa znacznie dłużej. Często zdarza się to w przypadku większych klastrów (> 30 węzłów). W takim przypadku należy podzielić pracę na większą liczbę zadań, aby harmonogram mógł zrekompensować powolne zadania.

Na przykład ma co najmniej dwa razy więcej zadań niż liczba rdzeni funkcji wykonawczej w aplikacji. Można również włączyć spekulacyjne wykonywanie zadań za pomocą polecenia conf: spark.speculation = true.

Optymalizowanie wykonywania zadań

  • Pamięć podręczna w razie potrzeby, jeśli używasz danych dwa razy, a następnie buforuj je.
  • Emisja zmiennych do wszystkich funkcji wykonawczych. Zmienne są serializowane tylko raz, co powoduje szybsze wyszukiwanie.
  • Użyj puli wątków w sterowniku, co powoduje szybszą operację dla wielu zadań.

Kluczem do wydajności zapytań platformy Spark 2.x jest aparat Tungsten, który zależy od generowania kodu na całym etapie. W niektórych przypadkach generowanie kodu na całym etapie może być wyłączone.

Jeśli na przykład w wyrażeniu SortAggregate agregacji zostanie użyty typ niezmienialny (string), pojawi się zamiast HashAggregate. Na przykład w celu uzyskania lepszej wydajności spróbuj wykonać następujące czynności, a następnie ponownie włącz generowanie kodu:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Następne kroki