Delta Lake を使用してデータを選択的に上書きする

Azure Databricks は Delta Lake 機能を利用して、選択的上書きの 2 つの異なるオプションをサポートします。

  • replaceWhere オプションは、特定の述語に一致するすべてのレコードをアトミックに置き換えます。
  • 動的なパーティションの上書きを使用すると、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。

Databricks では、ほとんどの操作で replaceWhere を使用して、上書きするデータを指定することをお勧めします。

重要

データが誤って上書きされた場合は、復元を使用して変更を元に戻すことができます。

replaceWhere を使用した任意の選択的上書き

任意の式に一致するデータだけを選択的に上書きすることができます。

Note

SQL には、Databricks Runtime 12.2 LTS 以降が必要です。

次のコマンドは、start_date によってパーティション分割されているターゲット テーブルの 1 月のイベントを、replace_data のデータにアトミックに置き換えます。

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

このサンプル コードでは、replace_data にデータを書き出し、すべての行が述語に一致することを検証し、overwrite セマンティクスを使用してアトミックな置換を実行します。 操作内の値が制約の範囲外にある場合、この操作は既定でエラーが発生して失敗します。

この動作を、述語範囲内の overwrite 値と、指定された範囲外の insert レコードに変更できます。 これを行うには、次のいずれかの設定を使用して、spark.databricks.delta.replaceWhere.constraintCheck.enabled を false に設定して制約チェックを無効にします。

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

従来の動作

従来の既定の動作では、replaceWhere では、述語に一致するデータがパーティション列だけで上書きされていました。 このレガシ モデルでは、次のコマンドは、date によってパーティション分割されているターゲット テーブルの 1 月の月を、df のデータにアトミックに置き換えていました。

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

以前の動作に戻したい場合は、spark.databricks.delta.replaceWhere.dataColumns.enabled フラグを無効にすることができます。

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

パーティションの動的な上書き

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 11.3 LTS 以降では、パーティション テーブルのパーティションの "動的な" 上書きモードがサポートされています。 複数のパーティションを持つテーブルの場合、Databricks Runtime 11.3 LTS 以下では、すべてのパーティション列が同じデータ型の場合にのみ、パーティションの動的な上書きがサポートされます。

パーティションの動的な上書きモードでは、書き込みが新しいデータをコミットする各論理パーティション内のすべての既存データが上書きされます。 書き込みにデータが含まれていない既存の論理パーティションは変更されません。 このモードは、データが上書きモードで書き込まれる SQL の INSERT OVERWRITE または df.write.mode("overwrite") の DataFrame 書き込みのいずれかの場合にのみ適用されます。

パーティションの動的な上書きモードを構成するには、Spark セッションを spark.sql.sources.partitionOverwriteMode から dynamic に構成します。 これは、DataFrameWriter オプションの partitionOverwriteModedynamic に設定することによっても有効にできます。 存在する場合、セッション構成で定義されているモードは、クエリ固有のオプションによりオーバーライドされます。 partitionOverwriteMode の規定値は static です。

重要

パーティションの動的な上書きで、期待しているパーティションにのみデータが書き込まれれることを確認してください。 正しくないパーティション内の 1 つの行によって、意図せずにパーティション全体が上書きされる可能性があります。

パーティションの動的な上書きの使用例は次のとおりです。

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

注意

  • パーティションの動的な上書きは、パーティション テーブルのオプション replaceWhere と競合します。
    • Spark セッション構成でパーティションの動的な上書きが有効になっていて、replaceWhereDataFrameWriter オプションとして指定されている場合、Delta Lake は replaceWhere 式に従ってデータを上書きします (セッション構成はクエリ固有のオプションによりオーバーライドされます)。
    • DataFrameWriter オプションで、パーティションの動的な上書きと replaceWhere が両方とも有効になっている場合、エラーが表示されます。
  • パーティションの動的な上書きを使用する場合は、overwriteSchematrue として指定することはできません。