Upsert ke dalam tabel Delta Lake menggunakan penggabungan

Anda dapat melakukan upsert data dari tabel sumber, tampilan, atau DataFrame ke dalam tabel Delta target dengan menggunakan operasi SQL MERGE. Delta Lake mendukung sisipan, pembaruan, dan penghapusan di MERGE, dan mendukung sintaks yang diperluas di luar standar SQL untuk memfasilitasi kasus penggunaan tingkat lanjut.

Misalkan Anda memiliki tabel sumber bernama people10mupdates atau jalur sumber di /tmp/delta/people-10m-updates yang berisi data baru untuk tabel target bernama people10m atau jalur target di /tmp/delta/people-10m. Beberapa rekaman baru ini mungkin sudah ada dalam data target. Untuk menggabungkan data baru, Anda harus memperbarui baris di mana id orang tersebut sudah ada dan menyisipkan baris baru di mana tidak ada id yang cocok. Anda bisa menjalankan kueri berikut:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Lihat dokumentasi Delta Lake API untuk detail sintaks Scala dan Python. Untuk detail sintaks SQL, lihat MERGE INTO

Ubah semua baris yang tidak cocok menggunakan gabungkan

Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan WHEN NOT MATCHED BY SOURCE klausul ke UPDATE atau DELETE rekaman dalam tabel target yang tidak memiliki rekaman yang sesuai dalam tabel sumber. Databricks merekomendasikan penambahan klausul kondisional opsional untuk menghindari penulisan ulang tabel target sepenuhnya.

Contoh kode berikut menunjukkan sintaks dasar menggunakan ini untuk menghapus, menimpa tabel target dengan konten tabel sumber dan menghapus rekaman yang tidak cocok dalam tabel target. Untuk pola yang lebih dapat diskalakan untuk tabel di mana pembaruan dan penghapusan sumber terikat waktu, lihat Menyinkronkan tabel Delta secara bertahap dengan sumber.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Contoh berikut menambahkan kondisi ke WHEN NOT MATCHED BY SOURCE klausul dan menentukan nilai untuk diperbarui dalam baris target yang tidak cocok.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Menggabungkan semantik operasi

Berikut ini adalah deskripsi terperinci tentang merge semantik operasi terprogram.

  • Mungkin ada sejumlah klausul whenMatched dan whenNotMatched.

  • Klausul whenMatched dijalankan saat baris sumber cocok dengan baris tabel target berdasarkan kondisi kecocokan. Klausul ini memiliki semantik berikut.

    • Klausul whenMatched dapat memiliki paling banyak satu tindakan update dan satu tindakan delete. Tindakan update di merge hanya memperbarui kolom yang ditentukan (mirip dengan updateoperasi) baris target yang cocok. Tindakan delete akan menghapus baris yang cocok.

    • Setiap klausul whenMatched dapat memiliki kondisi opsional. Jika kondisi klausul ini ada, tindakan update atau tindakan delete dijalankan untuk pasangan baris target sumber yang cocok hanya jika kondisi klausul benar.

    • Jika ada beberapa klausul whenMatched, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausul whenMatched, kecuali yang terakhir, harus memiliki kondisi.

    • Jika tidak ada kondisi whenMatched mengevaluasi yang benar untuk pasangan baris sumber dan target yang cocok dengan kondisi gabungan, baris target dibiarkan tidak berubah.

    • Untuk memperbarui semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan whenMatched(...).updateAll(). Tindakan ini setara dengan:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      untuk semua kolom tabel Delta target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.

      Catatan

      Perilaku ini berubah saat migrasi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk detailnya.

  • Klausul whenNotMatched dijalankan saat baris sumber tidak cocok dengan baris tabel target berdasarkan kondisi kecocokannya. Klausul ini memiliki semantik berikut.

    • Klausul whenNotMatched hanya dapat memiliki tindakan insert. Baris baru dihasilkan berdasarkan kolom yang ditentukan dan ekspresi yang sesuai. Anda tidak perlu menentukan semua kolom dalam tabel target. Untuk kolom target yang tidak ditentukan, NULL disisipkan.

    • Setiap klausul whenNotMatched dapat memiliki kondisi opsional. Jika kondisi klausul ada, baris sumber hanya dimasukkan jika kondisi tersebut berlaku untuk baris tersebut. Jika tidak, kolom sumber diabaikan.

    • Jika ada beberapa klausul whenNotMatched, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausul whenNotMatched, kecuali yang terakhir, harus memiliki kondisi.

    • Untuk menyisipkan semua kolom tabel Delta target dengan kolom yang sesuai dari himpunan data sumber, gunakan whenNotMatched(...).insertAll(). Tindakan ini setara dengan:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      untuk semua kolom tabel Delta target. Oleh karena itu, tindakan ini mengasumsikan bahwa tabel sumber memiliki kolom yang sama dengan yang ada di tabel target, jika tidak, kueri tersebut akan menampilkan kesalahan analisis.

      Catatan

      Perilaku ini berubah saat migrasi skema otomatis diaktifkan. Lihat evolusi skema otomatis untuk detailnya.

  • whenNotMatchedBySource klausa dijalankan ketika baris target tidak cocok dengan baris sumber apa pun berdasarkan kondisi penggabungan. Klausul ini memiliki semantik berikut.

    • whenNotMatchedBySource klausul dapat menentukan delete dan update bertindak.
    • Setiap klausul whenNotMatchedBySource dapat memiliki kondisi opsional. Jika kondisi klausa ada, baris target dimodifikasi hanya jika kondisi tersebut benar untuk baris tersebut. Jika tidak, baris target dibiarkan tidak berubah.
    • Jika ada beberapa klausul whenNotMatchedBySource, klausul tersebut dievaluasi sesuai urutan yang telah ditentukan. Semua klausul whenNotMatchedBySource, kecuali yang terakhir, harus memiliki kondisi.
    • Menurut definisi, whenNotMatchedBySource klausa tidak memiliki baris sumber untuk menarik nilai kolom, sehingga kolom sumber tidak dapat dirujuk. Agar setiap kolom dimodifikasi, Anda dapat menentukan literal atau melakukan tindakan pada kolom target, seperti SET target.deleted_count = target.deleted_count + 1.

