واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables

تعمل Delta Live Tables على تبسيط التقاط بيانات التغيير (CDC) باستخدام APPLY CHANGES واجهات برمجة التطبيقات و APPLY CHANGES FROM SNAPSHOT . تعتمد الواجهة التي تستخدمها على مصدر بيانات التغيير:

  • يستخدم APPLY CHANGES لمعالجة التغييرات من موجز بيانات التغيير (CDF).
  • استخدم APPLY CHANGES FROM SNAPSHOT (معاينة عامة) لمعالجة التغييرات في لقطات قاعدة البيانات.

في السابق، كانت العبارة MERGE INTO شائعة الاستخدام لمعالجة سجلات التقاط بيانات التغيير على Azure Databricks. ومع ذلك، MERGE INTO يمكن أن ينتج نتائج غير صحيحة بسبب سجلات خارج التسلسل أو يتطلب منطقا معقدا لإعادة ترتيب السجلات.

APPLY CHANGES يتم دعم واجهة برمجة التطبيقات في واجهات Delta Live Tables SQL وPython. APPLY CHANGES FROM SNAPSHOT يتم دعم واجهة برمجة التطبيقات في واجهة Delta Live Tables Python.

يدعم كل من APPLY CHANGES و APPLY CHANGES FROM SNAPSHOT تحديث الجداول باستخدام نوع SCD 1 والنوع 2:

  • استخدم SCD من النوع 1 لتحديث السجلات مباشرة. لا يتم الاحتفاظ بالمحفوظات للسجلات المحدثة.
  • استخدم SCD من النوع 2 للاحتفاظ بمحفوظات السجلات، إما على جميع التحديثات أو على التحديثات لمجموعة محددة من الأعمدة.

للحصول على بناء الجملة والمراجع الأخرى، راجع:

إشعار

توضح هذه المقالة كيفية تحديث الجداول في مسار Delta Live Tables استنادا إلى التغييرات في بيانات المصدر. لمعرفة كيفية تسجيل معلومات التغيير على مستوى الصف للاستعلام عن جداول Delta، راجع استخدام موجز بيانات تغيير Delta Lake على Azure Databricks.

المتطلبات

لاستخدام واجهات برمجة تطبيقات التقاط بيانات التغيير، يجب تكوين البنية الأساسية لبرنامج ربط العمليات التجارية لاستخدام مسارات DLT بلا خادم أو جداول Pro أو Advanced إصدارات Delta Live.

كيف يتم تنفيذ CDC مع APPLY CHANGES واجهة برمجة التطبيقات؟

من خلال معالجة السجلات خارج التسلسل تلقائيا، APPLY CHANGES تضمن واجهة برمجة التطبيقات في Delta Live Tables المعالجة الصحيحة لسجلات التقاط بيانات التغيير وتزيل الحاجة إلى تطوير منطق معقد لمعالجة السجلات خارج التسلسل. يجب تحديد عمود في البيانات المصدر التي سيتم تسلسل السجلات عليها، والتي تفسرها Delta Live Tables على أنها تمثيل متزايد بشكل رتيبة للترتيب الصحيح للبيانات المصدر. تعالج Delta Live Tables تلقائيا البيانات التي تصل خارج الترتيب. بالنسبة لتغييرات نوع SCD 2، تنشر Delta Live Tables قيم التسلسل المناسبة إلى الجدول __START_AT المستهدف والأعمدة __END_AT . يجب أن يكون هناك تحديث مميز واحد لكل مفتاح في كل قيمة تسلسل، وقيم تسلسل NULL غير مدعومة.

لتنفيذ معالجة التقاط بيانات التغيير باستخدام APPLY CHANGES، يمكنك أولا إنشاء جدول دفق ثم استخدام APPLY CHANGES INTO العبارة في SQL أو apply_changes() الدالة في Python لتحديد المصدر والمفاتيح والتسلسل لموجز التغيير. لإنشاء جدول البث الهدف، استخدم العبارة CREATE OR REFRESH STREAMING TABLE في SQL أو الدالة create_streaming_table() في Python. راجع أمثلة معالجة SCD من النوع 1 والنوع 2.

للحصول على تفاصيل بناء الجملة، راجع مرجع Delta Live Tables SQL أو مرجع Python.

كيف يتم تنفيذ CDC مع APPLY CHANGES FROM SNAPSHOT واجهة برمجة التطبيقات؟

هام

APPLY CHANGES FROM SNAPSHOT واجهة برمجة التطبيقات في المعاينة العامة.

