この記事では、Azure HDInsight で最適なパフォーマンスを得るために Apache Spark クラスターの構成を最適化する方法について説明します。
概要
結合またはシャッフルで処理が遅い場合は、データスキューが原因である可能性があります。 データ スキューは、ジョブ データの非対称性です。 たとえば、マップ ジョブには 20 秒かかる場合があります。 ただし、データが結合またはシャッフルされるジョブの実行には数時間かかります。 データ スキューを修正するには、キー全体を塩分けするか、キーの一部のサブセットにのみ 分離されたソルト を使用する必要があります。 分離されたソルトを使用している場合は、マップ結合においてソルト化されたキーのサブセットをさらにフィルターして分離する必要があります。 もう 1 つのオプションは、最初にバケットカラムを導入して、バケット毎に事前集計することです。
結合が遅くなるもう 1 つの要因は、結合の種類です。 既定では、Spark は SortMerge 結合の種類を使用します。 この種類の結合は、大規模なデータ セットに最適です。 しかし、それ以外の場合は、データをマージする前に最初にデータの左側と右側を並べ替える必要があるため、計算コストがかかります。
Broadcast結合は、小さいデータ セットや、結合の一方の側が他方の側よりもはるかに小さい場合に最適です。 この種の結合では、一方の側がすべてのExecutorにブロードキャストされるため、ブロードキャストのためにより多くのメモリが必要です。
spark.sql.autoBroadcastJoinThresholdを設定して構成の結合の種類を変更することも、DataFrame API (dataframe.join(broadcast(df2))) を使用して結合ヒントを設定することもできます。
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
バケット化されたテーブルを使用している場合は、3 番目の結合の種類である Merge 結合があります。 正しく事前にパーティション分割され、事前に並べ替えられたデータセットでは、コストの高い並べ替えフェーズが SortMerge 結合からスキップされます。
結合の順序は、特に複雑なクエリで重要です。 最も選択的な結合から始めます。 また、可能な場合は、集計後の行数を増やす結合を移動します。
デカルト結合の並列処理を管理するには、入れ子構造、ウィンドウ化を追加し、Spark ジョブで 1 つ以上の手順をスキップすることができます。
ジョブの実行を最適化する
- データを 2 回使用する場合など、必要に応じてキャッシュし、キャッシュします。
- すべての Executor に変数をブロードキャストします。 変数は 1 回だけシリアル化されるため、検索が高速になります。
- ドライバーでスレッド プールを使用すると、多くのタスクの操作が高速化されます。
実行中のプロセスでの個々のパフォーマンス問題を定期的に監視します。 特定の問題に関する詳細な分析情報が必要な場合は、次のいずれかのパフォーマンス プロファイリング ツールを検討してください。
- Intel PAL ツール は、CPU、ストレージ、およびネットワーク帯域幅の使用状況を監視します。
- Oracle Java 8 Mission Control では 、Spark と Executor コードがプロファイルされます。
Spark 2.x クエリ パフォーマンスの鍵となるのは、全段階のコード生成に依存するタングステン エンジンです。 場合によっては、ステージ全体のコード生成が無効になる場合があります。 たとえば、集計式で変更できない型 (string) を使用すると、SortAggregateではなくHashAggregate表示されます。 たとえば、パフォーマンスを向上させるには、次を試してから、コード生成を再度有効にします。
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))