اقرأ باللغة الإنجليزية

مشاركة عبر


Upsert في جدول Delta Lake باستخدام الدمج

يمكنك رفع البيانات من جدول مصدر أو عرض أو DataFrame إلى جدول دلتا الهدف باستخدام MERGE عملية SQL. يدعم Delta Lake عمليات الإدراج والتحديثات والحذف في MERGE، ويدعم بناء الجملة الموسع خارج معايير SQL لتسهيل حالات الاستخدام المتقدمة.

لنفترض أن لديك جدول مصدر يسمى people10mupdates أو مسار مصدر يحتوي على /tmp/delta/people-10m-updates بيانات جديدة لجدول هدف مسمى people10m أو مسار هدف في /tmp/delta/people-10m. قد تكون بعض هذه السجلات الجديدة موجودة بالفعل في البيانات الهدف. لدمج البيانات الجديدة، تريد تحديث الصفوف حيث يكون الشخص id موجودا بالفعل وإدراج الصفوف الجديدة حيث لا توجد مطابقة id . يمكنك تشغيل الاستعلام التالي:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

هام

يمكن أن يتطابق صف واحد فقط من الجدول المصدر مع صف معين في الجدول الهدف. في Databricks Runtime 16.0 وما فوق، MERGE يقيم الشروط المحددة في WHEN MATCHED عبارات و ON لتحديد التطابقات المكررة. في Databricks Runtime 15.4 LTS وما دونه، MERGE تنظر العمليات فقط في الشروط المحددة في ON عبارة .

راجع وثائق واجهة برمجة تطبيقات Delta Lake للحصول على تفاصيل بناء جملة Scala وPython. للحصول على تفاصيل بناء جملة SQL، راجع MERGE INTO

تعديل كافة الصفوف غير المتطابقة باستخدام الدمج

في Databricks SQL وDatabricks Runtime 12.2 LTS وما فوق، يمكنك استخدام العبارة WHEN NOT MATCHED BY SOURCE إلى UPDATE أو DELETE السجلات في الجدول الهدف التي لا تحتوي على سجلات مقابلة في الجدول المصدر. توصي Databricks بإضافة عبارة شرطية اختيارية لتجنب إعادة كتابة الجدول الهدف بالكامل.

يوضح مثال التعليمات البرمجية التالي بناء الجملة الأساسي لاستخدام هذا للحذف والكتابة فوق الجدول الهدف بمحتويات الجدول المصدر وحذف السجلات غير المتطابقة في الجدول الهدف. للحصول على نمط أكثر قابلية للتطوير للجداول حيث تكون تحديثات المصدر وحذفها محددة زمنيا، راجع مزامنة جدول Delta بشكل متزايد مع المصدر.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

يضيف المثال التالي شروطا إلى العبارة WHEN NOT MATCHED BY SOURCE ويحدد القيم المراد تحديثها في صفوف الهدف غير المتطابقة.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

دلالات عملية الدمج

