Bagikan melalui


Memperbarui skema tabel Delta Lake

Delta Lake memungkinkan Anda memperbarui skema tabel. Jenis perubahan berikut ini didukung:

  • Menambahkan kolom baru (pada posisi arbitrer)
  • Menyusun ulang kolom yang ada
  • Mengganti nama kolom yang ada

Anda dapat membuat perubahan ini secara eksplisit menggunakan DDL atau secara implisit menggunakan DML.

Penting

Pembaruan ke skema tabel Delta adalah operasi yang bertentangan dengan semua operasi penulisan Delta bersamaan.

Saat Anda memperbarui skema tabel Delta, aliran yang dibaca dari tabel tersebut akan berakhir. Jika Anda ingin aliran berlanjut, Anda harus memulai ulang. Untuk metode yang direkomendasikan, lihat Pertimbangan produksi untuk Streaming Terstruktur.

Memperbarui skema secara eksplisit untuk menambahkan kolom

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Secara default, nullability adalah true.

Untuk menambahkan kolom ke bidang bertumpuk, gunakan:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Misalnya, jika skema sebelum berjalan ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) adalah:

- root
| - colA
| - colB
| +-field1
| +-field2

skema setelahnya adalah:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Catatan

Menambahkan kolom bertumpuk hanya didukung untuk struktur. Array dan peta tidak didukung.

Memperbarui skema secara eksplisit untuk mengubah komentar atau pengurutan kolom

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Untuk mengubah kolom di bidang bertumpuk, gunakan:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Misalnya, jika skema sebelum berjalan ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST adalah:

- root
| - colA
| - colB
| +-field1
| +-field2

skema setelahnya adalah:

- root
| - colA
| - colB
| +-field2
| +-field1

Memperbarui skema secara eksplisit untuk mengganti kolom

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Misalnya, saat menjalankan DDL berikut:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

jika skema sebelumnya adalah:

- root
| - colA
| - colB
| +-field1
| +-field2

skema setelahnya adalah:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Memperbarui skema secara eksplisit untuk mengganti nama kolom

Catatan

Fitur ini tersedia di Databricks Runtime 10.4 LTS ke atas.

Untuk mengganti nama kolom tanpa menulis ulang data kolom yang ada, Anda harus mengaktifkan pemetaan kolom untuk tabel. Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.

Untuk mengganti nama kolom:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Untuk mengganti nama bidang bertumpu:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Misalnya, saat Anda menjalankan perintah berikut:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Jika skema sebelumnya adalah:

- root
| - colA
| - colB
| +-field1
| +-field2

Lalu skema setelahnya adalah:

- root
| - colA
| - colB
| +-field001
| +-field2

Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.

Memperbarui skema secara eksplisit untuk menghilangkan kolom

Catatan

Fitur ini tersedia di Databricks Runtime 11.3 LTS ke atas.

Untuk menghilangkan kolom sebagai operasi khusus metadata tanpa menulis ulang file data apa pun, Anda harus mengaktifkan pemetaan kolom untuk tabel. Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.

Penting

Menghilangkan kolom dari metadata tidak menghapus data yang mendasar untuk kolom dalam file. Untuk menghapus menyeluruh data kolom yang dihilangkan, Anda dapat menggunakan REORG TABLE untuk menulis ulang file. Anda kemudian dapat menggunakan VACUUM untuk menghapus file yang berisi data kolom yang dihilangkan secara fisik.

Untuk menjatuhkan kolom:

ALTER TABLE table_name DROP COLUMN col_name

Untuk menjatuhkan beberapa kolom:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Memperbarui skema secara eksplisit untuk mengubah jenis atau nama kolom

Anda dapat mengubah jenis atau nama kolom atau menjatuhkan kolom dengan menulis ulang tabel. Untuk melakukan ini, gunakan overwriteSchema opsi .

Contoh berikut menunjukkan perubahan jenis kolom:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Contoh berikut menunjukkan perubahan nama kolom:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Mengaktifkan evolusi skema

