この記事では、Delta Lake を使用する際のベスト プラクティスについて説明します。
ベスト プラクティスの概要
ほとんどの Delta Lake ワークロードに適用される一般的な推奨事項を次に示します。
- Unity カタログのマネージド テーブルを使用します。 Delta Lake と Apache Iceberg 用の Azure Databricks の Unity カタログマネージド テーブルを参照してください。
- 予測最適化を使用します。 「Unity Catalog 管理テーブルの予測最適化」を参照してください。
- 液体クラスタリングを使用します。 表に液体クラスタリングを使用するを参照してください。
- 同じ場所でテーブルを削除して再作成する場合は、常に
CREATE OR REPLACE TABLE
ステートメントを使う必要があります。 「Delta テーブルのドロップまたは置き換え」を参照してください。
従来の Delta 構成を削除する
Databricks では、新しい Databricks Runtime のバージョンにアップグレードするときに、Spark 構成とテーブル プロパティから最も明示的な従来の Delta 構成を削除することをお勧めします。 レガシ構成では、Databricks によって導入された新しい最適化と既定値が移行されたワークロードに適用されないようにすることができます。
ファイルを圧縮する
予測最適化により、Unity Catalo のマネージド テーブルに対して OPTIMIZE
と VACUUM
のコマンドが自動的に実行されます。 「Unity Catalog 管理テーブルの予測最適化」を参照してください。
Databricks では、小さなファイルを圧縮するために、OPTIMIZE コマンドを頻繁に実行することをお勧めします。
注
この操作では、古いファイルは削除されません。 削除するには、VACUUM コマンドを実行します。
Delta Lake で Spark キャッシュを使用しない
Databricks では、次の理由で、Spark キャッシュの使用はお勧めしません。
- キャッシュされた
DataFrame
に追加されたフィルターによって発生するデータのスキップは失われます。 - 別の識別子を使用してテーブルにアクセスした場合、キャッシュされたデータが更新されないことがあります。
Apache Spark での Delta Lake と Parquet の違い
Delta Lake では、次の操作が自動的に処理されます。 これらの操作は手動で実行しないでください。
-
REFRESH TABLE
: 差分テーブルは常に最新の情報を返します。そのため、変更後にREFRESH TABLE
を手動で呼び出す必要はありません。 -
パーティションの追加と削除: Delta Lake は、テーブルに存在するパーティションのセットを自動的に追跡し、データが追加または削除されると一覧を更新します。 そのため、
ALTER TABLE [ADD|DROP] PARTITION
またはMSCK
を実行する必要はありません。 -
1 つのパーティションを読み込む: パーティションを直接読み取る必要はありません。 たとえば、
spark.read.format("parquet").load("/data/date=2017-01-01")
を実行する必要はありません。 代わりに、データのスキップにWHERE
句を使用します (例:spark.read.table("<table-name>").where("date = '2017-01-01'")
)。 - データ ファイルを手動で変更しないでください。Delta Lake はトランザクション ログを使用してテーブルへの変更をアトミックにコミットします。 Delta テーブル内の Parquet データ ファイルを直接変更、追加、または削除しないでください。これは、データの損失やテーブルの破損につながる可能性があるためです。
Delta Lake マージのパフォーマンスを向上させる
以下の方法を使用することで、マージに必要な時間を短縮できます。
一致に対する検索領域を減らす: 既定では、
merge
操作は Delta テーブル全体を検索してソース テーブル内の一致を検索します。merge
を高速化する方法の 1 つは、一致条件に既知の制約を追加して検索領域を減らすことです。 たとえば、country
とdate
でパーティション分割されたテーブルがあり、merge
を使用して最終日と特定の国の情報を更新するとします。 次の条件を追加すると、関連するパーティションでのみ一致が検索されるため、クエリが高速になります。events.date = current_date() AND events.country = 'USA'
さらに、このクエリでは、他の同時実行操作と競合する可能性も減少します。 詳細については、Azure Databricks での分離レベルと書き込みの競合に関するページを参照してください。
ファイルを圧縮する: データが多数の小さなファイルに保存されている場合、一致を検索するためのデータの読み取りが遅くなる可能性があります。 小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。 詳細については、「データ ファイル レイアウトの最適化」を参照してください。
書き込みのシャッフル パーティションを制御する:
merge
操作では、データを複数回シャッフルして、更新されたデータを計算して書き込みます。 シャッフルに使用されるタスクの数は、Spark セッション構成spark.sql.shuffle.partitions
で制御します。 このパラメーターを設定することで、並列処理を制御できるだけでなく、出力ファイルの数も決定します。 値を大きくすると並列処理が増加しますが、より多くの小さなデータ ファイルも生成されます。最適化された書き込みを有効にする: パーティション テーブルの場合、
merge
はシャッフル パーティションの数よりもはるかに多くの小さなファイルを生成できます。 これは、すべてのシャッフル タスクが複数のパーティションに複数のファイルを書き込むことで、パフォーマンスのボトルネックになる可能性があるためです。 [最適化された書き込み] を有効にすると、ファイルの数を減らせます。 「Delta Lake on Azure Databricks の最適化された書き込み」を参照してください。テーブル内のファイル サイズを調整する: Azure Databricks では、Delta テーブルでファイルを書き換える
merge
操作が頻繁に行われている場合にそれを自動的に検出し、今後のファイルの書き換えを予想して、書き換えられるファイルのサイズを小さくすることを選択する場合があります。 詳細については、ファイル サイズのチューニングに関するセクションを参照してください。低シャッフル マージ: 低シャッフル マージによって
MERGE
の実装が最適化され、最も一般的なワークロードのパフォーマンスが向上します。 さらに、変更されていないデータに対して、Z オーダーなどの既存のデータ レイアウトの最適化が保持されます。