Upsert في جدول Delta Lake باستخدام الدمج
يمكنك رفع البيانات من جدول مصدر أو عرض أو DataFrame إلى جدول دلتا الهدف باستخدام MERGE
عملية SQL. يدعم Delta Lake عمليات الإدراج والتحديثات والحذف في MERGE
، ويدعم بناء الجملة الموسع خارج معايير SQL لتسهيل حالات الاستخدام المتقدمة.
لنفترض أن لديك جدول مصدر يسمى people10mupdates
أو مسار مصدر يحتوي على /tmp/delta/people-10m-updates
بيانات جديدة لجدول هدف مسمى people10m
أو مسار هدف في /tmp/delta/people-10m
. قد تكون بعض هذه السجلات الجديدة موجودة بالفعل في البيانات الهدف. لدمج البيانات الجديدة، تريد تحديث الصفوف حيث يكون الشخص id
موجودا بالفعل وإدراج الصفوف الجديدة حيث لا توجد مطابقة id
. يمكنك تشغيل الاستعلام التالي:
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
هام
يمكن أن يتطابق صف واحد فقط من الجدول المصدر مع صف معين في الجدول الهدف. في Databricks Runtime 16.0 وما فوق، MERGE
يقيم الشروط المحددة في WHEN MATCHED
عبارات و ON
لتحديد التطابقات المكررة. في Databricks Runtime 15.4 LTS وما دونه، MERGE
تنظر العمليات فقط في الشروط المحددة في ON
عبارة .
راجع وثائق واجهة برمجة تطبيقات Delta Lake للحصول على تفاصيل بناء جملة Scala وPython. للحصول على تفاصيل بناء جملة SQL، راجع MERGE INTO
في Databricks SQL وDatabricks Runtime 12.2 LTS وما فوق، يمكنك استخدام العبارة WHEN NOT MATCHED BY SOURCE
إلى UPDATE
أو DELETE
السجلات في الجدول الهدف التي لا تحتوي على سجلات مقابلة في الجدول المصدر. توصي Databricks بإضافة عبارة شرطية اختيارية لتجنب إعادة كتابة الجدول الهدف بالكامل.
يوضح مثال التعليمات البرمجية التالي بناء الجملة الأساسي لاستخدام هذا للحذف والكتابة فوق الجدول الهدف بمحتويات الجدول المصدر وحذف السجلات غير المتطابقة في الجدول الهدف. للحصول على نمط أكثر قابلية للتطوير للجداول حيث تكون تحديثات المصدر وحذفها محددة زمنيا، راجع مزامنة جدول Delta بشكل متزايد مع المصدر.
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
يضيف المثال التالي شروطا إلى العبارة WHEN NOT MATCHED BY SOURCE
ويحدد القيم المراد تحديثها في صفوف الهدف غير المتطابقة.
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
فيما يلي وصف مفصل لدلالات merge
العملية البرمجية.
يمكن أن يكون هناك أي عدد من
whenMatched
العبارات وwhenNotMatched
.whenMatched
يتم تنفيذ العبارات عندما يتطابق صف مصدر مع صف جدول هدف استنادا إلى شرط المطابقة. تحتوي هذه العبارات على الدلالات التالية.whenMatched
يمكن أن تحتوي العبارات على إجراء واحدupdate
delete
على الأكثر. يقومupdate
الإجراء الموجود فيmerge
فقط بتحديث الأعمدة المحددة (مشابهةupdate
للعملية) للصف الهدف المطابق. يحذفdelete
الإجراء الصف المطابق.يمكن أن يكون لكل
whenMatched
عبارة شرط اختياري. إذا كان شرط العبارةupdate
هذا موجودا، يتم تنفيذ الإجراء أوdelete
لأي زوج صف مطابق للهدف المصدر فقط عندما يكون شرط العبارة صحيحا.إذا كانت هناك عبارات متعددة
whenMatched
، تقييمها بالترتيب المحدد. يجب أن تحتوي جميعwhenMatched
العبارات، باستثناء العبارة الأخيرة، على شروط.إذا لم يتم تقييم أي من
whenMatched
الشروط إلى true لزوج صف المصدر والهدف الذي يطابق شرط الدمج، يتم ترك الصف الهدف دون تغيير.لتحديث جميع أعمدة جدول Delta الهدف بالأعمدة المقابلة لمجموعة البيانات المصدر، استخدم
whenMatched(...).updateAll()
. وهذا يعادل:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
لكافة أعمدة جدول Delta الهدف. لذلك، يفترض هذا الإجراء أن الجدول المصدر يحتوي على نفس الأعمدة الموجودة في الجدول الهدف، وإلا فإن الاستعلام يطرح خطأ تحليل.
ملاحظة
يتغير هذا السلوك عند تمكين تطور المخطط التلقائي. راجع تطور المخطط التلقائي للحصول على التفاصيل.
whenNotMatched
يتم تنفيذ العبارات عندما لا يتطابق صف المصدر مع أي صف هدف استنادا إلى شرط المطابقة. تحتوي هذه العبارات على الدلالات التالية.whenNotMatched
يمكن أن تحتوي العبارات علىinsert
الإجراء فقط. يتم إنشاء الصف الجديد استنادا إلى العمود المحدد والتعبيرات المقابلة. لا تحتاج إلى تحديد كافة الأعمدة في الجدول الهدف. بالنسبة للأعمدة المستهدفة غير المحددة،NULL
يتم إدراجها.يمكن أن يكون لكل
whenNotMatched
عبارة شرط اختياري. إذا كان شرط العبارة موجودا، يتم إدراج صف مصدر فقط إذا كان هذا الشرط صحيحا لهذا الصف. وإلا، يتم تجاهل العمود المصدر.إذا كانت هناك عبارات متعددة
whenNotMatched
، تقييمها بالترتيب المحدد. يجب أن تحتوي جميعwhenNotMatched
العبارات، باستثناء العبارة الأخيرة، على شروط.لإدراج كافة أعمدة جدول Delta الهدف مع الأعمدة المقابلة لمجموعة البيانات المصدر، استخدم
whenNotMatched(...).insertAll()
. وهذا يعادل:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
لكافة أعمدة جدول Delta الهدف. لذلك، يفترض هذا الإجراء أن الجدول المصدر يحتوي على نفس الأعمدة الموجودة في الجدول الهدف، وإلا فإن الاستعلام يطرح خطأ تحليل.
ملاحظة
يتغير هذا السلوك عند تمكين تطور المخطط التلقائي. راجع تطور المخطط التلقائي للحصول على التفاصيل.
whenNotMatchedBySource
يتم تنفيذ العبارات عندما لا يتطابق صف الهدف مع أي صف مصدر استنادا إلى شرط الدمج. تحتوي هذه العبارات على الدلالات التالية.whenNotMatchedBySource
يمكن أن تحددdelete
العبارات والإجراءاتupdate
.- يمكن أن يكون لكل
whenNotMatchedBySource
عبارة شرط اختياري. إذا كان شرط العبارة موجودا، يتم تعديل صف الهدف فقط إذا كان هذا الشرط صحيحا لهذا الصف. وإلا، يتم ترك الصف الهدف دون تغيير. - إذا كانت هناك عبارات متعددة
whenNotMatchedBySource
، تقييمها بالترتيب المحدد. يجب أن تحتوي جميعwhenNotMatchedBySource
العبارات، باستثناء العبارة الأخيرة، على شروط. - بحكم التعريف،
whenNotMatchedBySource
لا تحتوي العبارات على صف مصدر لسحب قيم الأعمدة منه، وبالتالي لا يمكن الرجوع إلى أعمدة المصدر. لكل عمود ليتم تعديله، يمكنك إما تحديد قيمة حرفية أو تنفيذ إجراء على العمود الهدف، مثلSET target.deleted_count = target.deleted_count + 1
.
هام
merge
يمكن أن تفشل العملية إذا تطابقت صفوف متعددة من مجموعة البيانات المصدر وحاول الدمج تحديث نفس صفوف جدول Delta الهدف. وفقا لدلالات SQL للدمج، فإن عملية التحديث هذه غامضة لأنه من غير الواضح أي صف مصدر يجب استخدامه لتحديث الصف الهدف المطابق. يمكنك معالجة الجدول المصدر مسبقا لإزالة إمكانية وجود تطابقات متعددة.- يمكنك تطبيق عملية SQL
MERGE
على طريقة عرض SQL فقط إذا تم تعريف طريقة العرض على أنهاCREATE VIEW viewName AS SELECT * FROM deltaTable
.
حالة استخدام ETL الشائعة هي جمع السجلات في جدول Delta عن طريق إلحاقها بجدول. ومع ذلك، غالبا ما يمكن للمصادر إنشاء سجلات سجل مكررة وخطوات إلغاء التكرار النهائية مطلوبة للعناية بها. باستخدام merge
، يمكنك تجنب إدراج السجلات المكررة.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
ملاحظة
يجب إلغاء تكرار مجموعة البيانات التي تحتوي على السجلات الجديدة داخل نفسها. بواسطة دلالات SQL للدمج، فإنه يطابق البيانات الجديدة ويزيل تكرارها مع البيانات الموجودة في الجدول، ولكن إذا كانت هناك بيانات مكررة داخل مجموعة البيانات الجديدة، يتم إدراجها. ومن ثم، قم بإلغاء تكرار البيانات الجديدة قبل الدمج في الجدول.
إذا كنت تعلم أنك قد تحصل على سجلات مكررة لبضعة أيام فقط، فيمكنك تحسين الاستعلام بشكل أكبر عن طريق تقسيم الجدول حسب التاريخ، ثم تحديد نطاق التاريخ للجدول الهدف المراد مطابقته.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
هذا أكثر كفاءة من الأمر السابق لأنه يبحث عن التكرارات فقط في آخر 7 أيام من السجلات، وليس الجدول بأكمله. علاوة على ذلك، يمكنك استخدام هذا الدمج الإدراج فقط مع Structured Streaming لإجراء إلغاء تكرار مستمر للسجلات.
- في استعلام دفق، يمكنك استخدام عملية الدمج في
foreachBatch
لكتابة أي بيانات دفق باستمرار إلى جدول Delta مع إلغاء التكرار. راجع مثال البث التالي للحصول على مزيد من المعلومات حولforeachBatch
. - في استعلام دفق آخر، يمكنك قراءة البيانات المكررة باستمرار من جدول Delta هذا. هذا ممكن لأن دمج الإدراج فقط يقوم فقط بإلحاق بيانات جديدة بجدول Delta.
تحتوي Delta Live Tables على دعم أصلي لتعقب وتطبيق SCD Type 1 والنوع 2. استخدم APPLY CHANGES INTO
مع Delta Live Tables للتأكد من معالجة السجلات غير المطلوبة بشكل صحيح عند معالجة موجزات CDC. راجع واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables.
في Databricks SQL وDatabricks Runtime 12.2 LTS وما فوق، يمكنك استخدام WHEN NOT MATCHED BY SOURCE
لإنشاء شروط عشوائية لحذف جزء من الجدول واستبداله تلقائيا. يمكن أن يكون هذا مفيدا بشكل خاص عندما يكون لديك جدول مصدر حيث قد تتغير السجلات أو يتم حذفها لعدة أيام بعد إدخال البيانات الأولية، ولكن في النهاية يتم تسويتها إلى حالة نهائية.
يظهر الاستعلام التالي استخدام هذا النمط لتحديد 5 أيام من السجلات من المصدر، وتحديث السجلات المطابقة في الهدف، وإدراج سجلات جديدة من المصدر إلى الهدف، وحذف جميع السجلات غير المتطابقة من الأيام الخمسة الماضية في الهدف.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
من خلال توفير نفس عامل التصفية المنطقي على الجدولين المصدر والهدف، يمكنك نشر التغييرات ديناميكيا من المصدر إلى الجداول المستهدفة، بما في ذلك الحذف.
ملاحظة
في حين أنه يمكن استخدام هذا النمط دون أي عبارات شرطية، فإن هذا من شأنه أن يؤدي إلى إعادة كتابة الجدول الهدف بالكامل والذي يمكن أن يكون مكلفا.