Menimpa data secara selektif dengan Delta Lake
Azure Databricks memanfaatkan fungsionalitas Delta Lake untuk mendukung dua opsi berbeda untuk penimpaan selektif:
- Opsi secara
replaceWhere
atomik menggantikan semua rekaman yang cocok dengan predikat tertentu. - Anda dapat mengganti direktori data berdasarkan bagaimana tabel dipartisi menggunakan penimpaan partisi dinamis.
Untuk sebagian besar operasi, Databricks merekomendasikan penggunaan replaceWhere
untuk menentukan data mana yang akan ditimpa.
Penting
Jika data ditimpa secara tidak sengaja, Anda dapat menggunakan pemulihan untuk membatalkan perubahan.
Timpa selektif sewenang-wenang dengan replaceWhere
Anda hanya dapat menimpa data yang cocok dengan ekspresi sewenang-wenang secara selektif.
Catatan
SQL memerlukan Databricks Runtime 12.2 LTS atau lebih tinggi.
Perintah berikut secara atomis menggantikan peristiwa pada bulan Januari di tabel target, yang dipartisi oleh start_date
, dengan data dalam replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Kode sampel ini menulis data dalam replace_data
, memvalidasi bahwa semua baris cocok dengan predikat, dan melakukan penggantian atom menggunakan overwrite
semantik. Jika ada nilai dalam operasi yang berada di luar batasan, operasi ini gagal dengan kesalahan secara default.
Anda dapat mengubah perilaku ini menjadi overwrite
nilai dalam rentang predikat dan insert
rekaman yang berada di luar rentang yang ditentukan. Untuk melakukannya, nonaktifkan pemeriksaan batasan dengan mengatur spark.databricks.delta.replaceWhere.constraintCheck.enabled
ke false menggunakan salah satu pengaturan berikut:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Perilaku warisan
Perilaku default warisan hanya menimpa replaceWhere
data yang cocok dengan predikat atas kolom partisi. Dengan model warisan ini, perintah berikut akan secara atomik mengganti bulan Januari dalam tabel target, yang dipartisi oleh date
, dengan data dalam df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
Jika Anda ingin kembali ke perilaku lama, Anda dapat menonaktifkan spark.databricks.delta.replaceWhere.dataColumns.enabled
bendera:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Penimpaan partisi dinamis
Penting
Fitur ini ada di Pratinjau Publik.
Databricks Runtime 11.3 LTS ke atas mendukung mode timpa partisi dinamis untuk tabel yang dipartisi. Untuk tabel dengan beberapa partisi, Databricks Runtime 11.3 LTS dan di bawah ini hanya mendukung penimpaan partisi dinamis jika semua kolom partisi memiliki jenis data yang sama.
Ketika dalam mode timpa partisi dinamis, operasi menimpa semua data yang ada di setiap partisi logis tempat penulisan menerapkan data baru. Setiap partisi logis yang ada yang tulisnya tidak berisi data tetap tidak berubah. Mode ini hanya berlaku ketika data sedang ditulis dalam mode penimpaan: baik INSERT OVERWRITE
dalam SQL, atau penulisan DataFrame dengan df.write.mode("overwrite")
.
Konfigurasikan mode timpa partisi dinamis dengan mengatur konfigurasi sesi Spark spark.sql.sources.partitionOverwriteMode
ke dynamic
. Anda juga dapat mengaktifkan ini dengan mengatur opsi DataFrameWriter
partitionOverwriteMode
ke dynamic
. Jika ada, opsi khusus kueri menimpa mode yang ditentukan dalam konfigurasi sesi. Default untuk partitionOverwriteMode
adalah static
.
Penting
Validasi bahwa data yang ditulis dengan penimpaan partisi dinamis hanya menyentuh partisi yang diharapkan. Satu baris dalam partisi yang salah dapat menyebabkan penimpaan seluruh partisi secara tidak sengaja.
Contoh berikut menunjukkan penggunaan penimpaan partisi dinamis:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Catatan
- Partisi dinamis menimpa konflik dengan opsi
replaceWhere
untuk tabel berpartisi.- Jika timpaan partisi dinamis diaktifkan dalam konfigurasi sesi Spark, dan
replaceWhere
disediakan sebagai opsiDataFrameWriter
, maka Delta Lake menimpa data sesuai dengan ekspresireplaceWhere
(opsi khusus kueri mengambil alih konfigurasi sesi). - Anda menerima kesalahan jika
DataFrameWriter
opsi memiliki penimpaan partisi dinamis danreplaceWhere
diaktifkan.
- Jika timpaan partisi dinamis diaktifkan dalam konfigurasi sesi Spark, dan
- Anda tidak dapat menentukan
overwriteSchema
kapantrue
menggunakan timpa partisi dinamis.