ベスト プラクティス: Delta Lake

この記事では、Delta Lake を使用する際のベスト プラクティスについて説明します。

Databricks では、予測最適化を使うことをお勧めします。 「Delta Lake の予測最適化」を参照してください。

同じ場所でテーブルを削除して再作成する場合は、常に CREATE OR REPLACE TABLE ステートメントを使う必要があります。 「Delta テーブルのドロップまたは置き換え」を参照してください。

最適化されたデータ スキップにリキッド クラスタリングを使用する

Databricks は、データ スキップ用にデータ レイアウトを最適化するために、パーティション分割、Z オーダー、またはその他のデータ編成戦略ではなく、リキッド クラスタリングを使用することをお勧めしています。 詳しくは、「Delta テーブルにリキッド クラスタリングを使用する」をご覧ください。

ファイルを圧縮する

予測最適化により、Unity Catalo のマネージド テーブルに対して OPTIMIZEVACUUM のコマンドが自動的に実行されます。 「Delta Lake の予測最適化」を参照してください。

Databricks は、OPTIMIZE コマンドを頻繁に実行して、小さなファイルを圧縮することをお勧めしています。

Note

この操作では、古いファイルは削除されません。 それらを削除するには、VACUUM コマンドを実行します。

テーブルの内容またはスキーマを置き換える

場合によっては、Delta テーブルを置き換えたい場合があります。 次に例を示します。

  • テーブル内のデータが間違っていて、内容を差し替える場合。
  • 互換性のないスキーマ変更 (列の型の変更など) を行うために、テーブル全体を書き換えたい場合。

Delta テーブルのディレクトリ全体を削除し、同じパスに新しいテーブルを作成することはできますが、次の理由からお勧めされません

  • ディレクトリの削除は効率的ではありません。 非常に大きなファイルを含むディレクトリは、削除に数時間または数日かかることがあります。
  • 削除されたファイルの内容はすべて失われます。間違ったテーブルを削除すると、復旧は困難です。
  • ディレクトリの削除はアトミックではありません。 テーブルを削除している間に、テーブルを読み込んでいる同時クエリが失敗したり、テーブルの一部が表示されたりすることがあります。

テーブル スキーマを変更する必要がない場合は、Delta テーブルからデータを削除して新しいデータを挿入するか、テーブルを更新して不正な値を修正することができます。

テーブル スキーマを変更する場合は、テーブル全体をアトミックに置き換えることができます。 次に例を示します。

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

この方法にはいくつかの利点があります。

  • ディレクトリを再帰的に一覧表示したり、ファイルを削除したりする必要がないため、テーブルの上書きをより高速に実行できます。
  • 以前のバージョンのテーブルが残されます。 間違ったテーブルを削除しても、タイム トラベルを使用して以前のデータを簡単に取得できます。 「Delta Lake テーブル履歴を操作する」を参照してください。
  • これはアトミックな操作です。 テーブルを削除している間も、同時クエリでテーブルを読み取ることができます。
  • Delta Lake の ACID トランザクション保証により、テーブルの上書きに失敗した場合、テーブルは以前の状態になります。

また、テーブルの上書き後にストレージ コストを削減するために古いファイルを削除したい場合は、VACUUM を使用してファイルを削除できます。 これはファイル削除用に最適化されており、通常はディレクトリ全体の削除よりも高速です。

Spark キャッシュ

Databricks では、次の理由で、Spark キャッシュの使用はお勧めしません。

  • キャッシュされた DataFrame に追加されたフィルターによって発生するデータのスキップは失われます。
  • 別の識別子を使用してテーブルにアクセスした場合、キャッシュされたデータが更新されないことがあります。

Apache Spark での Delta Lake と Parquet の違い