APPLY CHANGES FROM SNAPSHOT هي واجهة برمجة تطبيقات تعريفية تحدد التغييرات في بيانات المصدر بكفاءة من خلال مقارنة سلسلة من اللقطات بالترتيب ثم تقوم بتشغيل المعالجة المطلوبة لمعالجة CDC للسجلات في اللقطات. APPLY CHANGES FROM SNAPSHOT معتمد فقط من قبل واجهة Delta Live Tables Python.

APPLY CHANGES FROM SNAPSHOT يدعم استيعاب اللقطات من أنواع مصادر متعددة:

  • استخدم استيعاب اللقطات الدورية لاستيعاب اللقطات من جدول أو طريقة عرض موجودة. APPLY CHANGES FROM SNAPSHOT لديه واجهة بسيطة ومبسطة لدعم استيعاب اللقطات بشكل دوري من كائن قاعدة بيانات موجود. يتم استيعاب لقطة جديدة مع كل تحديث للبنية الأساسية لبرنامج ربط العمليات التجارية، ويتم استخدام وقت الاستيعاب كإصدار لقطة. عند تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية في الوضع المستمر، يتم استيعاب لقطات متعددة مع كل تحديث للبنية الأساسية لبرنامج ربط العمليات التجارية في فترة يحددها إعداد الفاصل الزمني للمشغل للتدفق الذي يحتوي على معالجة APPLY CHANGES FROM SNAPSHOT.
  • استخدم استيعاب اللقطات التاريخية لمعالجة الملفات التي تحتوي على لقطات قاعدة البيانات، مثل اللقطات التي تم إنشاؤها من قاعدة بيانات Oracle أو MySQL أو مستودع بيانات.

لإجراء معالجة التقاط بيانات التغيير من أي نوع مصدر باستخدام APPLY CHANGES FROM SNAPSHOT، يمكنك أولا إنشاء جدول دفق ثم استخدام apply_changes_from_snapshot() الدالة في Python لتحديد اللقطة والمفاتيح والوسيطات الأخرى المطلوبة لتنفيذ المعالجة. راجع أمثلة استيعاب اللقطات الدورية واستيعاب اللقطات التاريخية.

يجب أن تكون اللقطات التي تم تمريرها إلى واجهة برمجة التطبيقات بترتيب تصاعدي حسب الإصدار. إذا اكتشفت Delta Live Tables لقطة خارج الترتيب، يتم طرح خطأ.

للحصول على تفاصيل بناء الجملة، راجع مرجع Delta Live Tables Python.

القيود

لا يمكن استخدام هدف APPLY CHANGES استعلام أو APPLY CHANGES FROM SNAPSHOT كمصدر لجدول تدفق. يجب أن يكون الجدول الذي يقرأ من هدف APPLY CHANGES استعلام أو APPLY CHANGES FROM SNAPSHOT طريقة عرض مجسدة.

مثال: معالجة SCD من النوع 1 وSCD من النوع 2 مع بيانات مصدر CDF

توفر الأقسام التالية أمثلة على Delta Live Tables SCD من النوع 1 واستعلامات النوع 2 التي تقوم بتحديث الجداول الهدف استنادا إلى الأحداث المصدر من موجز بيانات التغيير الذي:

  1. إنشاء سجلات مستخدم جديدة.
  2. حذف سجل مستخدم.
  3. تحديث سجلات المستخدمين. في المثال SCD من النوع 1، تصل العمليات الأخيرة UPDATE متأخرة ويتم إسقاطها من الجدول الهدف، مما يوضح معالجة الأحداث خارج الترتيب.

تفترض الأمثلة التالية الإلمام بتكوين مسارات Delta Live Tables وتحديثها. راجع البرنامج التعليمي: تشغيل خط أنابيب Delta Live Tables الأول.

لتشغيل هذه الأمثلة، يجب أن تبدأ بإنشاء عينة مجموعة بيانات. راجع إنشاء بيانات الاختبار.

فيما يلي سجلات الإدخال لهذه الأمثلة:

userId الاسم city ‏‏التشغيل sequenceNum
124 راؤول اواكساكا إدراج 1
123 ايزابيل مونتيري إدراج 1
125 مرسيدس تيجوانا إدراج 2
126 زنبق كانكون إدراج 2
123 قيمة فارغة قيمة فارغة حذف 6
125 مرسيدس غوادالاخارا تحديث 6
125 مرسيدس Mexicali تحديث 5
123 ايزابيل تشيهواهوا تحديث 5

إذا قمت بإلغاء التعليق على الصف النهائي في بيانات المثال، إدراج السجل التالي الذي يحدد مكان اقتطاع السجلات:

userId الاسم city ‏‏التشغيل sequenceNum
قيمة فارغة قيمة فارغة قيمة فارغة اقتطاع 3

إشعار

تتضمن جميع الأمثلة التالية خيارات لتحديد كل من DELETE العمليات و TRUNCATE ، ولكن كل منها اختياري.

