Memperbarui skema tabel Delta Lake
Delta Lake memungkinkan Anda memperbarui skema tabel. Jenis perubahan berikut ini didukung:
- Menambahkan kolom baru (pada posisi arbitrer)
- Menyusun ulang kolom yang ada
- Mengganti nama kolom yang ada
Anda dapat membuat perubahan ini secara eksplisit menggunakan DDL atau secara implisit menggunakan DML.
Penting
Pembaruan ke skema tabel Delta adalah operasi yang bertentangan dengan semua operasi penulisan Delta bersamaan.
Saat Anda memperbarui skema tabel Delta, aliran yang dibaca dari tabel tersebut akan berakhir. Jika Anda ingin aliran berlanjut, Anda harus memulai ulang. Untuk metode yang direkomendasikan, lihat Pertimbangan produksi untuk Streaming Terstruktur.
Memperbarui skema secara eksplisit untuk menambahkan kolom
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Secara default, nullability adalah true
.
Untuk menambahkan kolom ke bidang bertumpuk, gunakan:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Misalnya, jika skema sebelum berjalan ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
adalah:
- root
| - colA
| - colB
| +-field1
| +-field2
skema setelahnya adalah:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Catatan
Menambahkan kolom bertumpuk hanya didukung untuk struktur. Array dan peta tidak didukung.
Memperbarui skema secara eksplisit untuk mengubah komentar atau pengurutan kolom
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Untuk mengubah kolom di bidang bertumpuk, gunakan:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Misalnya, jika skema sebelum berjalan ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
adalah:
- root
| - colA
| - colB
| +-field1
| +-field2
skema setelahnya adalah:
- root
| - colA
| - colB
| +-field2
| +-field1
Memperbarui skema secara eksplisit untuk mengganti kolom
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Misalnya, saat menjalankan DDL berikut:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
jika skema sebelumnya adalah:
- root
| - colA
| - colB
| +-field1
| +-field2
skema setelahnya adalah:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Memperbarui skema secara eksplisit untuk mengganti nama kolom
Catatan
Fitur ini tersedia di Databricks Runtime 10.4 LTS ke atas.
Untuk mengganti nama kolom tanpa menulis ulang data kolom yang ada, Anda harus mengaktifkan pemetaan kolom untuk tabel. Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.
Untuk mengganti nama kolom:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Untuk mengganti nama bidang bertumpu:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Misalnya, saat Anda menjalankan perintah berikut:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Jika skema sebelumnya adalah:
- root
| - colA
| - colB
| +-field1
| +-field2
Lalu skema setelahnya adalah:
- root
| - colA
| - colB
| +-field001
| +-field2
Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.
Memperbarui skema secara eksplisit untuk menghilangkan kolom
Catatan
Fitur ini tersedia di Databricks Runtime 11.3 LTS ke atas.
Untuk menghilangkan kolom sebagai operasi khusus metadata tanpa menulis ulang file data apa pun, Anda harus mengaktifkan pemetaan kolom untuk tabel. Lihat Mengganti nama dan meletakkan kolom dengan pemetaan kolom Delta Lake.
Penting
Menghilangkan kolom dari metadata tidak menghapus data yang mendasar untuk kolom dalam file. Untuk menghapus menyeluruh data kolom yang dihilangkan, Anda dapat menggunakan REORG TABLE untuk menulis ulang file. Anda kemudian dapat menggunakan VACUUM untuk menghapus file yang berisi data kolom yang dihilangkan secara fisik.
Untuk menjatuhkan kolom:
ALTER TABLE table_name DROP COLUMN col_name
Untuk menjatuhkan beberapa kolom:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Memperbarui skema secara eksplisit untuk mengubah jenis atau nama kolom
Anda dapat mengubah jenis atau nama kolom atau menjatuhkan kolom dengan menulis ulang tabel. Untuk melakukan ini, gunakan overwriteSchema
opsi .
Contoh berikut menunjukkan perubahan jenis kolom:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Contoh berikut menunjukkan perubahan nama kolom:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Mengaktifkan evolusi skema
Anda dapat mengaktifkan evolusi skema dengan melakukan salah satu hal berikut:
- Atur
.option("mergeSchema", "true")
ke Spark DataFramewrite
atauwriteStream
operasi. Lihat Mengaktifkan evolusi skema untuk penulisan guna menambahkan kolom baru. - Gunakan
MERGE WITH SCHEMA EVOLUTION
sintaksis. Lihat Sintaks evolusi skema untuk penggabungan. - Atur Spark conf
spark.databricks.delta.schema.autoMerge.enabled
ketrue
untuk SparkSession saat ini.
Databricks merekomendasikan untuk mengaktifkan evolusi skema untuk setiap operasi tulis daripada mengatur Spark conf.
Saat Anda menggunakan opsi atau sintaks untuk mengaktifkan evolusi skema dalam operasi tulis, ini lebih diutamakan daripada keyakinan Spark.
Catatan
Tidak ada klausul evolusi skema untuk INSERT INTO
pernyataan.
Aktifkan evolusi skema untuk penulisan untuk menambahkan kolom baru
Kolom yang ada dalam kueri sumber tetapi hilang dari tabel target secara otomatis ditambahkan sebagai bagian dari transaksi tulis saat evolusi skema diaktifkan. Lihat Mengaktifkan evolusi skema.
Kasus dipertahankan saat menambahkan kolom baru. Kolom baru ditambahkan ke akhir skema tabel. Jika kolom tambahan berada dalam struktur, kolom tersebut ditambahkan ke akhir struktur dalam tabel target.
Contoh berikut menunjukkan menggunakan mergeSchema
opsi dengan Auto Loader. Lihat Apa itu Pemuat Otomatis?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Contoh berikut menunjukkan penggunaan mergeSchema
opsi dengan operasi penulisan batch:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evolusi skema otomatis untuk penggabungan Delta Lake
Evolusi skema memungkinkan pengguna untuk mengatasi ketidakcocokan skema antara tabel target dan sumber yang digabungkan. Ini menangani dua kasus berikut:
- Kolom dalam tabel sumber tidak ada dalam tabel target. Kolom baru ditambahkan ke skema target, dan nilainya disisipkan atau diperbarui menggunakan nilai sumber.
- Kolom dalam tabel target tidak ada dalam tabel sumber. Skema target dibiarkan tidak berubah; nilai dalam kolom target tambahan dibiarkan tidak berubah (untuk
UPDATE
) atau diatur keNULL
(untukINSERT
).
Anda harus mengaktifkan evolusi skema otomatis secara manual. Lihat Mengaktifkan evolusi skema.
Catatan
Di Databricks Runtime 12.2 LTS ke atas, kolom dan bidang struktur yang ada dalam tabel sumber dapat ditentukan berdasarkan nama dalam tindakan sisipkan atau perbarui. Dalam Databricks Runtime 11.3 LTS dan di bawahnya, hanya INSERT *
atau UPDATE SET *
tindakan yang dapat digunakan untuk evolusi skema dengan penggabungan.
Dalam Databricks Runtime 13.3 LTS ke atas, Anda dapat menggunakan evolusi skema dengan struktur yang berlapis di dalam peta, seperti map<int, struct<a: int, b: int>>
.
Sintaks evolusi skema untuk penggabungan
Dalam Databricks Runtime 15.2 ke atas, Anda dapat menentukan evolusi skema dalam pernyataan penggabungan menggunakan API tabel SQL atau Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Contoh operasi penggabungan dengan evolusi skema
Berikut adalah beberapa contoh efek operasi merge
dengan dan tanpa evolusi skema.
Kolom | Kueri (dalam SQL) | Perilaku tanpa evolusi skema (default) | Perilaku dengan evolusi skema |
---|---|---|---|
Kolom target: key, value Kolom sumber: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Skema tabel tetap tidak berubah; hanya kolom key , value saja yang diperbarui/disisipkan. |
Skema tabel diubah menjadi (key, value, new_value) . Rekaman yang ada dengan kecocokan value diperbarui dengan dan new_value di sumber. Baris baru disisipkan dengan skema (key, value, new_value) . |
Kolom target: key, old_value Kolom sumber: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Tindakan UPDATE dan INSERT menampilkan kesalahan karena kolom target old_value tidak ada di sumber. |
Skema tabel diubah menjadi (key, old_value, new_value) . Rekaman yang ada dengan kecocokan new_value diperbarui dengan sumber yang tidak old_value berubah. Rekaman baru disisipkan dengan , , new_value dan NULL yang ditentukan key untuk old_value . |
Kolom target: key, old_value Kolom sumber: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE menampilkan kesalahan karena kolom new_value tidak ada di tabel target. |
Skema tabel diubah menjadi (key, old_value, new_value) . Rekaman yang ada dengan kecocokan new_value diperbarui dengan di sumber yang tidak old_value berubah, dan rekaman yang tidak cocok telah NULL dimasukkan untuk new_value . Lihat catatan (1). |
Kolom target: key, old_value Kolom sumber: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT menampilkan kesalahan karena kolom new_value tidak ada di tabel target. |
Skema tabel diubah menjadi (key, old_value, new_value) . Rekaman baru disisipkan dengan , , new_value dan NULL yang ditentukan key untuk old_value . Rekaman yang ada telah NULL dimasukkan untuk new_value membiarkan old_value tidak berubah. Lihat catatan (1). |
(1) Perilaku ini tersedia di Databricks Runtime 12.2 LTS ke atas; Databricks Runtime 11.3 LTS dan kesalahan di bawah ini dalam kondisi ini.
Mengecualikan kolom dengan penggabungan Delta Lake
Di Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan EXCEPT
klausul dalam kondisi penggabungan untuk mengecualikan kolom secara eksplisit. Perilaku EXCEPT
kata kunci bervariasi tergantung pada apakah evolusi skema diaktifkan atau tidak.
Dengan evolusi skema dinonaktifkan, EXCEPT
kata kunci berlaku untuk daftar kolom dalam tabel target dan memungkinkan pengecualian kolom dari UPDATE
atau INSERT
tindakan. Kolom yang dikecualikan diatur ke null
.
Dengan evolusi skema diaktifkan, EXCEPT
kata kunci berlaku untuk daftar kolom dalam tabel sumber dan memungkinkan pengecualian kolom dari evolusi skema. Kolom baru di sumber yang tidak ada di target tidak ditambahkan ke skema target jika tercantum dalam EXCEPT
klausa. Kolom yang dikecualikan yang sudah ada di target diatur ke null
.
Contoh berikut menunjukkan sintaks ini:
Kolom | Kueri (dalam SQL) | Perilaku tanpa evolusi skema (default) | Perilaku dengan evolusi skema |
---|---|---|---|
Kolom target: id, title, last_updated Kolom sumber: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Baris baru disisipkan menggunakan nilai untuk id dan title . Bidang yang dikecualikan last_updated diatur ke null . Bidang review diabaikan karena tidak ada di target. |
Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Skema berevolusi untuk menambahkan bidang review . Baris baru disisipkan menggunakan semua bidang sumber kecuali last_updated yang diatur ke null . |
Kolom target: id, title, last_updated Kolom sumber: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT menampilkan kesalahan karena kolom internal_count tidak ada di tabel target. |
Baris yang cocok diperbarui dengan mengatur last_updated bidang ke tanggal saat ini. Bidang review ditambahkan ke tabel target, tetapi internal_count bidang diabaikan. Baris baru yang disisipkan telah last_updated diatur ke null . |
NullType
Menangani kolom dalam pembaruan skema
Karena Parquet tidak mendukung NullType
, NullType
kolom dijatuhkan dari DataFrame saat menulis ke tabel Delta, tetapi masih disimpan dalam skema. Ketika tipe data yang berbeda diterima untuk kolom itu, Delta Lake menggabungkan skema ke tipe data baru. Jika Delta Lake menerima NullType
untuk kolom yang ada, skema lama dipertahankan dan kolom baru dijatuhkan selama penulisan.
NullType
dalam streaming tidak didukung. Karena Anda harus mengatur skema saat menggunakan streaming, ini seharusnya sangat jarang. NullType
juga tidak diterima untuk jenis yang kompleks seperti ArrayType
dan MapType
.
Mengganti skema tabel
Secara default, menimpa data dalam tabel tidak menimpa skema. Saat menimpa tabel menggunakan mode("overwrite")
tanpa replaceWhere
, Anda mungkin masih ingin menimpa skema data yang sedang ditulis. Anda mengganti skema dan partisi tabel dengan mengatur opsi overwriteSchema
ke true
:
df.write.option("overwriteSchema", "true")
Penting
Anda tidak dapat menentukan overwriteSchema
kapan true
menggunakan timpa partisi dinamis.