Delta Lake のテーブル スキーマを更新する

Delta Lake を使用すると、テーブルのスキーマを更新できます。 サポートされている変更の種類は次のとおりです。

  • 新しい列を追加する (任意の位置で)
  • 既存の列を並べ替える
  • 既存の列の名前を変更する

これらの変更は、DDL を使用して明示的に行うか、DML を使用して暗黙的に行うことができます。

重要

Delta テーブル スキーマの更新は、すべての同時差分書き込み操作と競合する操作です。

Delta テーブル スキーマを更新すると、そのテーブルから読み取られたストリームが終了します。 ストリームを継続したい場合は、再起動する必要があります。 推奨される方法については、「Azure Databricks での構造化ストリーミング アプリケーションの運用に関する考慮事項」を参照してください。

スキーマを明示的に更新して列を追加する

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

既定では、null 値の許容は true です。

入れ子になったフィールドに列を追加するには、次のように入力します。

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) を実行する前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Note

入れ子になった列の追加がサポートされているのは構造体の場合だけです。 配列とマップはサポートされていません。

スキーマを明示的に更新して列のコメントまたは順序を変更する

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

入れ子になったフィールドの列を変更するには、次のように入力します。

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST を実行する前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field2
| +-field1

スキーマを明示的に更新して列を置き換える

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

たとえば、次の DDL を実行する場合:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

スキーマを明示的に更新して列の名前を変更する

重要

この機能はパブリック プレビュー段階にあります。

Note

この機能は、Databricks Runtime 10.4 LTS 以降で使用できます。

列の既存データを書き直すことなく列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

列名を変更するには:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

入れ子になったフィールドの名前を変更するには:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行する場合:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field001
| +-field2

Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

スキーマを明示的に更新して列を削除する

重要

この機能はパブリック プレビュー段階にあります。

Note

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

データ ファイルを書き換えずに列をメタデータのみの操作として削除するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

重要

メタデータから列を削除しても、ファイル内の列の基になるデータは削除されません。 削除された列データを消去するには、REORG TABLE を使用してファイルを書き換えます。 その後、VACUUM を使用して、削除された列データを含むファイルを物理的に削除できます。

列を削除するには

ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

スキーマを明示的に更新して列の種類または名前を変更する

テーブルを書き直すことで、列の型または名前を変更することも、列を破棄することもできます。 これを行うには、overwriteSchema オプションを使用します。

次の例では、列の種類の変更を示しています。

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

次の例では、列の名前の変更を示しています。

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

スキーマの自動更新で列を追加する

DataFrame に存在するけれどもテーブルにない列は、次の場合に書き込みトランザクションの一部として自動的に追加されます。

  • write または writeStream.option("mergeSchema", "true") があります
  • spark.databricks.delta.schema.autoMerge.enabledtrue です

両方のオプションを指定すると、DataFrameWriter のオプションが優先されます。 追加された列は、それらが存在する構造体の末尾に追加されます。 新しい列を追加するとき、大文字と小文字が維持されます。

Note

  • mergeSchemaINSERT INTO または .write.insertInto() と併用できません。

Delta Lake マージの自動スキーマの進化

スキーマ展開により、ユーザーはマージ時のターゲット テーブルとソース テーブルのスキーマの不一致を解決できます。 これにより、次の 2 つのケースが処理されます。

  1. ソース テーブルの列がターゲット テーブルに存在しません。 新しい列がターゲット スキーマに追加され、その値がソース値を使用して挿入または更新されます。
  2. ターゲット テーブルの列がソース テーブルに存在しません。 ターゲット スキーマは変更されません。追加のターゲット列の値は、UPDATE の場合は変更されず、INSERT の場合は NULL に設定されます。

重要

スキーマ展開を使用するには、merge コマンドを実行する前に、Spark セッション構成 spark.databricks.delta.schema.autoMerge.enabledtrue に設定する必要があります。

Note

  • Databricks Runtime 12.2 LTS 以降では、ソース テーブルに存在する列は、挿入または更新の各アクションで、名前で指定できます。 Databricks Runtime 11.3 LTS 以下では、マージを使用したスキーマの展開には、INSERT * または UPDATE SET * アクションのみを使用できます。

スキーマの展開がある場合とない場合の merge 操作の効果を、次の例に示します。

[列] クエリ (SQL の場合) スキーマ展開のない動作 (既定) スキーマ展開のある動作
ターゲット列: key, value

ソース列: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
テーブル スキーマは変更されません。列 keyvalue のみが更新または挿入されます。 テーブル スキーマが (key, value, new_value) に変更されます。 一致する既存のレコードは、ソースの valuenew_value を使用して更新されます。 スキーマ (key, value, new_value) と共に新しい行が挿入されます。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATEINSERT アクションは、ターゲット列 old_value がソース内にないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。 新しいレコードは、指定した keynew_value (old_value には NULL) を使用して挿入されます。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。不一致のレコードでは、new_valueNULL が入力されます。 注 (1) を参照。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 新しいレコードは、指定した keynew_value (old_value には NULL) を使用して挿入されます。 既存のレコードでは、old_value は変更されず、new_valueNULL が入力されます。 注 (1) を参照。