معالجة تحديثات SCD من النوع 1

يوضح المثال التالي معالجة تحديثات SCD من النوع 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

بعد تشغيل المثال SCD من النوع 1، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city
124 راؤول اواكساكا
125 مرسيدس غوادالاخارا
126 زنبق كانكون

بعد تشغيل مثال SCD من النوع 1 مع السجل الإضافي TRUNCATE ، يتم اقتطاع السجلات 124 و 126 بسبب TRUNCATE العملية في sequenceNum=3، ويحتوي الجدول الهدف على السجل التالي:

userId الاسم city
125 مرسيدس غوادالاخارا

معالجة تحديثات SCD من النوع 2

يوضح المثال التالي معالجة تحديثات SCD من النوع 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

بعد تشغيل المثال SCD من النوع 2، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city __START_AT __END_AT
123 ايزابيل مونتيري 1 5
123 ايزابيل تشيهواهوا 5 6
124 راؤول اواكساكا 1 قيمة فارغة
125 مرسيدس تيجوانا 2 5
125 مرسيدس Mexicali 5 6
125 مرسيدس غوادالاخارا 6 قيمة فارغة
126 زنبق كانكون 2 قيمة فارغة

يمكن لاستعلام SCD من النوع 2 أيضا تحديد مجموعة فرعية من أعمدة الإخراج التي سيتم تعقبها للمحفوظات في الجدول الهدف. يتم تحديث التغييرات على الأعمدة الأخرى في مكانها بدلا من إنشاء سجلات محفوظات جديدة. يوضح المثال التالي استبعاد city العمود من التعقب:

يوضح المثال التالي استخدام محفوظات التتبع مع SCD من النوع 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

بعد تشغيل هذا المثال بدون السجل الإضافي TRUNCATE ، يحتوي الجدول الهدف على السجلات التالية:

userId الاسم city __START_AT __END_AT
123 ايزابيل تشيهواهوا 1 6
124 راؤول اواكساكا 1 قيمة فارغة
125 مرسيدس غوادالاخارا 2 قيمة فارغة
126 زنبق كانكون 2 قيمة فارغة

إنشاء بيانات الاختبار

يتم توفير التعليمات البرمجية أدناه لإنشاء مجموعة بيانات مثال للاستخدام في أمثلة الاستعلامات الموجودة في هذا البرنامج التعليمي. بافتراض أن لديك بيانات الاعتماد المناسبة لإنشاء مخطط جديد وإنشاء جدول جديد، يمكنك تشغيل هذه العبارات إما باستخدام دفتر ملاحظات أو Databricks SQL. التعليمات البرمجية التالية غير مخصصة ليتم تشغيلها كجزء من مسار Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

مثال: معالجة اللقطات الدورية

يوضح المثال التالي معالجة SCD من النوع 2 الذي يلتقط لقطات لجدول مخزن في mycatalog.myschema.mytable. تتم كتابة نتائج المعالجة إلى جدول يسمى target.

mycatalog.myschema.mytable السجلات في الطابع الزمني 2024-01-01 00:00:00

مفتاح القيمة
1 a1
2 a2

mycatalog.myschema.mytable السجلات في الطابع الزمني 2024-01-01 12:00:00

مفتاح القيمة
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

بعد معالجة اللقطات، يحتوي الجدول الهدف على السجلات التالية:

مفتاح القيمة __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 قيمة فارغة
3 a3 2024-01-01 12:00:00 قيمة فارغة

مثال: معالجة اللقطة التاريخية

يوضح المثال التالي معالجة SCD من النوع 2 التي تحدث جدولا مستهدفا استنادا إلى أحداث المصدر من نسختين مطابقة مخزنتين في نظام تخزين سحابي:

لقطة في timestamp، مخزنة في /<PATH>/filename1.csv

مفتاح مسار التعقب NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

لقطة في timestamp + 5، مخزنة في /<PATH>/filename2.csv

مفتاح مسار التعقب NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

يوضح مثال التعليمات البرمجية التالي معالجة تحديثات SCD من النوع 2 مع هذه اللقطات:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

بعد معالجة اللقطات، يحتوي الجدول الهدف على السجلات التالية:

مفتاح مسار التعقب NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 قيمة فارغة
3 a3 b3 2 قيمة فارغة
4 a4 b4_new 1 قيمة فارغة

إضافة بيانات أو تغييرها أو حذفها في جدول تدفق هدف

إذا كانت البنية الأساسية لبرنامج ربط العمليات التجارية تنشر الجداول إلى كتالوج Unity، يمكنك استخدام عبارات لغة معالجة البيانات (DML)، بما في ذلك عبارات الإدراج والتحديث والحذف والدمج، لتعديل جداول الدفق الهدف التي تم إنشاؤها بواسطة APPLY CHANGES INTO العبارات.