فيما يلي وصف مفصل لدلالات merge العملية البرمجية.

  • يمكن أن يكون هناك أي عدد من whenMatched العبارات و whenNotMatched .

  • whenMatched يتم تنفيذ العبارات عندما يتطابق صف مصدر مع صف جدول هدف استنادا إلى شرط المطابقة. تحتوي هذه العبارات على الدلالات التالية.

    • whenMatched يمكن أن تحتوي العبارات على إجراء واحد update delete على الأكثر. يقوم update الإجراء الموجود في merge فقط بتحديث الأعمدة المحددة (مشابهة update للعملية) للصف الهدف المطابق. يحذف delete الإجراء الصف المطابق.

    • يمكن أن يكون لكل whenMatched عبارة شرط اختياري. إذا كان شرط العبارة update هذا موجودا، يتم تنفيذ الإجراء أو delete لأي زوج صف مطابق للهدف المصدر فقط عندما يكون شرط العبارة صحيحا.

    • إذا كانت هناك عبارات متعددة whenMatched ، تقييمها بالترتيب المحدد. يجب أن تحتوي جميع whenMatched العبارات، باستثناء العبارة الأخيرة، على شروط.

    • إذا لم يتم تقييم أي من whenMatched الشروط إلى true لزوج صف المصدر والهدف الذي يطابق شرط الدمج، يتم ترك الصف الهدف دون تغيير.

    • لتحديث جميع أعمدة جدول Delta الهدف بالأعمدة المقابلة لمجموعة البيانات المصدر، استخدم whenMatched(...).updateAll(). وهذا يعادل:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      لكافة أعمدة جدول Delta الهدف. لذلك، يفترض هذا الإجراء أن الجدول المصدر يحتوي على نفس الأعمدة الموجودة في الجدول الهدف، وإلا فإن الاستعلام يطرح خطأ تحليل.

      ملاحظة

      يتغير هذا السلوك عند تمكين تطور المخطط التلقائي. راجع تطور المخطط التلقائي للحصول على التفاصيل.

  • whenNotMatched يتم تنفيذ العبارات عندما لا يتطابق صف المصدر مع أي صف هدف استنادا إلى شرط المطابقة. تحتوي هذه العبارات على الدلالات التالية.

    • whenNotMatched يمكن أن تحتوي العبارات على insert الإجراء فقط. يتم إنشاء الصف الجديد استنادا إلى العمود المحدد والتعبيرات المقابلة. لا تحتاج إلى تحديد كافة الأعمدة في الجدول الهدف. بالنسبة للأعمدة المستهدفة غير المحددة، NULL يتم إدراجها.

    • يمكن أن يكون لكل whenNotMatched عبارة شرط اختياري. إذا كان شرط العبارة موجودا، يتم إدراج صف مصدر فقط إذا كان هذا الشرط صحيحا لهذا الصف. وإلا، يتم تجاهل العمود المصدر.

    • إذا كانت هناك عبارات متعددة whenNotMatched ، تقييمها بالترتيب المحدد. يجب أن تحتوي جميع whenNotMatched العبارات، باستثناء العبارة الأخيرة، على شروط.

    • لإدراج كافة أعمدة جدول Delta الهدف مع الأعمدة المقابلة لمجموعة البيانات المصدر، استخدم whenNotMatched(...).insertAll(). وهذا يعادل:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      لكافة أعمدة جدول Delta الهدف. لذلك، يفترض هذا الإجراء أن الجدول المصدر يحتوي على نفس الأعمدة الموجودة في الجدول الهدف، وإلا فإن الاستعلام يطرح خطأ تحليل.

      ملاحظة

      يتغير هذا السلوك عند تمكين تطور المخطط التلقائي. راجع تطور المخطط التلقائي للحصول على التفاصيل.

  • whenNotMatchedBySource يتم تنفيذ العبارات عندما لا يتطابق صف الهدف مع أي صف مصدر استنادا إلى شرط الدمج. تحتوي هذه العبارات على الدلالات التالية.

    • whenNotMatchedBySource يمكن أن تحدد delete العبارات والإجراءات update .
    • يمكن أن يكون لكل whenNotMatchedBySource عبارة شرط اختياري. إذا كان شرط العبارة موجودا، يتم تعديل صف الهدف فقط إذا كان هذا الشرط صحيحا لهذا الصف. وإلا، يتم ترك الصف الهدف دون تغيير.
    • إذا كانت هناك عبارات متعددة whenNotMatchedBySource ، تقييمها بالترتيب المحدد. يجب أن تحتوي جميع whenNotMatchedBySource العبارات، باستثناء العبارة الأخيرة، على شروط.
    • بحكم التعريف، whenNotMatchedBySource لا تحتوي العبارات على صف مصدر لسحب قيم الأعمدة منه، وبالتالي لا يمكن الرجوع إلى أعمدة المصدر. لكل عمود ليتم تعديله، يمكنك إما تحديد قيمة حرفية أو تنفيذ إجراء على العمود الهدف، مثل SET target.deleted_count = target.deleted_count + 1.

هام

  • merge يمكن أن تفشل العملية إذا تطابقت صفوف متعددة من مجموعة البيانات المصدر وحاول الدمج تحديث نفس صفوف جدول Delta الهدف. وفقا لدلالات SQL للدمج، فإن عملية التحديث هذه غامضة لأنه من غير الواضح أي صف مصدر يجب استخدامه لتحديث الصف الهدف المطابق. يمكنك معالجة الجدول المصدر مسبقا لإزالة إمكانية وجود تطابقات متعددة.
  • يمكنك تطبيق عملية SQL MERGE على طريقة عرض SQL فقط إذا تم تعريف طريقة العرض على أنها CREATE VIEW viewName AS SELECT * FROM deltaTable.