Delta Lake では、次の操作が自動的に処理されます。 これらの操作は手動で実行しないでください。

  • REFRESH TABLE: 差分テーブルは常に最新の情報を返します。そのため、変更後に REFRESH TABLE を手動で呼び出す必要はありません。
  • パーティションの追加と削除: Delta Lake は、テーブルに存在するパーティションのセットを自動的に追跡し、データが追加または削除されると一覧を更新します。 そのため、ALTER TABLE [ADD|DROP] PARTITION または MSCK を実行する必要はありません。
  • 1 つのパーティションを読み込む: パーティションを直接読み取る必要はありません。 たとえば、spark.read.format("parquet").load("/data/date=2017-01-01") を実行する必要はありません。 代わりに、データのスキップに WHERE 句を使用します (例: spark.read.table("<table-name>").where("date = '2017-01-01'"))。
  • データ ファイルを手動で変更しない: Delta Lake では、トランザクション ログを使用してテーブルへの変更をアトミックにコミットします。 Delta テーブル内の Parquet データ ファイルを直接変更、追加、または削除しないでください。これは、データの損失やテーブルの破損につながる可能性があるためです。

Delta Lake マージのパフォーマンスを向上させる

以下の方法を使用することで、マージに必要な時間を短縮できます。

  • 一致に対する検索領域を減らす: 既定では、merge 操作は Delta テーブル全体を検索してソース テーブル内の一致を検索します。 merge を高速化する方法の 1 つは、一致条件に既知の制約を追加して検索領域を減らすことです。 たとえば、countrydate でパーティション分割されたテーブルがあり、merge を使用して最終日と特定の国の情報を更新するとします。 次の条件を追加すると、関連するパーティションでのみ一致が検索されるため、クエリが高速になります。

    events.date = current_date() AND events.country = 'USA'
    

    さらに、このクエリでは、他の同時実行操作と競合する可能性も減少します。 詳細については、Azure Databricks での分離レベルと書き込みの競合に関するページを参照してください。

  • ファイルを圧縮する: データが多数の小さなファイルに保存されている場合、一致を検索するためのデータの読み取りが遅くなる可能性があります。 小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。 詳細については、「Delta Lake で optimize を使用してデータ ファイルを圧縮する」を参照してください。

  • 書き込みのシャッフル パーティションを制御する: merge 操作では、データを複数回シャッフルして、更新されたデータを計算して書き込みます。 シャッフルに使用されるタスクの数は、Spark セッション構成 spark.sql.shuffle.partitions で制御します。 このパラメーターを設定することで、並列処理を制御できるだけでなく、出力ファイルの数も決定します。 値を大きくすると並列処理が増加しますが、より多くの小さなデータ ファイルも生成されます。

  • 最適化された書き込みを有効にする: パーティション テーブルの場合、merge はシャッフル パーティションの数よりもはるかに多くの小さなファイルを生成できます。 これは、すべてのシャッフル タスクが複数のパーティションに複数のファイルを書き込むことで、パフォーマンスのボトルネックになる可能性があるためです。 [最適化された書き込み] を有効にすると、ファイルの数を減らせます。 「Delta Lake on Azure Databricks の最適化された書き込み」を参照してください。

  • テーブル内のファイル サイズを調整する: Azure Databricks では、Delta テーブルでファイルを書き換える merge 操作が頻繁に行われている場合にそれを自動的に検出し、今後のファイルの書き換えを予想して、書き換えられるファイルのサイズを小さくすることを選択する場合があります。 詳細については、ファイル サイズのチューニングに関するセクションを参照してください。

  • 低シャッフル マージ: 低シャッフル マージによって MERGE の実装が最適化され、最も一般的なワークロードのパフォーマンスが向上します。 さらに、変更されていないデータに対して、Z オーダーなどの既存のデータ レイアウトの最適化が保持されます。

データの最新性を管理する

Delta テーブルは、各クエリの開始時にそのテーブルの最新バージョンに自動更新されます。 このプロセスは、コマンドの状態が Updating the Delta table's state と報告されているときに、ノートブックで確認できます。 ただし、テーブルに対する履歴分析を実行している場合、特にストリーミング データが頻繁に取り込まれているテーブルでは、必ずしも最新のデータが必要とは限りません。 このような場合、Delta テーブルの古いスナップショットでクエリを実行できます。 このアプローチを使用すると、クエリの結果を受け取るときの待機時間を短縮できます。