إشعار

  • عبارات DML التي تعدل مخطط الجدول لجدول دفق غير معتمدة. تأكد من أن عبارات DML الخاصة بك لا تحاول تطوير مخطط الجدول.
  • يمكن تشغيل عبارات DML التي تحدث جدول دفق فقط في مجموعة كتالوج Unity مشتركة أو مستودع SQL باستخدام Databricks Runtime 13.3 LTS وما فوق.
  • نظرا لأن الدفق يتطلب مصادر بيانات ملحقة فقط، إذا كانت المعالجة تتطلب دفقا من جدول دفق مصدر مع تغييرات (على سبيل المثال، بواسطة عبارات DML)، قم بتعيين علامة skipChangeCommits عند قراءة جدول دفق المصدر. عند skipChangeCommits تعيين، يتم تجاهل المعاملات التي تحذف السجلات أو تعدلها في الجدول المصدر. إذا لم تتطلب المعالجة جدول دفق، يمكنك استخدام طريقة عرض مجسدة (التي لا تحتوي على قيد الإلحاق فقط) كجدول هدف.

نظرا لأن Delta Live Tables تستخدم عمودا محددا SEQUENCE BY وتنشر قيم التسلسل المناسبة إلى __START_AT العمودين و __END_AT للجدول الهدف (لنوع SCD 2)، يجب التأكد من أن عبارات DML تستخدم قيما صالحة لهذه الأعمدة للحفاظ على الترتيب الصحيح للسجلات. راجع كيف يتم تنفيذ التقاط بيانات التغيير باستخدام واجهة برمجة تطبيقات APPLY CHANGES؟.

لمزيد من المعلومات حول استخدام عبارات DML مع جداول الدفق، راجع إضافة بيانات أو تغييرها أو حذفها في جدول دفق.

يدرج المثال التالي سجلا نشطا بتسلسل بدء من 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

الحصول على بيانات حول السجلات التي تمت معالجتها بواسطة استعلام Delta Live Tables CDC

إشعار

يتم التقاط المقاييس التالية فقط بواسطة APPLY CHANGES الاستعلامات، وليس بواسطة APPLY CHANGES FROM SNAPSHOT الاستعلامات.

يتم التقاط المقاييس التالية بواسطة APPLY CHANGES الاستعلامات:

  • num_upserted_rows: عدد صفوف الإخراج التي تم إدراجها في مجموعة البيانات أثناء التحديث.
  • num_deleted_rows: عدد صفوف الإخراج الموجودة المحذوفة من مجموعة البيانات أثناء التحديث.

لا يتم تسجيل المقياس num_output_rows ، وهو إخراج لتدفقات غير CDC، للاستعلامات apply changes .

ما هي كائنات البيانات المستخدمة لمعالجة Delta Live Tables CDC؟

ملاحظة: تنطبق بنيات البيانات التالية فقط على APPLY CHANGES المعالجة، وليس APPLY CHANGES FROM SNAPSHOT المعالجة.

عند الإعلان عن الجدول الهدف في Hive metastore، يتم إنشاء بنيتي بيانات:

  • طريقة عرض تستخدم الاسم المعين للجدول الهدف.
  • جدول دعم داخلي تستخدمه Delta Live Tables لإدارة معالجة CDC. تمت تسمية هذا الجدول عن طريق الإلحاق __apply_changes_storage_ باسم الجدول الهدف.

على سبيل المثال، إذا قمت بتعريف جدول هدف باسم dlt_cdc_target، فسترى طريقة عرض مسماة dlt_cdc_target وجدولا باسم __apply_changes_storage_dlt_cdc_target في metastore. يسمح إنشاء طريقة عرض لجداول Delta Live بتصفية المعلومات الإضافية (على سبيل المثال، علامات العلامات والإصدارات) المطلوبة للتعامل مع البيانات غير المطلوبة. لعرض البيانات المعالجة، استعلم عن طريقة العرض الهدف. نظرا لأن مخطط __apply_changes_storage_ الجدول قد يتغير لدعم الميزات أو التحسينات المستقبلية، يجب عدم الاستعلام عن الجدول لاستخدام الإنتاج. إذا قمت بإضافة البيانات يدويا إلى الجدول، فيفترض أن السجلات تأتي قبل تغييرات أخرى لأن أعمدة الإصدار مفقودة.

إذا تم نشر البنية الأساسية لبرنامج ربط العمليات التجارية إلى كتالوج Unity، فإن جداول النسخ الاحتياطي الداخلية لا يمكن للمستخدمين الوصول إليها.