Aracılığıyla paylaş


Birleştirmeyi kullanarak Delta Lake tablosuna upsert ekleme

SQL işlemini kullanarak bir kaynak tablo, görünüm veya DataFrame'den hedef Delta tablosuna MERGE veri ekleyebilirsiniz. Delta Lake içinde eklemeleri, güncelleştirmeleri ve silmeleri MERGEdestekler ve gelişmiş kullanım örneklerini kolaylaştırmak için SQL standartlarının ötesinde genişletilmiş söz dizimini destekler.

adlı bir kaynak tablonuz veya adlı people10mupdates bir hedef people10m tablo için yeni veriler içeren bir kaynak yolunuz /tmp/delta/people-10m-updates veya konumunda /tmp/delta/people-10mbir hedef yolunuz olduğunu varsayalım. Bu yeni kayıtlardan bazıları hedef verilerde zaten mevcut olabilir. Yeni verileri birleştirmek için, kişinin id zaten bulunduğu satırları güncelleştirmek ve eşleşme id olmayan yeni satırları eklemek istiyorsunuz. Aşağıdaki sorguyu çalıştırabilirsiniz:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Önemli

Kaynak tablodan yalnızca tek bir satır hedef tablodaki belirli bir satırla eşleşebilir. Databricks Runtime 16.0 ve üzerinde yinelenen MERGE eşleşmeleri belirlemek için ve ON yan tümcelerinde WHEN MATCHED belirtilen koşulları değerlendirir. Databricks Runtime 15.4 LTS ve altında, MERGE işlemler yalnızca yan tümcesinde belirtilen koşulları dikkate ON alır.

Scala ve Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın. SQL söz dizimi ayrıntıları için bkz. MERGE INTO

Birleştirmeyi kullanarak eşleşmeyen tüm satırları değiştirme

Databricks SQL ve Databricks Runtime 12.2 LTS ve üzeri sürümlerde, hedef tablodaki yan DELETE tümcesini UPDATE WHEN NOT MATCHED BY SOURCE veya kaynak tabloda karşılık gelen kayıtları olmayan kayıtları kullanabilirsiniz. Databricks, hedef tablonun tam olarak yeniden yazılmasını önlemek için isteğe bağlı bir koşullu yan tümce eklenmesini önerir.

Aşağıdaki kod örneği, silme işlemleri için bunu kullanmanın temel söz dizimini, hedef tablonun üzerine kaynak tablonun içeriğiyle birlikte yazılmasını ve hedef tablodaki eşleşmeyen kayıtların silinmesini gösterir. Kaynak güncelleştirmelerin ve silmelerin zamana bağlı olduğu tablolar için daha ölçeklenebilir bir desen için bkz . Delta tablosunu kaynakla artımlı olarak eşitleme.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Aşağıdaki örnek yan tümcesine WHEN NOT MATCHED BY SOURCE koşullar ekler ve eşleşmeyen hedef satırlarda güncelleştirilecek değerleri belirtir.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Birleştirme işlemi semantiği

Programlı işlem semantiğinin merge ayrıntılı bir açıklaması aşağıdadır.

  • herhangi bir sayıda ve whenNotMatched yan tümcesi whenMatched olabilir.

  • whenMatched yan tümceleri, kaynak satır eşleşme koşuluna göre bir hedef tablo satırıyla eşleştiğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenMatched yan tümcelerinin en fazla bir update ve bir delete eylemi olabilir. update içindeki merge eylemi yalnızca eşleşen hedef satırın update belirtilen sütunlarını (işleme benzer şekilde) güncelleştirir. Eylem, delete eşleşen satırı siler.

    • Her whenMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Bu yan tümce koşulu varsa, update veya delete eylemi yalnızca yan tümce koşulu true olduğunda eşleşen herhangi bir kaynak hedef satır çifti için yürütülür.

    • Birden çok whenMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenMatched yan tümcelerin koşulları olmalıdır.

    • Birleştirme koşuluyla whenMatched eşleşen bir kaynak ve hedef satır çifti için koşulların hiçbiri true olarak değerlendirilmezse, hedef satır değişmeden bırakılır.

    • Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla güncelleştirmek için kullanın whenMatched(...).updateAll(). Bu, şuna eşdeğerdir:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .

  • whenNotMatched yan tümceleri, bir kaynak satır eşleşme koşuluna göre herhangi bir hedef satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenNotMatched yan tümceleri yalnızca eylemi içerebilir insert . Yeni satır, belirtilen sütuna ve karşılık gelen ifadelere göre oluşturulur. Hedef tablodaki tüm sütunları belirtmeniz gerekmez. Belirtilmemiş hedef sütunlar NULL için eklenir.

    • Her whenNotMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, kaynak satırın eklenmesi için söz konusu satırın koşulunun doğru olması gerekir. Aksi takdirde, kaynak sütun yoksayılır.

    • Birden çok whenNotMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenNotMatched yan tümcelerin koşulları olmalıdır.

    • Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla eklemek için kullanın whenNotMatched(...).insertAll(). Bu, şuna eşdeğerdir:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .

  • whenNotMatchedBySource yan tümceleri, bir hedef satır birleştirme koşuluna göre hiçbir kaynak satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenNotMatchedBySourceyan tümceleri ve update eylemleri belirtebilirdelete.
    • Her whenNotMatchedBySource yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, hedef satır yalnızca söz konusu satır için bu koşul doğru olduğunda değiştirilir. Aksi takdirde, hedef satır değişmeden bırakılır.
    • Birden çok whenNotMatchedBySource yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenNotMatchedBySource yan tümcelerin koşulları olmalıdır.
    • Tanım gereği, whenNotMatchedBySource yan tümcelerde sütun değerlerinin çekileceği bir kaynak satır yoktur ve bu nedenle kaynak sütunlara başvurulamazsınız. Değiştirilecek her sütun için bir değişmez değer belirtebilir veya hedef sütunda gibi SET target.deleted_count = target.deleted_count + 1bir eylem gerçekleştirebilirsiniz.

