Upsert ke dalam tabel Delta Lake menggunakan penggabungan
Anda dapat melakukan upsert data dari tabel sumber, tampilan, atau DataFrame ke dalam tabel Delta target dengan menggunakan operasi SQL MERGE
. Delta Lake mendukung sisipan, pembaruan, dan penghapusan di MERGE
, dan mendukung sintaks yang diperluas di luar standar SQL untuk memfasilitasi kasus penggunaan tingkat lanjut.
Misalkan Anda memiliki tabel sumber bernama people10mupdates
atau jalur sumber di /tmp/delta/people-10m-updates
yang berisi data baru untuk tabel target bernama people10m
atau jalur target di /tmp/delta/people-10m
. Beberapa rekaman baru ini mungkin sudah ada dalam data target. Untuk menggabungkan data baru, Anda harus memperbarui baris di mana id
orang tersebut sudah ada dan menyisipkan baris baru di mana tidak ada id
yang cocok. Anda bisa menjalankan kueri berikut:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Penting
Hanya satu baris dari tabel sumber yang dapat mencocokkan baris tertentu dalam tabel target. Dalam Databricks Runtime 16.0 ke atas, MERGE
mengevaluasi kondisi yang ditentukan dalam WHEN MATCHED
klausul dan ON
untuk menentukan kecocokan duplikat. Dalam Databricks Runtime 15.4 LTS dan di bawahnya, MERGE
operasi hanya mempertimbangkan kondisi yang ditentukan dalam ON
klausul.
Lihat dokumentasi Delta Lake API untuk detail sintaks Scala dan Python. Untuk detail sintaks SQL, lihat MERGE INTO
Ubah semua baris yang tidak cocok menggunakan gabungkan
Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan WHEN NOT MATCHED BY SOURCE
klausul ke UPDATE
atau DELETE
rekaman dalam tabel target yang tidak memiliki rekaman yang sesuai dalam tabel sumber. Databricks merekomendasikan penambahan klausul kondisional opsional untuk menghindari penulisan ulang tabel target sepenuhnya.
Contoh kode berikut menunjukkan sintaks dasar menggunakan ini untuk menghapus, menimpa tabel target dengan konten tabel sumber dan menghapus rekaman yang tidak cocok dalam tabel target. Untuk pola yang lebih dapat diskalakan untuk tabel di mana pembaruan dan penghapusan sumber terikat waktu, lihat Menyinkronkan tabel Delta secara bertahap dengan sumber.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Contoh berikut menambahkan kondisi ke WHEN NOT MATCHED BY SOURCE
klausul dan menentukan nilai untuk diperbarui dalam baris target yang tidak cocok.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Menggabungkan semantik operasi
Berikut ini adalah deskripsi terperinci tentang merge
semantik operasi terprogram.
Mungkin ada sejumlah klausul
whenMatched
danwhenNotMatched
.Klausul
whenMatched
dijalankan saat baris sumber cocok dengan baris tabel target berdasarkan kondisi kecocokan. Klausul ini memiliki semantik berikut.Klausul
whenMatched
dapat memiliki paling banyak satu tindakanupdate
dan satu tindakandelete
. Tindakanupdate
hanya memperbaruimerge
kolom yang ditentukan (miripupdate
dengan operasi) dari baris target yang cocok. Tindakandelete
akan menghapus baris yang cocok.Setiap klausul
whenMatched
dapat memiliki kondisi opsional. Jika kondisi klausul ini ada, tindakanupdate
atau tindakandelete
dijalankan untuk pasangan baris target sumber yang cocok hanya jika kondisi klausul benar.Jika ada beberapa klausul
whenMatched
, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenMatched
, kecuali yang terakhir, harus memiliki kondisi.Jika tidak ada kondisi
whenMatched
mengevaluasi yang benar untuk pasangan baris sumber dan target yang cocok dengan kondisi gabungan, baris target dibiarkan tidak berubah.Untuk memperbarui semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan
whenMatched(...).updateAll()
. Tindakan ini setara dengan:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
untuk semua kolom tabel Delta target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.
Catatan
Perilaku ini berubah ketika evolusi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk detailnya.
Klausul
whenNotMatched
dijalankan saat baris sumber tidak cocok dengan baris tabel target berdasarkan kondisi kecocokannya. Klausul ini memiliki semantik berikut.Klausul
whenNotMatched
hanya dapat memiliki tindakaninsert
. Baris baru dihasilkan berdasarkan kolom yang ditentukan dan ekspresi yang sesuai. Anda tidak perlu menentukan semua kolom dalam tabel target. Untuk kolom target yang tidak ditentukan,NULL
disisipkan.Setiap klausul
whenNotMatched
dapat memiliki kondisi opsional. Jika kondisi klausul ada, baris sumber hanya dimasukkan jika kondisi tersebut berlaku untuk baris tersebut. Jika tidak, kolom sumber diabaikan.Jika ada beberapa klausul
whenNotMatched
, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenNotMatched
, kecuali yang terakhir, harus memiliki kondisi.Untuk menyisipkan semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan
whenNotMatched(...).insertAll()
. Tindakan ini setara dengan:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
untuk semua kolom tabel Delta target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.
Catatan
Perilaku ini berubah ketika evolusi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk detailnya.
whenNotMatchedBySource
klausa dijalankan ketika baris target tidak cocok dengan baris sumber apa pun berdasarkan kondisi penggabungan. Klausul ini memiliki semantik berikut.whenNotMatchedBySource
klausul dapat menentukandelete
danupdate
bertindak.- Setiap klausul
whenNotMatchedBySource
dapat memiliki kondisi opsional. Jika kondisi klausa ada, baris target dimodifikasi hanya jika kondisi tersebut benar untuk baris tersebut. Jika tidak, baris target dibiarkan tidak berubah. - Jika ada beberapa klausul
whenNotMatchedBySource
, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenNotMatchedBySource
, kecuali yang terakhir, harus memiliki kondisi. - Menurut definisi,
whenNotMatchedBySource
klausa tidak memiliki baris sumber untuk menarik nilai kolom, sehingga kolom sumber tidak dapat dirujuk. Agar setiap kolom dimodifikasi, Anda dapat menentukan literal atau melakukan tindakan pada kolom target, sepertiSET target.deleted_count = target.deleted_count + 1
.
Penting
- Operasi
merge
dapat gagal jika beberapa baris himpunan data sumber cocok dan gabung mencoba memperbarui baris yang sama dari tabel Delta target. Menurut semantik SQL penggabungan, operasi pembaruan semacam itu ambigu karena tidak jelas baris sumber mana yang harus digunakan untuk memperbarui baris target yang cocok. Anda dapat memproses tabel sumber terlebih dahulu untuk menghilangkan kemungkinan beberapa kecocokan. - Anda dapat menerapkan operasi SQL
MERGE
pada TAMPILAN SQL hanya jika tampilan tersebut telah didefinisikan sebagaiCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Deduplikasi data saat menulis ke dalam tabel Delta
Kasus penggunaan ETL yang umum adalah mengumpulkan log ke dalam tabel Delta dengan menambahkannya ke tabel. Namun, sering kali sumber dapat menghasilkan catatan log duplikat, sehingga langkah-langkah deduplikasi hilir diperlukan untuk menanganinya. Dengan merge
, Anda dapat menghindari memasukkan rekaman duplikat.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Catatan
Himpunan data yang berisi log baru perlu dideduplikasi dalam dirinya sendiri. Semantik SQL operasi gabung mencocokkan dan melakukan deduplikasi data baru dengan data yang sudah ada dalam tabel, tetapi jika ada data duplikat dalam himpunan data yang baru, data tersebut akan dimasukkan. Oleh karena itu, lakukan deduplikasi data baru sebelum menggabungkannya ke dalam tabel.
Jika Anda tahu bahwa Anda mungkin mendapatkan rekaman duplikat hanya selama beberapa hari, Anda bisa mengoptimalkan kueri Anda lebih lanjut dengan mempartisi tabel menurut tanggal, lalu menentukan rentang tanggal tabel target yang cocok.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Tindakan ini lebih efisien daripada perintah sebelumnya karena hanya mencari duplikat dalam 7 hari terakhir log, bukan mencari di seluruh tabel. Selain itu, Anda dapat menggunakan penggabungan sisipkan-saja ini dengan Streaming Terstruktur untuk melakukan deduplikasi log secara terus menerus.
- Dalam kueri streaming, Anda dapat menggunakan operasi penggabungan di
foreachBatch
untuk terus menulis data streaming apa pun ke tabel Delta dengan deduplikasi. Lihat contoh streaming berikut untuk informasi lebih lanjut mengenaiforeachBatch
. - Dalam kueri streaming lain, Anda dapat terus membaca data yang telah dideduplikasi dari tabel Delta ini. Anda dapat membacanya karena penggabungan sisipkan-saja hanya menambahkan data baru ke tabel Delta.
Mengubah data secara perlahan (SCD) dan mengubah pengambilan data (CDC) dengan Delta Lake
Tabel Langsung Delta memiliki dukungan asli untuk melacak dan menerapkan SCD Tipe 1 dan Tipe 2. Gunakan APPLY CHANGES INTO
dengan Tabel Langsung Delta untuk memastikan bahwa rekaman yang tidak berurutan ditangani dengan benar saat memproses umpan CDC. Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Delta Live Tables.
Menyinkronkan tabel Delta secara bertahap dengan sumber
Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan WHEN NOT MATCHED BY SOURCE
untuk membuat kondisi semena-mena untuk menghapus dan mengganti sebagian tabel secara atomik. Ini bisa sangat berguna ketika Anda memiliki tabel sumber di mana rekaman dapat berubah atau dihapus selama beberapa hari setelah entri data awal, tetapi akhirnya diselesaikan ke status akhir.
Kueri berikut ini memperlihatkan penggunaan pola ini untuk memilih 5 hari rekaman dari sumber, memperbarui rekaman yang cocok di target, menyisipkan rekaman baru dari sumber ke target, dan menghapus semua rekaman yang tidak cocok dari 5 hari terakhir di target.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Dengan menyediakan filter boolean yang sama pada tabel sumber dan target, Anda dapat menyebarkan perubahan secara dinamis dari sumber Anda ke tabel target, termasuk penghapusan.
Catatan
Meskipun pola ini dapat digunakan tanpa klausul kondisional apa pun, ini akan menyebabkan penulisan ulang tabel target sepenuhnya yang bisa mahal.