تطبيق واجهة برمجة تطبيقات التغييرات: تبسيط التقاط بيانات التغيير في Delta Live Tables

تعمل Delta Live Tables على تبسيط التقاط بيانات التغيير (CDC) باستخدام APPLY CHANGES واجهة برمجة التطبيقات. في السابق، كانت العبارة MERGE INTO شائعة الاستخدام لمعالجة سجلات التقاط بيانات التغيير على Azure Databricks. ومع ذلك، MERGE INTO يمكن أن ينتج نتائج غير صحيحة بسبب سجلات خارج التسلسل، أو تتطلب منطقا معقدا لإعادة ترتيب السجلات.

من خلال معالجة السجلات خارج التسلسل تلقائيا، APPLY CHANGES تضمن واجهة برمجة التطبيقات في Delta Live Tables المعالجة الصحيحة لسجلات التقاط بيانات التغيير وتزيل الحاجة إلى تطوير منطق معقد لمعالجة السجلات خارج التسلسل.

APPLY CHANGES يتم دعم واجهة برمجة التطبيقات في واجهات Delta Live Tables SQL وPython، بما في ذلك دعم تحديث الجداول باستخدام SCD من النوع 1 والنوع 2:

  • استخدم SCD من النوع 1 لتحديث السجلات مباشرة. لا يتم الاحتفاظ بالمحفوظات للسجلات التي تم تحديثها.
  • استخدم SCD من النوع 2 للاحتفاظ بمحفوظات السجلات، إما على جميع التحديثات أو على التحديثات لمجموعة محددة من الأعمدة.

للحصول على بناء الجملة والمراجع الأخرى، راجع:

إشعار

توضح هذه المقالة كيفية تحديث الجداول في مسار Delta Live Tables استنادا إلى التغييرات في بيانات المصدر. لمعرفة كيفية تسجيل معلومات التغيير على مستوى الصف للاستعلام عن جداول Delta، راجع استخدام موجز بيانات تغيير Delta Lake على Azure Databricks.

كيف يتم تنفيذ CDC مع جداول Delta Live؟

يجب تحديد عمود في البيانات المصدر التي سيتم تسلسل السجلات عليها، والتي تفسرها Delta Live Tables على أنها تمثيل متزايد بشكل رتيبة للترتيب الصحيح للبيانات المصدر. تعالج Delta Live Tables تلقائيا البيانات التي تصل خارج الترتيب. بالنسبة لتغييرات نوع SCD 2، تنشر Delta Live Tables قيم التسلسل المناسبة __START_AT إلى العمودين و __END_AT للجدول الهدف. يجب أن يكون هناك تحديث مميز واحد لكل مفتاح في كل قيمة تسلسل، وقيم تسلسل NULL غير مدعومة.

لإجراء معالجة التقاط بيانات التغيير باستخدام Delta Live Tables، يمكنك أولا إنشاء جدول دفق، ثم استخدام عبارة APPLY CHANGES INTO لتحديد المصدر والمفاتيح والتسلسل لموجز التغيير. لإنشاء جدول البث الهدف، استخدم العبارة CREATE OR REFRESH STREAMING TABLE في SQL أو الدالة create_streaming_table() في Python. لإنشاء عبارة تعريف معالجة التقاط بيانات التغيير، استخدم العبارة APPLY CHANGES في SQL أو الدالة apply_changes() في Python. للحصول على تفاصيل بناء الجملة، راجع تغيير التقاط البيانات باستخدام SQL في Delta Live Tables أو تغيير التقاط البيانات باستخدام Python في Delta Live Tables.

ما هي كائنات البيانات المستخدمة لمعالجة Delta Live Tables CDC؟

عند الإعلان عن الجدول الهدف في Hive metastore، يتم إنشاء بنيتي بيانات:

  • طريقة عرض تستخدم الاسم المعين للجدول الهدف.
  • جدول دعم داخلي تستخدمه Delta Live Tables لإدارة معالجة CDC. تمت تسمية هذا الجدول عن طريق الإلحاق __apply_changes_storage_ باسم الجدول الهدف.