(1) この動作は、Databricks Runtime 12.2 LTS 以降で使用できます。Databricks Runtime 11.3 LTS 以下では、この条件はエラーになります。

Delta Lake マージを使用して列を除外する

Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT 句を使用して列を明示的に除外できます。 EXCEPT キーワードの動作は、スキーマの展開が有効になっているかどうかによって異なります。

スキーマの展開を無効にすると、EXCEPT キーワードがターゲット テーブル内の列のリストに適用され、UPDATE アクションまたは INSERT アクションから列を除外できます。 除外された列は null に設定されます。

スキーマの展開を有効にすると、EXCEPT キーワードがソース テーブル内の列のリストに適用され、スキーマの展開から列を除外できます。 ターゲット内に存在しないソース内の新しい列は、EXCEPT 句にリストされている場合、ターゲット スキーマに追加されません。 ターゲットに既に存在する除外された列は、null に設定されます。

この構文の例を次に示します。

クエリ (SQL の場合) スキーマ展開のない動作 (既定) スキーマ展開のある動作
ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 idtitle の値を使用して、新しい行が挿入されます。 除外されたフィールド last_updatednull に設定されます。 フィールド review はターゲット内にないため無視されます。 一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 スキーマが展開され、フィールド review が追加されます。 null に設定されている last_updated を除き、すべてのソース フィールドを使用して新しい行が挿入されます。
ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT は、ターゲット テーブルに列 internal_count が存在しないため、エラーをスローします。 一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 review フィールドはターゲット テーブルに追加されますが、internal_count フィールドは無視されます。 新しく挿入された行の last_updatednull に設定されています。

構造体の配列へのスキーマの自動展開

Delta MERGE INTO では、名前による構造体フィールドの解決と、構造体の配列に対するスキーマの展開がサポートされています。 スキーマの展開を有効にすると、ターゲット テーブル スキーマは構造体の配列に対して展開します。これは、配列内の入れ子になった構造体でも機能します。

Note

Databricks Runtime 12.2 LTS 以降では、ソース テーブルに存在する構造体フィールドは、挿入または更新の各コマンドで、名前で指定できます。 Databricks Runtime 11.3 LTS 以下では、マージを使用したスキーマの展開には、INSERT * または UPDATE SET * コマンドのみを使用できます。

構造体の配列に対するスキーマの展開を使用する場合と使用しない場合の、マージ操作の効果を次の例に示します。

送信元スキーマ [送信先スキーマ] スキーマ展開のない動作 (既定) スキーマ展開のある動作
array<struct<b: string, a: string>> array<struct<a: int, b: int>> テーブル スキーマは変更されません。 列は名前で解決され、更新または挿入されます。 テーブル スキーマは変更されません。 列は名前で解決され、更新または挿入されます。
array<struct<a: int, c: string, d: string>> array<struct<a: string, b: string>> updateinsert は、ターゲット テーブルに cd が存在しないので、エラーをスローします。 テーブル スキーマが array<struct<a: string, b: string, c: string, d: string>> に変更されます。 cd は、ターゲット テーブルの既存のエントリに対して NULL として挿入されます。 updateinsert は、a を文字列にキャストし、bNULL としてソース テーブル内のエントリをフィルします。
array<struct<a: string, b: struct<c: string, d: string>>> array<struct<a: string, b: struct<c: string>>> updateinsert は、ターゲット テーブルに d が存在しないので、エラーをスローします。 ターゲット テーブル スキーマが array<struct<a: string, b: struct<c: string, d: string>>> に変更されます。 d は、ターゲット テーブルの既存のエントリに対して NULL として挿入されます。

スキーマ更新での NullType 列の処理

Parquet では NullType がサポートされていないため、Delta テーブルへの書き込み時に DataFrame から NullType 列が破棄されますが、スキーマにはまだ格納されています。 その列に対して別のデータ型を受け取ったとき、Delta Lake はスキーマを新しいデータ型にマージします。 Delta Lake が既存の列に対して NullType を受け取ると、古いスキーマは保持され、新しい列は書き込み中に破棄されます。

ストリーミングにおける NullType がサポートされていません。 ストリーミングを使用するときはスキーマを設定する必要があるため、これは非常にまれです。 NullTypeArrayTypeMapType などの複合型でも受け入れられません。

テーブル スキーマの置換

既定では、テーブル内のデータを上書きしてもスキーマは上書きされません。 replaceWhere のない mode("overwrite") を使用して テーブルを上書きするとき、書き込まれているデータのスキーマを上書きすることもできます。 テーブルのスキーマとパーティション分割を置き換えるには、overwriteSchema オプションを true に設定します。

df.write.option("overwriteSchema", "true")

重要

動的パーティションの上書きを使用する場合は、overwriteSchematrue として指定することはできません。