Penting

  • Operasi merge dapat gagal jika beberapa baris himpunan data sumber cocok dan gabung mencoba memperbarui baris yang sama dari tabel Delta target. Menurut semantik SQL penggabungan, operasi pembaruan semacam itu ambigu karena tidak jelas baris sumber mana yang harus digunakan untuk memperbarui baris target yang cocok. Anda dapat memproses tabel sumber terlebih dahulu untuk menghilangkan kemungkinan beberapa kecocokan.
  • Anda dapat menerapkan operasi SQL MERGE pada TAMPILAN SQL hanya jika tampilan tersebut telah didefinisikan sebagai CREATE VIEW viewName AS SELECT * FROM deltaTable.

Deduplikasi data saat menulis ke dalam tabel Delta

Kasus penggunaan ETL yang umum adalah mengumpulkan log ke dalam tabel Delta dengan menambahkannya ke tabel. Namun, sering kali sumber dapat menghasilkan catatan log duplikat, sehingga langkah-langkah deduplikasi hilir diperlukan untuk menanganinya. Dengan merge, Anda dapat menghindari memasukkan rekaman duplikat.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Catatan

Himpunan data yang berisi log baru perlu dideduplikasi dalam dirinya sendiri. Semantik SQL operasi gabung mencocokkan dan melakukan deduplikasi data baru dengan data yang sudah ada dalam tabel, tetapi jika ada data duplikat dalam himpunan data yang baru, data tersebut akan dimasukkan. Oleh karena itu, lakukan deduplikasi data baru sebelum menggabungkannya ke dalam tabel.

Jika Anda tahu bahwa Anda mungkin mendapatkan rekaman duplikat hanya selama beberapa hari, Anda bisa mengoptimalkan kueri Anda lebih lanjut dengan mempartisi tabel menurut tanggal, lalu menentukan rentang tanggal tabel target yang cocok.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Tindakan ini lebih efisien daripada perintah sebelumnya karena hanya mencari duplikat dalam 7 hari terakhir log, bukan mencari di seluruh tabel. Selain itu, Anda dapat menggunakan penggabungan sisipkan-saja ini dengan Streaming Terstruktur untuk melakukan deduplikasi log secara terus menerus.

  • Dalam kueri streaming, Anda dapat menggunakan operasi penggabungan di foreachBatch untuk terus menulis data streaming apa pun ke tabel Delta dengan deduplikasi. Lihat contoh streaming berikut untuk informasi lebih lanjut mengenai foreachBatch.
  • Dalam kueri streaming lain, Anda dapat terus membaca data yang telah dideduplikasi dari tabel Delta ini. Anda dapat membacanya karena penggabungan sisipkan-saja hanya menambahkan data baru ke tabel Delta.

Mengubah data secara perlahan (SCD) dan mengubah pengambilan data (CDC) dengan Delta Lake

Tabel Langsung Delta memiliki dukungan asli untuk melacak dan menerapkan SCD Tipe 1 dan Tipe 2. Gunakan APPLY CHANGES INTO dengan Tabel Langsung Delta untuk memastikan bahwa rekaman yang tidak berurutan ditangani dengan benar saat memproses umpan CDC. Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta.

Menyinkronkan tabel Delta secara bertahap dengan sumber

Di Databricks SQL dan Databricks Runtime 12.2 LTS ke atas, Anda dapat menggunakan WHEN NOT MATCHED BY SOURCE untuk membuat kondisi semena-mena untuk menghapus dan mengganti sebagian tabel secara atomik. Ini bisa sangat berguna ketika Anda memiliki tabel sumber di mana rekaman dapat berubah atau dihapus selama beberapa hari setelah entri data awal, tetapi akhirnya diselesaikan ke status akhir.

Kueri berikut ini memperlihatkan penggunaan pola ini untuk memilih 5 hari rekaman dari sumber, memperbarui rekaman yang cocok di target, menyisipkan rekaman baru dari sumber ke target, dan menghapus semua rekaman yang tidak cocok dari 5 hari terakhir di target.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Dengan menyediakan filter boolean yang sama pada tabel sumber dan target, Anda dapat menyebarkan perubahan secara dinamis dari sumber Anda ke tabel target, termasuk penghapusan.

Catatan

Meskipun pola ini dapat digunakan tanpa klausul kondisional apa pun, ini akan menyebabkan penulisan ulang tabel target sepenuhnya yang bisa mahal.