على سبيل المثال، إذا قمت بتعريف جدول هدف باسم dlt_cdc_target، فسترى طريقة عرض مسماة dlt_cdc_target وجدولا باسم __apply_changes_storage_dlt_cdc_target في metastore. يسمح إنشاء طريقة عرض لجداول Delta Live بتصفية المعلومات الإضافية (على سبيل المثال، علامات العلامات والإصدارات) المطلوبة للتعامل مع البيانات غير المطلوبة. لعرض البيانات المعالجة، استعلم عن طريقة العرض الهدف. نظرا لأن مخطط __apply_changes_storage_ الجدول قد يتغير لدعم الميزات أو التحسينات المستقبلية، يجب عدم الاستعلام عن الجدول لاستخدام الإنتاج. إذا قمت بإضافة البيانات يدويا إلى الجدول، فيفترض أن السجلات تأتي قبل تغييرات أخرى لأن أعمدة الإصدار مفقودة.

إذا تم نشر البنية الأساسية لبرنامج ربط العمليات التجارية إلى كتالوج Unity، فلن يمكن للمستخدمين الوصول إلى جداول النسخ الاحتياطي الداخلية.

الحصول على بيانات حول السجلات التي تمت معالجتها بواسطة استعلام Delta Live Tables CDC

يتم التقاط المقاييس التالية بواسطة apply changes الاستعلامات:

  • num_upserted_rows: عدد صفوف الإخراج التي تم إدراجها في مجموعة البيانات أثناء التحديث.
  • num_deleted_rows: عدد صفوف الإخراج الموجودة المحذوفة من مجموعة البيانات أثناء التحديث.

لا يتم تسجيل المقياس num_output_rows ، وهو إخراج لتدفقات غير CDC، للاستعلامات apply changes .

القيود

لا يمكن استخدام هدف APPLY CHANGES INTO الاستعلام أو apply_changes الدالة كمصدر لجدول تدفق. يجب أن يكون الجدول الذي يقرأ من هدف APPLY CHANGES INTO استعلام أو apply_changes دالة طريقة عرض مجسدة.

SCD type 1 وSCD type 2 على Azure Databricks

توفر الأقسام التالية أمثلة توضح Delta Live Tables SCD من النوع 1 واستعلامات النوع 2 التي تقوم بتحديث الجداول الهدف استنادا إلى الأحداث المصدر التي:

  1. إنشاء سجلات مستخدم جديدة.
  2. حذف سجل مستخدم.
  3. تحديث سجلات المستخدمين. في المثال SCD من النوع 1، تصل العمليات الأخيرة UPDATE متأخرة ويتم إسقاطها من الجدول الهدف، مما يوضح معالجة الأحداث خارج الترتيب.

تفترض الأمثلة التالية الإلمام بتكوين مسارات Delta Live Tables وتحديثها. راجع البرنامج التعليمي: تشغيل خط أنابيب Delta Live Tables الأول.

لتشغيل هذه الأمثلة، يجب أن تبدأ بإنشاء عينة مجموعة بيانات. راجع إنشاء بيانات الاختبار.

فيما يلي سجلات الإدخال لهذه الأمثلة:

userId الاسم city ‏‏التشغيل sequenceNum
124 راؤول اواكساكا إدراج 1
123 ايزابيل مونتيري إدراج 1
125 مرسيدس تيخوانا إدراج 2
126 ليلي كانكون إدراج 2
123 قيمة فارغة قيمة فارغة حذف 6
125 مرسيدس غوادالاخارا تحديث 6
125 مرسيدس Mexicali تحديث 5
123 ايزابيل تشيهواهوا تحديث 5

إذا قمت بإلغاء التعليق على الصف النهائي في بيانات المثال، فسيتم إدراج السجل التالي الذي يحدد مكان اقتطاع السجلات:

userId الاسم city ‏‏التشغيل sequenceNum
قيمة فارغة قيمة فارغة قيمة فارغة اقتطاع 3

إشعار

تتضمن جميع الأمثلة التالية خيارات لتحديد كل DELETE من العمليات و TRUNCATE ، ولكن كل منها اختياري.

معالجة تحديثات SCD من النوع 1

يوضح مثال التعليمات البرمجية التالي معالجة تحديثات SCD من النوع 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

بعد تشغيل المثال SCD من النوع 1، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city
124 راؤول اواكساكا
125 مرسيدس غوادالاخارا
126 ليلي كانكون

بعد تشغيل مثال SCD من النوع 1 مع السجل الإضافي TRUNCATE ، يتم اقتطاع السجلات 124 و 126 بسبب TRUNCATE العملية في sequenceNum=3، ويحتوي الجدول الهدف على السجل التالي:

