Delta Lake には、選択的上書き用に次の個別のオプションがあります。
-
replaceWhereオプションは、特定の述語に一致するすべてのレコードをアトミックに置き換えます。 - 動的なパーティションの上書きを使用すると、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。
Databricks では、ほとんどの操作で replaceWhere を使用して、上書きするデータを指定することをお勧めします。
Important
データが誤って上書きされた場合は、 復元 を使用して変更を元に戻すことができます。
replaceWhere を使用した任意の選択的上書き
任意の式に一致するデータだけを選択的に上書きすることができます。
Note
SQL には Databricks Runtime 12.2 LTS 以上が必要です。
たとえば、ターゲットテーブルをstart_dateでパーティション分割し、replace_data内のデータに基づいて1月のイベントをアトミックに置き換えます。
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.saveAsTable("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプル コードは、replace_data のデータを書き出し、そのすべての行が述語と一致することを検証し、overwrite セマンティックを使用してアトミックな置換を実行します。 操作内の値が制約の範囲外にある場合、この操作は既定でエラーが発生して失敗します。
この動作を、述語範囲内の overwrite 値と、指定された範囲外の insert レコードに変更できます。 これを行うには、次のいずれかの設定を使用して、spark.databricks.delta.replaceWhere.constraintCheck.enabled を false に設定して制約チェックを無効にします。
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Note
REPLACE WHERE は、いくつかの制限がある boolean_expression を受け入れます。 SQL 言語リファレンスの INSERT を参照してください。
従来の動作
replaceWhereの従来の動作を使用する場合、クエリはパーティション列に対してのみ述語と一致するデータを上書きします。 次のコマンドは、dateでパーティション分割されているターゲットテーブルの1月を、df内のデータでアトミックに置き換えます。
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.saveAsTable("people10m")
従来の動作を使用するには、 spark.databricks.delta.replaceWhere.dataColumns.enabled フラグを falseに設定します。
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
パーティションの動的な上書き
動的パーティションの上書きは、書き込みが新しいデータをコミットし、他のパーティションを変更せずに残すパーティションのみを更新します。
Azure Databricksでは、動的パーティションの上書きをトリガーするために REPLACE USING をお勧めします。 このモードは、Databricks SQL ウェアハウス、サーバーレス コンピューティング、クラシック コンピューティングなど、すべてのコンピューティングの種類で機能し、Spark セッション構成を設定する必要はありません。
REPLACE USING
による動的パーティションの上書きを参照してください。
partitionOverwriteMode は、クラシック コンピューティングを使用して Spark セッション構成を設定する必要がある、動的パーティション上書きのレガシ モードです。 Databricks SQL またはサーバーレス コンピューティングではサポートされていません。
partitionOverwriteModeによる動的パーティションの上書き (レガシ) を参照してください。
以下のセクションでは、各モードの使用方法を示します。
REPLACE USING でのパーティションの動的な上書き
Databricks Runtime 16.3 以降では、 REPLACE USINGを使用したテーブルの動的パーティションの上書きがサポートされています。 Spark セッション構成を設定しなくても、すべてのコンピューティングの種類のデータを選択的に上書きできます。
REPLACE USING では、Databricks SQL ウェアハウス、サーバーレス コンピューティング、クラシック コンピューティングで動作する、コンピューティングに依存しないアトミック上書き動作が可能になります。
REPLACE USING は、受信データの対象となるパーティションのみを上書きします。 他のすべてのパーティションは変更されません。
REPLACE USING は SQL でのみサポートされています。 詳細については、SQL 言語リファレンスの INSERT を参照してください。
次の例では、 REPLACE USINGを使用します。
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
動的パーティションの上書きについては、次の制約と動作に留意してください。
-
USING句では、テーブルのパーティション列の完全なセットを指定する必要があります。 - 書き込まれたデータが予期されるパーティションのみに接していることを常に検証します。 間違ったパーティション内の 1 つの行によって、誤ってパーティション全体が上書きされる可能性があります。
REPLACE USING値を等しい値として扱うなど、NULLがサポートするロジックよりもカスタマイズ可能な照合ロジックが必要な場合は、代わりに補完的なREPLACE ONを使用します。 詳細については、INSERT を参照してください。
動的パーティションが partitionOverwriteMode で上書きされる (レガシ)
Important
この機能は パブリック プレビュー段階です。
Databricks Runtime 11.3 LTS 以降では、SQL で INSERT OVERWRITE するか、 df.write.mode("overwrite")を使用した DataFrame 書き込みのどちらかの上書きモードを使用したパーティション テーブルの動的パーティション上書きがサポートされています。 この種類の上書きは、Databricks SQL ウェアハウスやサーバーレス コンピューティングではなく、クラシック コンピューティングでのみ使用できます。
Warnung
可能な場合は、パーティションの上書きINSERT OVERWRITE REPLACE USINGとINSERT OVERWRITE PARTITIONの代わりにspark.sql.sources.partitionOverwriteMode=dynamicを使用します。 パーティション分割の変更時には、古いデータが使用される場合があります。
動的パーティション上書きモードを使用するには、Spark セッション構成 spark.sql.sources.partitionOverwriteMode を dynamicに設定します。 または、 DataFrameWriter オプション partitionOverwriteMode を dynamic に設定することもできます。 存在する場合、セッション構成で定義されているモードは、クエリ固有のオプションによりオーバーライドされます。
spark.sql.sources.partitionOverwriteMode の既定値は static です。
partitionOverwriteMode の使用例を次に示します。
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
partitionOverwriteModeについては、次の制約と動作に留意してください。
-
overwriteSchemaをtrueに設定することはできません。 - 同じ
partitionOverwriteMode操作でreplaceWhereとDataFrameWriterの両方を指定することはできません。 -
replaceWhereオプションを使用してDataFrameWriter条件を指定した場合、Delta Lake はその条件を適用して、上書きするデータを制御します。 このオプションは、partitionOverwriteModeセッション レベルの構成よりも優先されます。 - 書き込まれたデータが予期されるパーティションのみに接していることを常に検証します。 間違ったパーティション内の 1 つの行によって、誤ってパーティション全体が上書きされる可能性があります。