Apache Spark における書き込みの最適化の必要性について

Apache Spark のようなビッグデータ処理エンジンの分析ワークロードは、標準化されたより大きなサイズのファイルを使用したときに最も効率的に実行されます。 ファイル サイズ、ファイル数、Spark ワーカーの数、およびその構成の間の関係は、パフォーマンスに大きく影響します。 データ レイク テーブルへの取り込みワークロードは、大量の小さなファイルを絶えず書き込むという特性を受け継いでいる場合があります。このシナリオは、一般に「スモール ファイル問題」として知られています。

書き込みの最適化は、Synapse の Delta Lake 機能で、書き込むファイルの数を減らし、書き込むデータの個々のファイル サイズを大きくすることを目的としています。 128MB (既定値) のサイズのファイルを生成しながら、パーティションを動的に最適化します。 目標とするファイル サイズは、ワークロードの要件に応じて構成で変更することが可能です。

この機能では、パーティション間でデータのシャッフル フェーズを余分に行うことでファイル サイズを実現するため、データを書き込む際に余分な処理コストが発生します。 この書き込みによって生じる小さなペナルティは、テーブルの読み取り効率によって相殺されるはずです。

注意

  • これは、Apache Spark バージョン 3.1 以降の Synapse プールで使用できます。

書き込みの最適化の利点

  • Delta Lake テーブルで、バッチとストリーミングの両方の書き込みパターンで利用できます。
  • spark.write コマンドのパターンを変更する必要はありません。 この機能は、構成設定またはテーブル プロパティで有効にできます。
  • OPTIMIZE コマンドと比較して、書き込みトランザクションの数を減らすことができます。
  • OPTIMIZE 操作は、より少ないファイルで動作するので、より高速になります。
  • 参照されていない古いファイルを削除するための VACUUM コマンドもより高速に動作するようになります。
  • クエリでは、スキャンされるファイルが少なくなり、ファイル サイズもより適切になるた、め、読み取りパフォーマンスまたはリソース使用量が向上します。

書き込みの最適化の使用シナリオ

いつ使用するか

  • Delta Lake パーティション テーブルは、最適ではない (128 MB 未満) または標準化されていないファイル サイズ (それ自体のサイズが異なるファイル) を生成する書き込みパターンの影響を受けます。
  • 最適でないファイル サイズでディスクに書き込まれる再パーティションされたデータ フレーム。
  • UPDATE、DELETE、MERGE、CREATE TABLE AS SELECT、INSERT INTO などの小規模なバッチ SQL コマンドの対象となる Delta Lake パーティション テーブル。
  • Delta Lake パーティション テーブルにデータ パターンを追加するストリーミング インジェスト シナリオで、追加の書き込み待機時間が許容される場合。

使用すべきでない場合

  • パーティション分割されていないテーブル。
  • 追加の書き込み待機時間が許容されないユース ケース。
  • 最適化スケジュールと読み取りパターンが明確に定義された大きなテーブル。

書き込みの最適化機能を有効または無効にする方法

書き込みの最適化機能は既定では無効になっています。 Spark 3.3 プールでは、パーティション テーブルに対して既定で有効になっています。

プールまたはセッションの構成設定が完了すると、すべての Spark 書き込みパターンでこの機能が使用されます。

書き込みの最適化機能を使用するには、次の構成を使用して有効にします。

  1. Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

現在の構成値を確認するには、次のようにコマンドを使用します。

  1. Scala と PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

書き込みの最適化機能を無効にするには、次のように以下の構成を変更します。

  1. Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

テーブル プロパティを使用した書き込みの最適化の制御

新しいテーブルの場合

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

DeltaTableBuilder API を使用する場合:

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

既存のテーブルの場合:

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

DeltaTableBuilder API を使用する場合:

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

書き込みの最適化の現在の最大ファイル サイズ構成を取得および変更する方法

現在の構成値を取得するには、次のコマンドを使用します。 既定値は 128 MB です。

  1. Scala と PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • 構成値を変更する場合
  1. Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

次のステップ