Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel wird erläutert, wie Sie die Konfiguration Ihres Apache Spark-Clusters für eine optimale Leistung in Azure HDInsight optimieren.
Übersicht
Wenn Aufträge in einem Join oder Shuffle langsam sind, ist die Ursache dafür wahrscheinlich eine Datenschiefe. Hierbei handelt es sich um eine Asymmetrie in Ihren Auftragsdaten. Beispielsweise kann ein Map-Job 20 Sekunden dauern. Das Ausführen eines Auftrags, bei dem die Daten verknüpft oder neu gemischt werden, dauert jedoch Stunden. Um eine Datenverzerrung zu beheben, sollten Sie den gesamten Schlüssel salzen oder ein isoliertes Salz für nur eine Teilmenge von Schlüsseln verwenden. Wenn Sie einen isolierten Salt verwenden, sollten Sie eine weitere Filterung anwenden, um die Teilmenge der Schlüssel mit Salts in Zuordnungsjoins zu isolieren. Eine andere Option besteht darin, eine Bucketspalte einzuführen und zuerst vorab eine Aggregation in den Buckets durchzuführen.
Ein weiterer Faktor, der langsame Verknüpfungen verursacht, könnte der Verknüpfungstyp sein. Standardmäßig verwendet Spark den Verknüpfungstyp SortMerge . Dieser Verknüpfungstyp eignet sich am besten für große Datasets. Ist aber ansonsten rechenintensiv, da sie zuerst die linken und rechten Seiten der Daten sortieren müssen, bevor sie zusammengeführt werden.
Eine Broadcast Verknüpfung eignet sich am besten für kleinere Datasets, oder eine Seite der Verknüpfung ist wesentlich kleiner als die andere Seite. Diese Art von Join sendet eine Seite an alle Executoren und erfordert daher mehr Arbeitsspeicher für Übertragungen im Allgemeinen.
Sie können den Verknüpfungstyp in Ihrer Konfiguration ändern, indem Sie festlegen spark.sql.autoBroadcastJoinThreshold, oder Sie können einen Verknüpfungshinweis mithilfe der DataFrame-APIs (dataframe.join(broadcast(df2))) festlegen.
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Wenn Sie zusammengefasste Tabellen verwenden, haben Sie einen dritten Verknüpfungstyp, die Merge Verknüpfung. Ein ordnungsgemäß vorab partitioniertes und vorab sortiertes Dataset überspringt die teure Sortierungsphase eines SortMerge-Joins.
Die Reihenfolge der Verknüpfungen ist wichtig, insbesondere bei komplexeren Abfragen. Beginnen Sie mit den selektivsten Verknüpfungen. Verschieben Sie außerdem Verknüpfungen, die die Anzahl der Zeilen nach Aggregationen erhöhen, wenn möglich.
Um Parallelität für kartesische Verknüpfungen zu verwalten, können Sie geschachtelte Strukturen, Fensterung hinzufügen und vielleicht einen oder mehrere Schritte in Ihrem Spark Job überspringen.
Optimieren der Auftragsausführung
- Zwischenspeichern Sie die Daten, wenn nötig, z. B. wenn Sie die Daten zweimal verwenden.
- Übertragen von Variablen an alle Executoren. Die Variablen werden nur einmal serialisiert, was zu schnelleren Nachschlagevorgängen führt.
- Verwenden Sie den Threadpool auf dem Treiber, was zu einem schnelleren Betrieb für viele Aufgaben führt.
Überwachen Sie Ihre ausgeführten Aufträge regelmäßig auf Leistungsprobleme. Wenn Sie mehr Einblick in bestimmte Probleme benötigen, sollten Sie eines der folgenden Leistungsprofilerstellungstools berücksichtigen:
- Intel PAL Tool überwacht CPU-, Speicher- und Netzwerkbandbreitennutzung.
- Oracle Java 8 Mission Control profiliert Spark- und Executor-Code.
Der Schlüssel zur Spark 2.x-Abfrageleistung ist die Tungsten-Engine, die von der Whole-Stage-Code-Generierung abhängt. In einigen Fällen ist die Generierung von ganzstufigem Code möglicherweise deaktiviert. Wenn Sie beispielsweise einen unveränderlichen Typ (string) im Aggregationsausdruck verwenden, wird SortAggregate anstelle von HashAggregate angezeigt. Um beispielsweise eine bessere Leistung zu erzielen, probieren Sie Folgendes aus, und aktivieren Sie dann die Codegenerierung erneut:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))