Anda dapat mengaktifkan evolusi skema dengan melakukan salah satu hal berikut:

Databricks merekomendasikan untuk mengaktifkan evolusi skema untuk setiap operasi tulis daripada mengatur Spark conf.

Saat Anda menggunakan opsi atau sintaks untuk mengaktifkan evolusi skema dalam operasi tulis, ini lebih diutamakan daripada keyakinan Spark.

Catatan

Tidak ada klausul evolusi skema untuk INSERT INTO pernyataan.

Aktifkan evolusi skema untuk penulisan untuk menambahkan kolom baru

Kolom yang ada dalam kueri sumber tetapi hilang dari tabel target secara otomatis ditambahkan sebagai bagian dari transaksi tulis saat evolusi skema diaktifkan. Lihat Mengaktifkan evolusi skema.

Kasus dipertahankan saat menambahkan kolom baru. Kolom baru ditambahkan ke akhir skema tabel. Jika kolom tambahan berada dalam struktur, kolom tersebut ditambahkan ke akhir struktur dalam tabel target.

Contoh berikut menunjukkan menggunakan mergeSchema opsi dengan Auto Loader. Lihat Apa itu Pemuat Otomatis?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Contoh berikut menunjukkan penggunaan mergeSchema opsi dengan operasi penulisan batch:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evolusi skema otomatis untuk penggabungan Delta Lake

Evolusi skema memungkinkan pengguna untuk mengatasi ketidakcocokan skema antara tabel target dan sumber yang digabungkan. Ini menangani dua kasus berikut:

  1. Kolom dalam tabel sumber tidak ada dalam tabel target. Kolom baru ditambahkan ke skema target, dan nilainya disisipkan atau diperbarui menggunakan nilai sumber.
  2. Kolom dalam tabel target tidak ada dalam tabel sumber. Skema target dibiarkan tidak berubah; nilai dalam kolom target tambahan dibiarkan tidak berubah (untuk UPDATE) atau diatur ke NULL (untuk INSERT).

Anda harus mengaktifkan evolusi skema otomatis secara manual. Lihat Mengaktifkan evolusi skema.

Catatan

Di Databricks Runtime 12.2 LTS ke atas, kolom dan bidang struktur yang ada dalam tabel sumber dapat ditentukan berdasarkan nama dalam tindakan sisipkan atau perbarui. Dalam Databricks Runtime 11.3 LTS dan di bawahnya, hanya INSERT * atau UPDATE SET * tindakan yang dapat digunakan untuk evolusi skema dengan penggabungan.

Dalam Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan evolusi skema dengan struktur yang berlapis di dalam peta, seperti map<int, struct<a: int, b: int>>.

Sintaks evolusi skema untuk penggabungan

Dalam Databricks Runtime 15.2 ke atas, Anda dapat menentukan evolusi skema dalam pernyataan penggabungan menggunakan API tabel SQL atau Delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Contoh operasi penggabungan dengan evolusi skema

Berikut adalah beberapa contoh efek operasi merge dengan dan tanpa evolusi skema.

Kolom Kueri (dalam SQL) Perilaku tanpa evolusi skema (default) Perilaku dengan evolusi skema
Kolom target: key, value

Kolom sumber: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Skema tabel tetap tidak berubah; hanya kolom key, value saja yang diperbarui/disisipkan. Skema tabel diubah menjadi (key, value, new_value). Rekaman yang ada dengan kecocokan value diperbarui dengan dan new_value di sumber. Baris baru disisipkan dengan skema (key, value, new_value).
Kolom target: key, old_value

Kolom sumber: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Tindakan UPDATE dan INSERT menampilkan kesalahan karena kolom target old_value tidak ada di sumber. Skema tabel diubah menjadi (key, old_value, new_value). Rekaman yang ada dengan kecocokan new_value diperbarui dengan sumber yang tidak old_value berubah. Rekaman baru disisipkan dengan , , new_valuedan NULL yang ditentukan keyuntuk old_value.
Kolom target: key, old_value