إلغاء تكرار البيانات عند الكتابة في جداول دلتا

حالة استخدام ETL الشائعة هي جمع السجلات في جدول Delta عن طريق إلحاقها بجدول. ومع ذلك، غالبا ما يمكن للمصادر إنشاء سجلات سجل مكررة وخطوات إلغاء التكرار النهائية مطلوبة للعناية بها. باستخدام merge، يمكنك تجنب إدراج السجلات المكررة.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

ملاحظة

يجب إلغاء تكرار مجموعة البيانات التي تحتوي على السجلات الجديدة داخل نفسها. بواسطة دلالات SQL للدمج، فإنه يطابق البيانات الجديدة ويزيل تكرارها مع البيانات الموجودة في الجدول، ولكن إذا كانت هناك بيانات مكررة داخل مجموعة البيانات الجديدة، يتم إدراجها. ومن ثم، قم بإلغاء تكرار البيانات الجديدة قبل الدمج في الجدول.

إذا كنت تعلم أنك قد تحصل على سجلات مكررة لبضعة أيام فقط، فيمكنك تحسين الاستعلام بشكل أكبر عن طريق تقسيم الجدول حسب التاريخ، ثم تحديد نطاق التاريخ للجدول الهدف المراد مطابقته.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

هذا أكثر كفاءة من الأمر السابق لأنه يبحث عن التكرارات فقط في آخر 7 أيام من السجلات، وليس الجدول بأكمله. علاوة على ذلك، يمكنك استخدام هذا الدمج الإدراج فقط مع Structured Streaming لإجراء إلغاء تكرار مستمر للسجلات.

  • في استعلام دفق، يمكنك استخدام عملية الدمج في foreachBatch لكتابة أي بيانات دفق باستمرار إلى جدول Delta مع إلغاء التكرار. راجع مثال البث التالي للحصول على مزيد من المعلومات حول foreachBatch.
  • في استعلام دفق آخر، يمكنك قراءة البيانات المكررة باستمرار من جدول Delta هذا. هذا ممكن لأن دمج الإدراج فقط يقوم فقط بإلحاق بيانات جديدة بجدول Delta.

تغيير البيانات ببطء (SCD) وتغيير التقاط البيانات (CDC) باستخدام Delta Lake

تحتوي Delta Live Tables على دعم أصلي لتعقب وتطبيق SCD Type 1 والنوع 2. استخدم APPLY CHANGES INTO مع Delta Live Tables للتأكد من معالجة السجلات غير المطلوبة بشكل صحيح عند معالجة موجزات CDC. راجع واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables.

مزامنة جدول Delta بشكل متزايد مع المصدر

في Databricks SQL وDatabricks Runtime 12.2 LTS وما فوق، يمكنك استخدام WHEN NOT MATCHED BY SOURCE لإنشاء شروط عشوائية لحذف جزء من الجدول واستبداله تلقائيا. يمكن أن يكون هذا مفيدا بشكل خاص عندما يكون لديك جدول مصدر حيث قد تتغير السجلات أو يتم حذفها لعدة أيام بعد إدخال البيانات الأولية، ولكن في النهاية يتم تسويتها إلى حالة نهائية.

يظهر الاستعلام التالي استخدام هذا النمط لتحديد 5 أيام من السجلات من المصدر، وتحديث السجلات المطابقة في الهدف، وإدراج سجلات جديدة من المصدر إلى الهدف، وحذف جميع السجلات غير المتطابقة من الأيام الخمسة الماضية في الهدف.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

من خلال توفير نفس عامل التصفية المنطقي على الجدولين المصدر والهدف، يمكنك نشر التغييرات ديناميكيا من المصدر إلى الجداول المستهدفة، بما في ذلك الحذف.

ملاحظة

في حين أنه يمكن استخدام هذا النمط دون أي عبارات شرطية، فإن هذا من شأنه أن يؤدي إلى إعادة كتابة الجدول الهدف بالكامل والذي يمكن أن يكون مكلفا.