userId الاسم city
125 مرسيدس غوادالاخارا

معالجة تحديثات SCD من النوع 2

يوضح مثال التعليمات البرمجية التالي معالجة تحديثات SCD من النوع 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

بعد تشغيل المثال SCD من النوع 2، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city __START_AT __END_AT
123 ايزابيل مونتيري 1 5
123 ايزابيل تشيهواهوا 5 6
124 راؤول اواكساكا 1 قيمة فارغة
125 مرسيدس تيخوانا 2 5
125 مرسيدس Mexicali 5 6
125 مرسيدس غوادالاخارا 6 قيمة فارغة
126 ليلي كانكون 2 قيمة فارغة

يمكن لاستعلام SCD من النوع 2 أيضا تحديد مجموعة فرعية من أعمدة الإخراج التي سيتم تعقبها للمحفوظات في الجدول الهدف. يتم تحديث التغييرات على الأعمدة الأخرى في مكانها بدلا من إنشاء سجلات محفوظات جديدة. يوضح المثال التالي استبعاد city العمود من التعقب:

يوضح المثال التالي استخدام محفوظات التتبع مع SCD من النوع 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

بعد تشغيل هذا المثال بدون السجل الإضافي TRUNCATE ، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city __START_AT __END_AT
123 ايزابيل تشيهواهوا 1 6
124 راؤول اواكساكا 1 قيمة فارغة
125 مرسيدس غوادالاخارا 2 قيمة فارغة
126 ليلي كانكون 2 قيمة فارغة

إنشاء بيانات الاختبار

يتم توفير التعليمات البرمجية أدناه لإنشاء مجموعة بيانات مثال للاستخدام في أمثلة الاستعلامات الموجودة في هذا البرنامج التعليمي. بافتراض أن لديك بيانات الاعتماد المناسبة لإنشاء مخطط جديد وإنشاء جدول جديد، يمكنك تنفيذ هذه العبارات إما باستخدام دفتر ملاحظات أو Databricks SQL. التعليمات البرمجية التالية غير مخصصة ليتم تشغيلها كجزء من مسار Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

إضافة بيانات أو تغييرها أو حذفها في جدول تدفق هدف

إذا كانت البنية الأساسية لبرنامج ربط العمليات التجارية تنشر الجداول إلى كتالوج Unity، يمكنك استخدام عبارات لغة معالجة البيانات (DML)، بما في ذلك عبارات الإدراج والتحديث والحذف والدمج، لتعديل جداول الدفق الهدف التي تم إنشاؤها بواسطة APPLY CHANGES INTO العبارات.

إشعار

  • عبارات DML التي تعدل مخطط الجدول لجدول دفق غير معتمدة. تأكد من أن عبارات DML الخاصة بك لا تحاول تطوير مخطط الجدول.
  • يمكن تشغيل عبارات DML التي تحدث جدول دفق فقط في مجموعة كتالوج Unity مشتركة أو مستودع SQL باستخدام Databricks Runtime 13.3 LTS وما فوق.
  • نظرا لأن الدفق يتطلب مصادر بيانات ملحقة فقط، إذا كانت المعالجة تتطلب دفقا من جدول دفق مصدر مع تغييرات (على سبيل المثال، بواسطة عبارات DML)، قم بتعيين علامة skipChangeCommits عند قراءة جدول دفق المصدر. عند skipChangeCommits تعيين، يتم تجاهل المعاملات التي تحذف السجلات أو تعدلها في الجدول المصدر. إذا لم تتطلب المعالجة جدول دفق، يمكنك استخدام طريقة عرض مجسدة (التي لا تحتوي على قيد الإلحاق فقط) كجدول هدف.

نظرا لأن Delta Live Tables تستخدم عمودا محددا SEQUENCE BY وتنشر قيم التسلسل المناسبة إلى __START_AT العمودين و __END_AT للجدول الهدف (لنوع SCD 2)، يجب التأكد من أن عبارات DML تستخدم قيما صالحة لهذه الأعمدة للحفاظ على الترتيب الصحيح للسجلات. راجع كيف يتم تنفيذ التقاط بيانات التغيير باستخدام جداول Delta Live؟.

لمزيد من المعلومات حول استخدام عبارات DML مع جداول الدفق، راجع إضافة بيانات أو تغييرها أو حذفها في جدول دفق.

يدرج المثال التالي سجلا نشطا بتسلسل بدء من 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);