Menimpa data secara selektif dengan Delta Lake

Azure Databricks memanfaatkan fungsionalitas Delta Lake untuk mendukung dua opsi berbeda untuk penimpaan selektif:

  • Opsi secara replaceWhere atomik menggantikan semua rekaman yang cocok dengan predikat tertentu.
  • Anda dapat mengganti direktori data berdasarkan bagaimana tabel dipartisi menggunakan penimpaan partisi dinamis.

Untuk sebagian besar operasi, Databricks merekomendasikan penggunaan replaceWhere untuk menentukan data mana yang akan ditimpa.

Penting

Jika data ditimpa secara tidak sengaja, Anda dapat menggunakan pemulihan untuk membatalkan perubahan.

Timpa selektif sewenang-wenang dengan replaceWhere

Anda hanya dapat menimpa data yang cocok dengan ekspresi sewenang-wenang secara selektif.

Catatan

SQL memerlukan Databricks Runtime 12.2 LTS atau lebih tinggi.

Perintah berikut secara atomis menggantikan peristiwa pada bulan Januari di tabel target, yang dipartisi oleh start_date, dengan data dalam replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Kode sampel ini menulis data dalam replace_data, memvalidasi bahwa semua baris cocok dengan predikat, dan melakukan penggantian atom menggunakan overwrite semantik. Jika ada nilai dalam operasi yang berada di luar batasan, operasi ini gagal dengan kesalahan secara default.

Anda dapat mengubah perilaku ini menjadi overwrite nilai dalam rentang predikat dan insert rekaman yang berada di luar rentang yang ditentukan. Untuk melakukannya, nonaktifkan pemeriksaan batasan dengan mengatur spark.databricks.delta.replaceWhere.constraintCheck.enabled ke false menggunakan salah satu pengaturan berikut:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Perilaku warisan

Perilaku default warisan hanya menimpa replaceWhere data yang cocok dengan predikat atas kolom partisi. Dengan model warisan ini, perintah berikut akan secara atomik mengganti bulan Januari dalam tabel target, yang dipartisi oleh date, dengan data dalam df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Jika Anda ingin kembali ke perilaku lama, Anda dapat menonaktifkan spark.databricks.delta.replaceWhere.dataColumns.enabled bendera:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Penimpaan partisi dinamis

Penting

Fitur ini ada di Pratinjau Publik.

Databricks Runtime 11.3 LTS ke atas mendukung mode timpa partisi dinamis untuk tabel yang dipartisi. Untuk tabel dengan beberapa partisi, Databricks Runtime 11.3 LTS dan di bawah ini hanya mendukung penimpaan partisi dinamis jika semua kolom partisi memiliki jenis data yang sama.

Ketika dalam mode timpa partisi dinamis, operasi menimpa semua data yang ada di setiap partisi logis tempat penulisan menerapkan data baru. Setiap partisi logis yang ada yang tulisnya tidak berisi data tetap tidak berubah. Mode ini hanya berlaku ketika data sedang ditulis dalam mode penimpaan: baik INSERT OVERWRITE dalam SQL, atau penulisan DataFrame dengan df.write.mode("overwrite").

Konfigurasikan mode timpa partisi dinamis dengan mengatur konfigurasi sesi Spark spark.sql.sources.partitionOverwriteMode ke dynamic. Anda juga dapat mengaktifkan ini dengan mengatur opsi DataFrameWriterpartitionOverwriteModeke dynamic. Jika ada, opsi khusus kueri menimpa mode yang ditentukan dalam konfigurasi sesi. Default untuk partitionOverwriteMode adalah static.

Penting

Validasi bahwa data yang ditulis dengan penimpaan partisi dinamis hanya menyentuh partisi yang diharapkan. Satu baris dalam partisi yang salah dapat menyebabkan penimpaan seluruh partisi secara tidak sengaja.

Contoh berikut menunjukkan penggunaan penimpaan partisi dinamis:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Catatan

  • Partisi dinamis menimpa konflik dengan opsi replaceWhere untuk tabel berpartisi.
    • Jika timpaan partisi dinamis diaktifkan dalam konfigurasi sesi Spark, dan replaceWhere disediakan sebagai opsi DataFrameWriter, maka Delta Lake menimpa data sesuai dengan ekspresi replaceWhere (opsi khusus kueri mengambil alih konfigurasi sesi).
    • Anda menerima kesalahan jika DataFrameWriter opsi memiliki penimpaan partisi dinamis dan replaceWhere diaktifkan.
  • Anda tidak dapat menentukan overwriteSchema kapan true menggunakan timpa partisi dinamis.