Önemli

  • merge Kaynak veri kümesinin birden çok satırı eşleşirse ve birleştirme hedef Delta tablosunun aynı satırlarını güncelleştirmeye çalışırsa işlem başarısız olabilir. Birleştirmenin SQL semantiğine göre, eşleşen hedef satırı güncelleştirmek için hangi kaynak satırın kullanılacağı belirsiz olduğundan bu tür bir güncelleştirme işlemi belirsizdir. Birden çok eşleşme olasılığını ortadan kaldırmak için kaynak tabloyu önceden işleyebilirsiniz.
  • SQL MERGE GÖRÜNÜMÜne SQL işlemi uygulayabilmeniz için görünümün olarak CREATE VIEW viewName AS SELECT * FROM deltaTabletanımlanmış olması gerekir.

Delta tablolarına yazarken yinelenen verileri kaldırma

Yaygın bir ETL kullanım örneği, günlükleri bir tabloya ekleyerek Delta tablosuna toplamaktır. Ancak, genellikle kaynaklar yinelenen günlük kayıtları oluşturabilir ve bunların bakımını yapmak için aşağı akış yinelenenleri kaldırma adımları gerekir. ile merge, yinelenen kayıtları eklemekten kaçınabilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Not

Yeni günlükleri içeren veri kümesinin kendi içinde yinelenenleri kaldırması gerekir. Birleştirmenin SQL semantiğiyle, yeni verileri tablodaki mevcut verilerle eşleştirir ve yinelenenleri kaldırır, ancak yeni veri kümesinde yinelenen veriler varsa eklenir. Bu nedenle, tabloyla birleştirmeden önce yeni verileri yinelenenleri kaldırın.

Yalnızca birkaç gün boyunca yinelenen kayıtlar alabileceğinizi biliyorsanız, tabloyu tarihe göre bölümleyerek ve ardından eşleşecek hedef tablonun tarih aralığını belirterek sorgunuzu daha da iyileştirebilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Bu, tablonun tamamında değil yalnızca son 7 günlükte yinelenenleri ararken önceki komuttan daha verimlidir. Ayrıca, günlüklerin sürekli yinelenenlerini kaldırma işlemini gerçekleştirmek için bu salt eklemeli birleştirmeyi Yapılandırılmış Akış ile kullanabilirsiniz.

  • Akış sorgusunda foreachBatch , yinelenenleri kaldırma ile bir Delta tablosuna sürekli olarak akış verileri yazmak için içindeki birleştirme işlemini kullanabilirsiniz. hakkında foreachBatchdaha fazla bilgi için aşağıdaki akış örneğine bakın.
  • Başka bir akış sorgusunda, bu Delta tablosundan yinelenenleri kaldırılmış verileri sürekli okuyabilirsiniz. Yalnızca ekleme birleştirme işlemi Delta tablosuna yalnızca yeni veriler eklediğinden bu mümkündür.

Delta Lake ile yavaş değişen veriler (SCD) ve veri yakalamayı (CDC) değiştirme

Delta Live Tables, SCD Tür 1 ve Tür 2'yi izlemek ve uygulamak için yerel desteğe sahiptir. CDC akışlarını işlerken sıra dışı kayıtların doğru işlendiğinden emin olmak için Delta Live Tables ile birlikte kullanın APPLY CHANGES INTO . Bkz . DEĞIŞIKLIKLERI UYGULAMA API'leri: Delta Live Tablolarıyla değişiklik verilerini yakalamayı basitleştirme.

Delta tablosunu kaynakla artımlı olarak eşitleme

Databricks SQL ve Databricks Runtime 12.2 LTS ve üzerinde, tablonun bir bölümünü atomik olarak silmek ve değiştirmek için rastgele koşullar oluşturmak için kullanabilirsiniz WHEN NOT MATCHED BY SOURCE . Bu, özellikle kayıtların ilk veri girdisi sonrasında birkaç gün boyunca değişebileceği veya silinebileceği, ancak sonunda son duruma geçebileceği bir kaynak tablonuz olduğunda yararlı olabilir.

Aşağıdaki sorguda, kaynaktan 5 günlük kayıtları seçmek, hedefteki eşleşen kayıtları güncelleştirmek, kaynaktan hedefe yeni kayıtlar eklemek ve hedefte son 5 günün eşleşmeyen tüm kayıtlarını silmek için bu desenin kullanılması gösterilmektedir.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Kaynak ve hedef tablolarda aynı boole filtresini sağlayarak, silme işlemleri de dahil olmak üzere değişiklikleri kaynağınızdan hedef tablolara dinamik olarak yayabilirsiniz.

Not

Bu desen herhangi bir koşullu yan tümce olmadan kullanılabilse de, bu durum hedef tablonun tamamen yeniden yazılmasıyla sonuçlanabilir ve bu da pahalı olabilir.