Kolom sumber: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE menampilkan kesalahan karena kolom new_value tidak ada di tabel target. Skema tabel diubah menjadi (key, old_value, new_value). Rekaman yang ada dengan kecocokan new_value diperbarui dengan di sumber yang tidak old_value berubah, dan rekaman yang tidak cocok telah NULL dimasukkan untuk new_value. Lihat catatan (1).
Kolom target: key, old_value

Kolom sumber: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT menampilkan kesalahan karena kolom new_value tidak ada di tabel target. Skema tabel diubah menjadi (key, old_value, new_value). Rekaman baru disisipkan dengan , , new_valuedan NULL yang ditentukan keyuntuk old_value. Rekaman yang ada telah NULL dimasukkan untuk new_value membiarkan old_value tidak berubah. Lihat catatan (1).

(1) Perilaku ini tersedia di Databricks Runtime 12.2 LTS ke atas; Databricks Runtime 11.3 LTS dan kesalahan di bawah ini dalam kondisi ini.

Mengecualikan kolom dengan penggabungan Delta Lake

Di Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan EXCEPT klausul dalam kondisi penggabungan untuk mengecualikan kolom secara eksplisit. Perilaku EXCEPT kata kunci bervariasi tergantung pada apakah evolusi skema diaktifkan atau tidak.

Dengan evolusi skema dinonaktifkan, EXCEPT kata kunci berlaku untuk daftar kolom dalam tabel target dan memungkinkan pengecualian kolom dari UPDATE atau INSERT tindakan. Kolom yang dikecualikan diatur ke null.

Dengan evolusi skema diaktifkan, EXCEPT kata kunci berlaku untuk daftar kolom dalam tabel sumber dan memungkinkan pengecualian kolom dari evolusi skema. Kolom baru di sumber yang tidak ada di target tidak ditambahkan ke skema target jika tercantum dalam EXCEPT klausa. Kolom yang dikecualikan yang sudah ada di target diatur ke null.

Contoh berikut menunjukkan sintaks ini:

Kolom Kueri (dalam SQL) Perilaku tanpa evolusi skema (default) Perilaku dengan evolusi skema
Kolom target: id, title, last_updated

Kolom sumber: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Baris baru disisipkan menggunakan nilai untuk id dan title. Bidang yang dikecualikan last_updated diatur ke null. Bidang review diabaikan karena tidak ada di target. Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Skema berevolusi untuk menambahkan bidang review. Baris baru disisipkan menggunakan semua bidang sumber kecuali last_updated yang diatur ke null.
Kolom target: id, title, last_updated

Kolom sumber: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT menampilkan kesalahan karena kolom internal_count tidak ada di tabel target. Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Bidang review ditambahkan ke tabel target, tetapi internal_count bidang diabaikan. Baris baru yang disisipkan telah last_updated diatur ke null.

NullType Menangani kolom dalam pembaruan skema

Karena Parquet tidak mendukung NullType, NullType kolom dijatuhkan dari DataFrame saat menulis ke tabel Delta, tetapi masih disimpan dalam skema. Ketika tipe data yang berbeda diterima untuk kolom itu, Delta Lake menggabungkan skema ke tipe data baru. Jika Delta Lake menerima NullType untuk kolom yang ada, skema lama dipertahankan dan kolom baru dijatuhkan selama penulisan.

NullType dalam streaming tidak didukung. Karena Anda harus mengatur skema saat menggunakan streaming, ini seharusnya sangat jarang. NullType juga tidak diterima untuk jenis yang kompleks seperti ArrayType dan MapType.

Mengganti skema tabel

Secara default, menimpa data dalam tabel tidak menimpa skema. Saat menimpa tabel menggunakan mode("overwrite") tanpa replaceWhere, Anda mungkin masih ingin menimpa skema data yang sedang ditulis. Anda mengganti skema dan partisi tabel dengan mengatur opsi overwriteSchema ke true:

df.write.option("overwriteSchema", "true")

Penting

Anda tidak dapat menentukan overwriteSchema kapan true menggunakan timpa partisi dinamis.