Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Anda dapat melakukan upsert data dari tabel sumber, tampilan, atau DataFrame ke dalam tabel Delta target dengan menggunakan operasi SQL MERGE. Delta Lake mendukung sisipan, pembaruan, dan penghapusan di MERGE, dan mendukung sintaks yang diperluas di luar standar SQL untuk memfasilitasi kasus penggunaan tingkat lanjut.
Misalkan Anda memiliki tabel sumber bernama people10mupdates atau jalur sumber di /tmp/delta/people-10m-updates yang berisi data baru untuk tabel target bernama people10m atau jalur target di /tmp/delta/people-10m. Beberapa rekaman baru ini mungkin sudah ada dalam data target. Untuk menggabungkan data-data baru, Anda ingin memperbarui baris di mana id orang tersebut sudah ada dan menyisipkan baris baru di mana tidak ada id yang cocok. Anda bisa menjalankan kueri berikut:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Phyton
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Penting
Hanya satu baris dari tabel sumber yang dapat mencocokkan baris tertentu dalam tabel target. Dalam Databricks Runtime 16.0 ke atas, MERGE mengevaluasi kondisi yang ditentukan dalam WHEN MATCHED klausul dan ON untuk menentukan kecocokan duplikat. Dalam Databricks Runtime 15.4 LTS dan versi sebelumnya, MERGE operasi hanya mempertimbangkan kondisi yang ditentukan dalam ON klausa.
Lihat dokumentasi Delta Lake API untuk detail sintaks Scala dan Python. Untuk detail sintaks SQL, lihat MERGE INTO
Ubah semua baris yang tidak cocok menggunakan gabungkan
Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan klausul WHEN NOT MATCHED BY SOURCE untuk UPDATE atau DELETE catatan dalam tabel target yang tidak memiliki catatan yang sesuai di tabel sumber. Databricks merekomendasikan penambahan klausul kondisional opsional untuk menghindari penulisan ulang tabel target sepenuhnya.
Contoh kode berikut menunjukkan sintaks dasar menggunakan ini untuk operasi penghapusan, menimpa tabel target dengan konten tabel sumber dan menghapus catatan yang tidak cocok dalam tabel target. Untuk pola yang lebih dapat diskalakan untuk tabel di mana pembaruan dan penghapusan sumber terikat waktu, lihat Menyinkronkan tabel Delta secara bertahap dengan sumber.
Phyton
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Contoh berikut menambahkan kondisi ke WHEN NOT MATCHED BY SOURCE klausul dan menentukan nilai untuk diperbarui dalam baris target yang tidak cocok.
Phyton
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantik operasi penggabungan
Berikut ini adalah deskripsi terperinci tentang merge semantik operasi terprogram.
Mungkin ada sejumlah klausul
whenMatcheddanwhenNotMatched.Klausul
whenMatcheddijalankan saat baris sumber cocok dengan baris tabel target berdasarkan kondisi kecocokan. Klausul ini memiliki semantik berikut.Klausul
whenMatcheddapat memiliki paling banyak satu tindakanupdatedan satu tindakandelete. Tindakanupdatedimergehanya memperbarui kolom yang ditentukan (mirip denganupdateoperasi) pada baris target yang cocok. Tindakandeleteakan menghapus baris yang cocok.Setiap klausul
whenMatcheddapat memiliki kondisi opsional. Jika kondisi klausul ini ada, tindakanupdateatau tindakandeletedijalankan untuk pasangan baris target sumber yang cocok hanya jika kondisi klausul benar.Jika ada beberapa klausul
whenMatched, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenMatched, kecuali yang terakhir, harus memiliki kondisi.Jika tidak ada kondisi
whenMatchedyang dievaluasi sebagai benar untuk pasangan baris sumber dan baris target yang memenuhi kondisi penggabungan, maka baris target dibiarkan tidak berubah.Untuk memperbarui semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan
whenMatched(...).updateAll(). Tindakan ini setara dengan:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))untuk semua kolom pada tabel Delta yang ditargetkan. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.
Catatan
Perilaku ini berubah ketika evolusi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk informasi lebih lanjut.
Klausul
whenNotMatcheddijalankan saat baris sumber tidak cocok dengan baris tabel target berdasarkan kondisi kecocokannya. Klausul ini memiliki semantik berikut.Klausul
whenNotMatchedhanya dapat memiliki tindakaninsert. Baris baru dihasilkan berdasarkan kolom yang ditentukan dan ekspresi yang sesuai. Anda tidak perlu menentukan semua kolom dalam tabel target. Untuk kolom target yang tidak ditentukan,NULLdisisipkan.Setiap klausul
whenNotMatcheddapat memiliki kondisi opsional. Jika kondisi klausul ada, baris sumber hanya dimasukkan jika kondisi tersebut berlaku untuk baris tersebut. Jika tidak, kolom sumber diabaikan.Jika ada beberapa klausul
whenNotMatched, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenNotMatched, kecuali yang terakhir, harus memiliki kondisi.Untuk menyisipkan semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan
whenNotMatched(...).insertAll(). Tindakan ini setara dengan:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))untuk semua kolom pada tabel Delta yang ditargetkan. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.
Catatan
Perilaku ini berubah ketika evolusi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk informasi lebih lanjut.
whenNotMatchedBySourceklausa dijalankan ketika baris target tidak cocok dengan baris sumber apa pun berdasarkan kondisi penggabungan. Klausul ini memiliki semantik berikut.-
whenNotMatchedBySourceklausul dapat menentukandeletedanupdatetindakan. - Setiap klausul
whenNotMatchedBySourcedapat memiliki kondisi opsional. Jika kondisi klausa ada, baris target dimodifikasi hanya jika kondisi tersebut benar untuk baris tersebut. Jika tidak, baris target dibiarkan tidak berubah. - Jika ada beberapa klausul
whenNotMatchedBySource, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausulwhenNotMatchedBySource, kecuali yang terakhir, harus memiliki kondisi. - Menurut definisi,
whenNotMatchedBySourceklausa tidak memiliki baris sumber untuk menarik nilai kolom, sehingga kolom sumber tidak dapat dirujuk. Untuk setiap kolom yang akan dimodifikasi, Anda dapat menentukan literal atau melakukan tindakan pada kolom target, misalnyaSET target.deleted_count = target.deleted_count + 1.
-
Penting
- Operasi
mergedapat gagal jika beberapa baris himpunan data sumber cocok dan proses penggabungan berupaya memperbarui baris yang sama dari tabel Delta tujuan. Menurut semantik SQL penggabungan, operasi pembaruan semacam itu ambigu karena tidak jelas baris sumber mana yang harus digunakan untuk memperbarui baris target yang cocok. Anda dapat memproses tabel sumber terlebih dahulu untuk menghilangkan kemungkinan beberapa kecocokan. - Anda dapat menerapkan operasi SQL
MERGEpada TAMPILAN SQL hanya jika tampilan tersebut telah didefinisikan sebagaiCREATE VIEW viewName AS SELECT * FROM deltaTable.
Deduplikasi data saat menulis ke dalam tabel Delta
Kasus penggunaan ETL yang umum adalah mengumpulkan log ke dalam tabel Delta dengan menambahkannya ke tabel. Namun, sering kali sumber dapat menghasilkan catatan log duplikat, sehingga langkah-langkah deduplikasi hilir diperlukan untuk menanganinya. Dengan merge, Anda dapat menghindari memasukkan rekaman duplikat.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Phyton
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Catatan
Himpunan data yang berisi log baru perlu dideduplikasi dalam dirinya sendiri. Semantik SQL operasi gabung mencocokkan dan melakukan deduplikasi data baru dengan data yang sudah ada dalam tabel, tetapi jika ada data duplikat dalam himpunan data yang baru, data tersebut akan dimasukkan. Oleh karena itu, lakukan deduplikasi data baru sebelum menggabungkannya ke dalam tabel.
Jika Anda tahu bahwa Anda mungkin mendapatkan rekaman duplikat hanya selama beberapa hari, Anda bisa mengoptimalkan kueri Anda lebih lanjut dengan mempartisi tabel menurut tanggal, lalu menentukan rentang tanggal tabel target yang cocok.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Phyton
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Tindakan ini lebih efisien daripada perintah sebelumnya karena hanya mencari duplikat dalam 7 hari terakhir log, bukan mencari di seluruh tabel. Selain itu, Anda dapat menggunakan penggabungan sisipkan-saja ini dengan Streaming Terstruktur untuk melakukan deduplikasi log secara terus menerus.
- Dalam kueri streaming, Anda dapat menggunakan operasi penggabungan di
foreachBatchuntuk terus menulis data streaming apa pun ke tabel Delta dengan deduplikasi. Lihat contoh streaming berikut untuk informasi lebih lanjut mengenaiforeachBatch. - Dalam kueri streaming lain, Anda dapat terus membaca data yang telah dideduplikasi dari tabel Delta ini. Hal ini dimungkinkan karena proses penggabungan 'insert-only' hanya menambahkan data baru ke tabel Delta.
Data yang berubah secara perlahan (SCD) dan penangkapan perubahan data (CDC) dengan Delta Lake
Alur Deklaratif Lakeflow Spark memiliki dukungan asli untuk melacak dan menerapkan SCD Tipe 1 dan Tipe 2. Gunakan AUTO CDC ... INTO dengan Alur Deklaratif Lakeflow Spark untuk memastikan bahwa rekaman yang tidak berurutan ditangani dengan benar saat memproses umpan CDC. Lihat API CDC Otomatis: Menyederhanakan penangkapan perubahan data menggunakan pipeline.
Menyinkronkan tabel Delta secara bertahap dengan sumber
Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan WHEN NOT MATCHED BY SOURCE untuk membuat kondisi semena-mena untuk menghapus dan mengganti sebagian tabel secara atomik. Ini bisa sangat berguna ketika Anda memiliki tabel sumber di mana rekaman dapat berubah atau dihapus selama beberapa hari setelah entri data awal, tetapi akhirnya diselesaikan ke status akhir.
Kueri berikut ini memperlihatkan penggunaan pola ini untuk memilih 5 hari rekaman dari sumber, memperbarui rekaman yang cocok di target, menyisipkan rekaman baru dari sumber ke target, dan menghapus semua rekaman yang tidak cocok dari 5 hari terakhir di target.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Dengan menyediakan filter boolean yang sama pada tabel sumber dan target, Anda dapat menyebarkan perubahan secara dinamis dari sumber Anda ke tabel target, termasuk penghapusan.
Catatan
Meskipun pola ini dapat digunakan tanpa klausul kondisional apa pun, ini akan menyebabkan penulisan ulang tabel target sepenuhnya yang bisa mahal.