Spark セッション構成 spark.databricks.delta.stalenessLimit1h15m などの時間文字列値 (それぞれ 1 時間、15 分の意味) を設定することでデータの陳腐化までの耐久時間を構成できます。 この構成はセッション固有であり、テーブルにアクセスしている他のクライアントには影響しません。 テーブルの状態が更新されたのが陳腐化までの耐久時間内である場合、テーブルに対するクエリは、最新のテーブルの更新を待たずに結果を返します。 この設定により、テーブルの更新が妨げられることはなく、陳腐化したデータが返される場合、更新はバックグラウンドで処理されます。 テーブルの最後の更新が陳腐化までの耐久時間より前である場合、クエリはテーブルの状態の更新が完了するまで結果を返しません。

待機時間の短いクエリに対する拡張チェックポイント

Delta Lake では、最適化された頻度で、Delta テーブルの集計状態としてチェックポイントを書き込みます。 これらのチェックポイントは、テーブルの最新の状態を計算するときの開始点の役目を果たします。 チェックポイントがない場合、Delta Lake では、テーブルの状態を計算するために、トランザクション ログへのコミットを表す JSON ファイル ("デルタ" ファイル) の大規模なコレクションを読み取る必要があります。 また、Delta Lake でデータのスキップを実行するために使用される列レベルの統計はチェックポイントに格納されます。

重要

Delta Lake のチェックポイントは、構造化ストリーミングのチェックポイントとは異なります。

列レベルの統計値は構造体および JSON (後方互換のため) として格納されます。 構造体形式では、Delta Lake による読み取りが大幅に高速化されます。理由は次のとおりです。

  • Delta Lake では、列レベルの統計を取得するために、コストの高い JSON 解析を行いません。
  • Parquet 列の排除機能を使用すると、列の統計の読み取りに必要な I/O が大幅に削減されます。

構造体形式を使用すると、Delta Lake の読み取り操作のオーバーヘッドを数秒から数十ミリ秒にまで削減する最適化コレクションが実現します。これにより、短いクエリでの待機時間が大幅に短縮されます。

チェックポイントで列レベルの統計を管理する

チェックポイントで統計がどのように書き込まれるかを管理するには、テーブル プロパティ delta.checkpoint.writeStatsAsJsondelta.checkpoint.writeStatsAsStruct を使用します。 両方のテーブル プロパティが false の場合、Delta Lake ではデータのスキップを行うことは "できません"。

  • バッチ書き込みでは、統計を JSON および構造体の両方の形式で書き込みます。 delta.checkpoint.writeStatsAsJsontrue です。
  • delta.checkpoint.writeStatsAsStruct は、既定では未定義になっています。
  • リーダーでは、使用できる場合は構造体列を使用し、そうでない場合はフォール バックして JSON 列を使用します。

重要

拡張チェックポイントによって、オープン ソースの Delta Lake リーダーとの互換性は失われません。 ただし、delta.checkpoint.writeStatsAsJsonfalse に設定すると、専用の Delta Lake リーダーに影響が生じる場合があります。 パフォーマンスへの影響の詳細については、ベンダーにお問い合わせください。

構造化ストリーミング クエリの拡張チェックポイントを有効にする

構造化ストリーミング ワークロードに短い待機時間 (待機時間 1 分以下) の要件がない場合は、次の SQL コマンドを実行して拡張チェックポイントを有効にすることができます。

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

次のテーブル プロパティを設定することでチェックポイントの書き込み遅延を短縮することもできます。

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

お使いのアプリケーションでデータのスキップを利用する必要がない場合は、両方のプロパティを false に設定できます。 その場合、統計は収集も書き込みもされません。 Databricks にはこの構成はお勧めしません。