Delta Lake のテーブル スキーマを更新する
Delta Lake を使用すると、テーブルのスキーマを更新できます。 サポートされている変更の種類は次のとおりです。
- 新しい列を追加する (任意の位置で)
- 既存の列を並べ替える
- 既存の列の名前を変更する
これらの変更は、DDL を使用して明示的に行うか、DML を使用して暗黙的に行うことができます。
重要
Delta テーブル スキーマの更新は、すべての同時差分書き込み操作と競合する操作です。
Delta テーブル スキーマを更新すると、そのテーブルから読み取られたストリームが終了します。 ストリームを継続したい場合は、再起動する必要があります。 推奨される方法については、「Azure Databricks での構造化ストリーミング アプリケーションの運用に関する考慮事項」を参照してください。
スキーマを明示的に更新して列を追加する
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
既定では、null 値の許容は true
です。
入れ子になったフィールドに列を追加するには、次のように入力します。
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
を実行する前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
後のスキーマは次のようになります。
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Note
入れ子になった列の追加がサポートされているのは構造体の場合だけです。 配列とマップはサポートされていません。
スキーマを明示的に更新して列のコメントまたは順序を変更する
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
入れ子になったフィールドの列を変更するには、次のように入力します。
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
を実行する前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
後のスキーマは次のようになります。
- root
| - colA
| - colB
| +-field2
| +-field1
スキーマを明示的に更新して列を置き換える
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
たとえば、次の DDL を実行する場合:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
後のスキーマは次のようになります。
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
スキーマを明示的に更新して列の名前を変更する
Note
この機能は、Databricks Runtime 10.4 LTS 以降で使用できます。
列の既存データを書き直すことなく列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。
列名を変更するには:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
入れ子になったフィールドの名前を変更するには:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
たとえば、次のコマンドを実行する場合:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
前のスキーマが次の場合:
- root
| - colA
| - colB
| +-field1
| +-field2
後のスキーマは次のようになります。
- root
| - colA
| - colB
| +-field001
| +-field2
「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。
スキーマを明示的に更新して列を削除する
Note
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
データ ファイルを書き換えずに列をメタデータのみの操作として削除するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。
重要
メタデータから列を削除しても、ファイル内の列の基になるデータは削除されません。 削除された列データを消去するには、REORG TABLE を使用してファイルを書き換えます。 その後、VACUUM を使用して、削除された列データを含むファイルを物理的に削除できます。
列を削除するには
ALTER TABLE table_name DROP COLUMN col_name
複数の列を削除するには
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
スキーマを明示的に更新して列の種類または名前を変更する
テーブルを書き直すことで、列の型または名前を変更することも、列を破棄することもできます。 これを行うには、overwriteSchema
オプションを使用します。
次の例では、列の種類の変更を示しています。
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
次の例では、列の名前の変更を示しています。
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
スキーマの展開を有効にする
スキーマの展開を有効にするには、次のいずれかの操作を行います。
.option("mergeSchema", "true")
を Spark DataFramewrite
またはwriteStream
操作に設定します。 「新しい列を追加する書き込みについては、スキーマの展開を有効にする」を参照してください。MERGE WITH SCHEMA EVOLUTION
構文を使用する。 「マージ用のスキーマ展開構文」を参照してください。- Spark 構成
spark.databricks.delta.schema.autoMerge.enabled
を現在の SparkSession のtrue
に設定します。
Databricks では、Spark 構成を設定するのではなく、書き込み操作ごとにスキーマの展開を有効にすることをお勧めします。
書き込み操作でスキーマの展開を有効にするためにオプションまたは構文を使用する場合、これは Spark 構成よりも優先されます。
Note
INSERT INTO
ステートメントのスキーマ展開句はありません。
新しい列を追加する書き込みについては、スキーマの展開を有効にする
ソース クエリに存在するがターゲット テーブルにない列は、スキーマの展開を有効にした場合に、書き込みトランザクションの一部として自動的に追加されます。 スキーマ展開を有効にする
新しい列を追加するとき、大文字と小文字が維持されます。 新しい列はテーブル スキーマの末尾に追加されます。 追加の列が構造体内にある場合は、ターゲット テーブルの構造体の末尾に追加されます。
次の例では、自動ローダーで mergeSchema
オプションを使用する方法を示します。 「自動ローダー」を参照してください。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
次の例では、バッチ書き込み操作で mergeSchema
オプションを使用する方法を示します。
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Delta Lake マージの自動スキーマの展開
スキーマの展開により、ユーザーはマージ時のターゲット テーブルとソース テーブルのスキーマの不一致を解決できます。 これにより、次の 2 つのケースが処理されます。
- ソース テーブルの列がターゲット テーブルに存在しません。 新しい列がターゲット スキーマに追加され、その値がソース値を使用して挿入または更新されます。
- ターゲット テーブルの列がソース テーブルに存在しません。 ターゲット スキーマは変更されません。追加のターゲット列の値は、
UPDATE
の場合は変更されず、INSERT
の場合はNULL
に設定されます。
スキーマの自動展開を手動で有効にする必要があります。 「スキーマの展開を有効にする」を参照してください。
Note
Databricks Runtime 12.2 LTS 以降では、ソース テーブルに存在する列と構造体フィールドは、挿入または更新の各アクションで、名前で指定できます。 Databricks Runtime 11.3 LTS 以下では、マージを使用したスキーマの展開には、INSERT *
または UPDATE SET *
アクションのみを使用できます。
Databricks Runtime 13.3 LTS 以降では、map<int, struct<a: int, b: int>>
などのマップ内に入れ子になった構造体でスキーマの展開を使用できます。
マージ用のスキーマ展開構文
Databricks Runtime 15.2 以降では、SQL あるいは Delta Table API を使用して、マージ ステートメントでスキーマの展開を指定できます。
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
スキーマの展開によるマージの操作例
スキーマの展開がある場合とない場合の merge
操作の効果を、次の例に示します。
[列] | クエリ (SQL の場合) | スキーマ展開のない動作 (既定) | スキーマ展開のある動作 |
---|---|---|---|
ターゲット列: key, value ソース列: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
テーブル スキーマは変更されません。列 key 、value のみが更新または挿入されます。 |
テーブル スキーマが (key, value, new_value) に変更されます。 一致する既存のレコードは、ソースの value と new_value を使用して更新されます。 スキーマ (key, value, new_value) と共に新しい行が挿入されます。 |
ターゲット列: key, old_value ソース列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE と INSERT アクションは、ターゲット列 old_value がソース内にないため、エラーをスローします。 |
テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。 新しいレコードは、指定した key 、new_value (old_value には NULL ) を使用して挿入されます。 |
ターゲット列: key, old_value ソース列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 |
テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。不一致のレコードでは、new_value に NULL が入力されます。 注 (1) を参照。 |
ターゲット列: key, old_value ソース列: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 |
テーブル スキーマが (key, old_value, new_value) に変更されます。 新しいレコードは、指定した key 、new_value (old_value には NULL ) を使用して挿入されます。 既存のレコードでは、old_value は変更されず、new_value に NULL が入力されます。 注 (1) を参照。 |
(1) この動作は、Databricks Runtime 12.2 LTS 以降で使用できます。Databricks Runtime 11.3 LTS 以下では、この条件はエラーになります。
Delta Lake マージを使用して列を除外する
Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT
句を使用して列を明示的に除外できます。 EXCEPT
キーワードの動作は、スキーマの展開が有効になっているかどうかによって異なります。
スキーマの展開を無効にすると、EXCEPT
キーワードがターゲット テーブル内の列のリストに適用され、UPDATE
アクションまたは INSERT
アクションから列を除外できます。 除外された列は null
に設定されます。
スキーマの展開を有効にすると、EXCEPT
キーワードがソース テーブル内の列のリストに適用され、スキーマの展開から列を除外できます。 ターゲット内に存在しないソース内の新しい列は、EXCEPT
句にリストされている場合、ターゲット スキーマに追加されません。 ターゲットに既に存在する除外された列は、null
に設定されます。
この構文の例を次に示します。
列 | クエリ (SQL の場合) | スキーマ展開のない動作 (既定) | スキーマ展開のある動作 |
---|---|---|---|
ターゲット列: id, title, last_updated ソース列: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 id とtitle の値を使用して、新しい行が挿入されます。 除外されたフィールド last_updated は null に設定されます。 フィールド review はターゲット内にないため無視されます。 |
一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 スキーマが展開され、フィールド review が追加されます。 null に設定されている last_updated を除き、すべてのソース フィールドを使用して新しい行が挿入されます。 |
ターゲット列: id, title, last_updated ソース列: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT は、ターゲット テーブルに列 internal_count が存在しないため、エラーをスローします。 |
一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 review フィールドはターゲット テーブルに追加されますが、internal_count フィールドは無視されます。 新しく挿入された行の last_updated は null に設定されています。 |
スキーマ更新での NullType
列の処理
Parquet では NullType
がサポートされていないため、Delta テーブルへの書き込み時に DataFrame から NullType
列が破棄されますが、スキーマにはまだ格納されています。 その列に対して別のデータ型を受け取ったとき、Delta Lake はスキーマを新しいデータ型にマージします。 Delta Lake が既存の列に対して NullType
を受け取ると、古いスキーマは保持され、新しい列は書き込み中に破棄されます。
ストリーミングにおける NullType
がサポートされていません。 ストリーミングを使用するときはスキーマを設定する必要があるため、これは非常にまれです。 NullType
が ArrayType
や MapType
などの複合型でも受け入れられません。
テーブル スキーマの置換
既定では、テーブル内のデータを上書きしてもスキーマは上書きされません。 replaceWhere
のない mode("overwrite")
を使用して テーブルを上書きするとき、書き込まれているデータのスキーマを上書きすることもできます。 テーブルのスキーマとパーティション分割を置き換えるには、overwriteSchema
オプションを true
に設定します。
df.write.option("overwriteSchema", "true")
重要
動的パーティションの上書きを使用する場合は、overwriteSchema
を true
として指定することはできません。