Birleştirmeyi kullanarak Delta Lake tablosuna upsert ekleme
SQL işlemini kullanarak bir kaynak tablo, görünüm veya DataFrame'den hedef Delta tablosuna MERGE
veri ekleyebilirsiniz. Delta Lake içinde eklemeleri, güncelleştirmeleri ve silmeleri MERGE
destekler ve gelişmiş kullanım örneklerini kolaylaştırmak için SQL standartlarının ötesinde genişletilmiş söz dizimini destekler.
adlı bir kaynak tablonuz veya adlı people10mupdates
bir hedef people10m
tablo için yeni veriler içeren bir kaynak yolunuz /tmp/delta/people-10m-updates
veya konumunda /tmp/delta/people-10m
bir hedef yolunuz olduğunu varsayalım. Bu yeni kayıtlardan bazıları hedef verilerde zaten mevcut olabilir. Yeni verileri birleştirmek için, kişinin id
zaten bulunduğu satırları güncelleştirmek ve eşleşme id
olmayan yeni satırları eklemek istiyorsunuz. Aşağıdaki sorguyu çalıştırabilirsiniz:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Önemli
Kaynak tablodan yalnızca tek bir satır hedef tablodaki belirli bir satırla eşleşebilir. Databricks Runtime 16.0 ve üzerinde yinelenen MERGE
eşleşmeleri belirlemek için ve ON
yan tümcelerinde WHEN MATCHED
belirtilen koşulları değerlendirir. Databricks Runtime 15.4 LTS ve altında, MERGE
işlemler yalnızca yan tümcesinde belirtilen koşulları dikkate ON
alır.
Scala ve Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın. SQL söz dizimi ayrıntıları için bkz. MERGE INTO
Birleştirmeyi kullanarak eşleşmeyen tüm satırları değiştirme
Databricks SQL ve Databricks Runtime 12.2 LTS ve üzeri sürümlerde, hedef tablodaki yan DELETE
tümcesini UPDATE
WHEN NOT MATCHED BY SOURCE
veya kaynak tabloda karşılık gelen kayıtları olmayan kayıtları kullanabilirsiniz. Databricks, hedef tablonun tam olarak yeniden yazılmasını önlemek için isteğe bağlı bir koşullu yan tümce eklenmesini önerir.
Aşağıdaki kod örneği, silme işlemleri için bunu kullanmanın temel söz dizimini, hedef tablonun üzerine kaynak tablonun içeriğiyle birlikte yazılmasını ve hedef tablodaki eşleşmeyen kayıtların silinmesini gösterir. Kaynak güncelleştirmelerin ve silmelerin zamana bağlı olduğu tablolar için daha ölçeklenebilir bir desen için bkz . Delta tablosunu kaynakla artımlı olarak eşitleme.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Aşağıdaki örnek yan tümcesine WHEN NOT MATCHED BY SOURCE
koşullar ekler ve eşleşmeyen hedef satırlarda güncelleştirilecek değerleri belirtir.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Birleştirme işlemi semantiği
Programlı işlem semantiğinin merge
ayrıntılı bir açıklaması aşağıdadır.
herhangi bir sayıda ve
whenNotMatched
yan tümcesiwhenMatched
olabilir.whenMatched
yan tümceleri, kaynak satır eşleşme koşuluna göre bir hedef tablo satırıyla eşleştiğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.whenMatched
yan tümcelerinin en fazla birupdate
ve birdelete
eylemi olabilir.update
içindekimerge
eylemi yalnızca eşleşen hedef satırınupdate
belirtilen sütunlarını (işleme benzer şekilde) güncelleştirir. Eylem,delete
eşleşen satırı siler.Her
whenMatched
yan tümce isteğe bağlı bir koşula sahip olabilir. Bu yan tümce koşulu varsa,update
veyadelete
eylemi yalnızca yan tümce koşulu true olduğunda eşleşen herhangi bir kaynak hedef satır çifti için yürütülür.Birden çok
whenMatched
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenMatched
yan tümcelerin koşulları olmalıdır.Birleştirme koşuluyla
whenMatched
eşleşen bir kaynak ve hedef satır çifti için koşulların hiçbiri true olarak değerlendirilmezse, hedef satır değişmeden bırakılır.Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla güncelleştirmek için kullanın
whenMatched(...).updateAll()
. Bu, şuna eşdeğerdir:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.
Not
Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .
whenNotMatched
yan tümceleri, bir kaynak satır eşleşme koşuluna göre herhangi bir hedef satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.whenNotMatched
yan tümceleri yalnızca eylemi içerebilirinsert
. Yeni satır, belirtilen sütuna ve karşılık gelen ifadelere göre oluşturulur. Hedef tablodaki tüm sütunları belirtmeniz gerekmez. Belirtilmemiş hedef sütunlarNULL
için eklenir.Her
whenNotMatched
yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, kaynak satırın eklenmesi için söz konusu satırın koşulunun doğru olması gerekir. Aksi takdirde, kaynak sütun yoksayılır.Birden çok
whenNotMatched
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenNotMatched
yan tümcelerin koşulları olmalıdır.Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla eklemek için kullanın
whenNotMatched(...).insertAll()
. Bu, şuna eşdeğerdir:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.
Not
Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .
whenNotMatchedBySource
yan tümceleri, bir hedef satır birleştirme koşuluna göre hiçbir kaynak satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.whenNotMatchedBySource
yan tümceleri veupdate
eylemleri belirtebilirdelete
.- Her
whenNotMatchedBySource
yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, hedef satır yalnızca söz konusu satır için bu koşul doğru olduğunda değiştirilir. Aksi takdirde, hedef satır değişmeden bırakılır. - Birden çok
whenNotMatchedBySource
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenNotMatchedBySource
yan tümcelerin koşulları olmalıdır. - Tanım gereği,
whenNotMatchedBySource
yan tümcelerde sütun değerlerinin çekileceği bir kaynak satır yoktur ve bu nedenle kaynak sütunlara başvurulamazsınız. Değiştirilecek her sütun için bir değişmez değer belirtebilir veya hedef sütunda gibiSET target.deleted_count = target.deleted_count + 1
bir eylem gerçekleştirebilirsiniz.
Önemli
merge
Kaynak veri kümesinin birden çok satırı eşleşirse ve birleştirme hedef Delta tablosunun aynı satırlarını güncelleştirmeye çalışırsa işlem başarısız olabilir. Birleştirmenin SQL semantiğine göre, eşleşen hedef satırı güncelleştirmek için hangi kaynak satırın kullanılacağı belirsiz olduğundan bu tür bir güncelleştirme işlemi belirsizdir. Birden çok eşleşme olasılığını ortadan kaldırmak için kaynak tabloyu önceden işleyebilirsiniz.- SQL
MERGE
GÖRÜNÜMÜne SQL işlemi uygulayabilmeniz için görünümün olarakCREATE VIEW viewName AS SELECT * FROM deltaTable
tanımlanmış olması gerekir.
Delta tablolarına yazarken yinelenen verileri kaldırma
Yaygın bir ETL kullanım örneği, günlükleri bir tabloya ekleyerek Delta tablosuna toplamaktır. Ancak, genellikle kaynaklar yinelenen günlük kayıtları oluşturabilir ve bunların bakımını yapmak için aşağı akış yinelenenleri kaldırma adımları gerekir. ile merge
, yinelenen kayıtları eklemekten kaçınabilirsiniz.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Not
Yeni günlükleri içeren veri kümesinin kendi içinde yinelenenleri kaldırması gerekir. Birleştirmenin SQL semantiğiyle, yeni verileri tablodaki mevcut verilerle eşleştirir ve yinelenenleri kaldırır, ancak yeni veri kümesinde yinelenen veriler varsa eklenir. Bu nedenle, tabloyla birleştirmeden önce yeni verileri yinelenenleri kaldırın.
Yalnızca birkaç gün boyunca yinelenen kayıtlar alabileceğinizi biliyorsanız, tabloyu tarihe göre bölümleyerek ve ardından eşleşecek hedef tablonun tarih aralığını belirterek sorgunuzu daha da iyileştirebilirsiniz.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Bu, tablonun tamamında değil yalnızca son 7 günlükte yinelenenleri ararken önceki komuttan daha verimlidir. Ayrıca, günlüklerin sürekli yinelenenlerini kaldırma işlemini gerçekleştirmek için bu salt eklemeli birleştirmeyi Yapılandırılmış Akış ile kullanabilirsiniz.
- Akış sorgusunda
foreachBatch
, yinelenenleri kaldırma ile bir Delta tablosuna sürekli olarak akış verileri yazmak için içindeki birleştirme işlemini kullanabilirsiniz. hakkındaforeachBatch
daha fazla bilgi için aşağıdaki akış örneğine bakın. - Başka bir akış sorgusunda, bu Delta tablosundan yinelenenleri kaldırılmış verileri sürekli okuyabilirsiniz. Yalnızca ekleme birleştirme işlemi Delta tablosuna yalnızca yeni veriler eklediğinden bu mümkündür.
Delta Lake ile yavaş değişen veriler (SCD) ve veri yakalamayı (CDC) değiştirme
Delta Live Tables, SCD Tür 1 ve Tür 2'yi izlemek ve uygulamak için yerel desteğe sahiptir. CDC akışlarını işlerken sıra dışı kayıtların doğru işlendiğinden emin olmak için Delta Live Tables ile birlikte kullanın APPLY CHANGES INTO
. Bkz . DEĞIŞIKLIKLERI UYGULAMA API'leri: Delta Live Tablolarıyla değişiklik verilerini yakalamayı basitleştirme.
Delta tablosunu kaynakla artımlı olarak eşitleme
Databricks SQL ve Databricks Runtime 12.2 LTS ve üzerinde, tablonun bir bölümünü atomik olarak silmek ve değiştirmek için rastgele koşullar oluşturmak için kullanabilirsiniz WHEN NOT MATCHED BY SOURCE
. Bu, özellikle kayıtların ilk veri girdisi sonrasında birkaç gün boyunca değişebileceği veya silinebileceği, ancak sonunda son duruma geçebileceği bir kaynak tablonuz olduğunda yararlı olabilir.
Aşağıdaki sorguda, kaynaktan 5 günlük kayıtları seçmek, hedefteki eşleşen kayıtları güncelleştirmek, kaynaktan hedefe yeni kayıtlar eklemek ve hedefte son 5 günün eşleşmeyen tüm kayıtlarını silmek için bu desenin kullanılması gösterilmektedir.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Kaynak ve hedef tablolarda aynı boole filtresini sağlayarak, silme işlemleri de dahil olmak üzere değişiklikleri kaynağınızdan hedef tablolara dinamik olarak yayabilirsiniz.
Not
Bu desen herhangi bir koşullu yan tümce olmadan kullanılabilse de, bu durum hedef tablonun tamamen yeniden yazılmasıyla sonuçlanabilir ve bu da pahalı olabilir.