استخدام موجز بيانات تغيير Delta Lake على Azure Databricks

إشعار

يسمح موجز بيانات التغيير ل Azure Databricks بتعقب التغييرات على مستوى الصف بين إصدارات جدول Delta. عند تمكينها في جدول Delta، تقوم سجلات وقت التشغيل بتغيير الأحداث لجميع البيانات المكتوبة في الجدول. يتضمن ذلك بيانات الصف مع بيانات التعريف التي تشير إلى ما إذا كان الصف المحدد قد تم إدراجه أو حذفه أو تحديثه.

يمكنك قراءة أحداث التغيير في استعلامات الدفعة باستخدام Spark SQL وApache Spark DataFrames و Structured Streaming.

هام

يعمل موجز بيانات التغيير جنبا إلى جنب مع محفوظات الجدول لتوفير معلومات التغيير. نظرا لأن استنساخ جدول Delta يؤدي إلى إنشاء محفوظات منفصلة، فإن موجز بيانات التغيير على الجداول المستنسخة لا يتطابق مع محفوظات الجدول الأصلي.

حالات الاستخدام

لا يتم تمكين موجز بيانات التغيير بشكل افتراضي. يجب أن تقوم حالات الاستخدام التالية بالقيادة عند تمكين موجز بيانات التغيير.

  • الجداول الفضية والذهبية: تحسين أداء Delta Lake من خلال معالجة التغييرات على مستوى الصف فقط بعد العمليات الأولية MERGEUPDATEأو DELETE لتسريع عمليات ETL و ELT وتبسيطها.
  • طرق العرض المجسدة: إنشاء طرق عرض محدثة ومجمعة للمعلومات لاستخدامها في المعلومات المهنية والتحليلات دون الحاجة إلى إعادة معالجة الجداول الأساسية الكاملة، بدلا من التحديث فقط من حيث جاءت التغييرات.
  • إرسال التغييرات: إرسال موجز بيانات التغيير إلى أنظمة انتقال البيانات من الخادم مثل Kafka أو RDBMS التي يمكنها استخدامها للمعالجة بشكل متزايد في المراحل اللاحقة من مسارات البيانات.
  • جدول سجل التدقيق: يوفر التقاط موجز بيانات التغيير كجدول دلتا مساحة تخزين دائمة وقدرة استعلام فعالة لمشاهدة جميع التغييرات بمرور الوقت، بما في ذلك عند حدوث عمليات الحذف والتحديثات التي تم إجراؤها.

تمكين موجز بيانات التغيير

يجب تمكين خيار موجز بيانات التغيير بشكل صريح باستخدام إحدى الطرق التالية:

  • جدول جديد: تعيين خاصية delta.enableChangeDataFeed = true الجدول في CREATE TABLE الأمر .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • الجدول الموجود: تعيين خاصية delta.enableChangeDataFeed = true الجدول في ALTER TABLE الأمر .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • جميع الجداول الجديدة:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

هام

يتم تسجيل التغييرات التي تم إجراؤها فقط بعد تمكين موجز بيانات التغيير؛ ولا يتم تسجيل التغييرات السابقة على جدول.

تغيير تخزين البيانات

تقوم سجلات Azure Databricks بتغيير البيانات لعمليات UPDATEو DELETEو MERGE في _change_data المجلد ضمن دليل الجدول. لا تنشئ بعض العمليات، مثل عمليات الإدراج فقط وحذف القسم الكامل، بيانات في _change_data الدليل لأن Azure Databricks يمكنه حساب موجز بيانات التغيير بكفاءة مباشرة من سجل المعاملات.

تتبع الملفات الموجودة في _change_data المجلد نهج الاستبقاء للجدول. لذلك، إذا قمت بتشغيل الأمر فراغ ، يتم أيضا حذف تغيير بيانات موجز البيانات.

قراءة التغييرات في استعلامات الدفعة

يمكنك توفير إصدار أو طابع زمني للبدء والنهاية. تكون إصدارات البدء والنهاية والطوابع الزمنية شاملة في الاستعلامات. لقراءة التغييرات من إصدار بدء معين إلى أحدث إصدار من الجدول، حدد إصدار البداية أو الطابع الزمني فقط.

يمكنك تحديد إصدار كعدد صحيح وطوابع زمنية كسلسلة بالتنسيق yyyy-MM-dd[ HH:mm:ss[.SSS]].

إذا قمت بتوفير إصدار أقل أو طابع زمني أقدم من واحد قام بتسجيل أحداث التغيير - أي عند تمكين موجز بيانات التغيير - يتم طرح خطأ يشير إلى عدم تمكين موجز بيانات التغيير.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

قراءة التغييرات في استعلامات الدفق

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

للحصول على بيانات التغيير أثناء قراءة الجدول، قم بتعيين الخيار readChangeFeed إلى true. startingVersion أو startingTimestamp اختيارية وإذا لم يتم توفيرها، يقوم الدفق بإرجاع أحدث لقطة للجدول في وقت البث كتغيرات مستقبلية INSERT كبيانات تغيير. خيارات مثل حدود المعدل (maxFilesPerTrigger، maxBytesPerTrigger) ويتم excludeRegex دعمها أيضا عند قراءة بيانات التغيير.

إشعار

يمكن أن يكون تحديد المعدل ذريا للإصدارات الأخرى غير إصدار لقطة البداية. أي أن إصدار التثبيت بأكمله سيكون محدودا أو سيتم إرجاع التثبيت بأكمله.

بشكل افتراضي، إذا مرر مستخدم إصدارا أو طابعا زمنيا يتجاوز التثبيت الأخير على جدول، يتم طرح الخطأ timestampGreaterThanLatestCommit . في Databricks Runtime 11.3 LTS والإصدارات الأحدث، يمكن لموجز البيانات التغيير معالجة حالة إصدار خارج النطاق إذا كان المستخدم يعين التكوين التالي إلى true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

إذا قمت بتوفير إصدار بدء أكبر من التثبيت الأخير على جدول أو طابع زمني للبدء أحدث من التثبيت الأخير في جدول، ثم عند تمكين التكوين السابق، يتم إرجاع نتيجة قراءة فارغة.

إذا قمت بتوفير إصدار نهاية أكبر من التثبيت الأخير على جدول أو طابع زمني للانتهاء أحدث من التثبيت الأخير على جدول، فعند تمكين التكوين السابق في وضع قراءة الدفعة، يتم إرجاع جميع التغييرات بين إصدار البدء والتثبيت الأخير.

ما هو مخطط موجز بيانات التغيير؟

عند القراءة من موجز بيانات التغيير لجدول، يتم استخدام مخطط أحدث إصدار للجدول.

إشعار

يتم دعم معظم عمليات تغيير المخطط والتطور بشكل كامل. لا يدعم الجدول الذي تم تمكين تعيين العمود فيه جميع حالات الاستخدام ويوضح سلوكا مختلفا. راجع تغيير قيود موجز البيانات للجداول مع تمكين تعيين العمود.

بالإضافة إلى أعمدة البيانات من مخطط جدول Delta، يحتوي موجز بيانات التغيير على أعمدة بيانات التعريف التي تحدد نوع حدث التغيير:

اسم العمود نوع القيم
_change_type السلسلة‬ insert، update_preimage ، update_postimage، delete(1)
_commit_version طويل يحتوي إصدار سجل أو جدول Delta على التغيير.
_commit_timestamp الطابع الزمني الطابع الزمني المقترن عند إنشاء التثبيت.

(1)preimage هي القيمة قبل التحديث، postimage وهي القيمة بعد التحديث.

إشعار

لا يمكنك تمكين تغيير موجز البيانات على جدول إذا كان المخطط يحتوي على أعمدة بنفس أسماء هذه الأعمدة المضافة. أعد تسمية الأعمدة في الجدول لحل هذا التعارض قبل محاولة تمكين موجز بيانات التغيير.

تغيير قيود موجز البيانات للجداول مع تمكين تعيين العمود

مع تمكين تعيين العمود على جدول Delta، يمكنك إسقاط الأعمدة أو إعادة تسميتها في الجدول دون إعادة كتابة ملفات البيانات للبيانات الموجودة. مع تمكين تعيين العمود، يحتوي موجز بيانات التغيير على قيود بعد إجراء تغييرات مخطط غير مضافة مثل إعادة تسمية عمود أو إسقاطه أو تغيير نوع البيانات أو تغييرات قابلية القيم الخالية.

هام

  • لا يمكنك قراءة موجز بيانات التغيير لمعاملة أو نطاق يحدث فيه تغيير مخطط غير إضافي باستخدام دلالات الدفعة.
  • في Databricks Runtime 12.2 LTS والإدخالات أدناه، لا تدعم الجداول التي تم تمكين تعيين الأعمدة بها والتي شهدت تغييرات مخطط غير مضافة عمليات قراءة الدفق على موجز بيانات التغيير. راجع البث مع تعيين العمود وتغييرات المخطط.
  • في Databricks Runtime 11.3 LTS والإصدارات أدناه، لا يمكنك قراءة موجز بيانات التغيير للجداول مع تمكين تعيين الأعمدة التي واجهت إعادة تسمية العمود أو إسقاطه.

في Databricks Runtime 12.2 LTS والإصدارات الأحدث، يمكنك إجراء قراءات دفعية على موجز بيانات التغيير للجداول مع تمكين تعيين العمود الذي واجه تغييرات مخطط غير مضافة. بدلا من استخدام مخطط أحدث إصدار من الجدول، تستخدم عمليات القراءة مخطط الإصدار النهائي من الجدول المحدد في الاستعلام. لا تزال الاستعلامات تفشل إذا كان نطاق الإصدار المحدد يمتد على تغيير مخطط غير إضافي.

الأسئلة الشائعة (FAQ)

ما هي النفقات العامة لتمكين موجز بيانات التغيير؟

لا يوجد تأثير كبير. يتم إنشاء سجلات بيانات التغيير سطرا أثناء عملية تنفيذ الاستعلام، وهي بشكل عام أصغر بكثير من الحجم الإجمالي للملفات التي تمت إعادة كتابتها.

ما نهج الاستبقاء لتغيير السجلات؟

تتبع سجلات التغيير نفس نهج الاستبقاء مثل إصدارات الجدول غير المحدثة، وسيتم تنظيفها من خلال فراغ إذا كانت خارج فترة الاستبقاء المحددة.

متى تصبح السجلات الجديدة متوفرة في موجز بيانات التغيير؟

يتم الالتزام بتغيير البيانات جنبا إلى جنب مع معاملة Delta Lake، وستصبح متاحة في نفس الوقت الذي تتوفر فيه البيانات الجديدة في الجدول.

مثال دفتر الملاحظات: نشر التغييرات باستخدام موجز بيانات تغيير دلتا

يوضح دفتر الملاحظات هذا كيفية نشر التغييرات التي تم إجراؤها على جدول فضي من العدد المطلق للتطعيمات إلى جدول ذهبي لمعدلات التطعيم.

تغيير دفتر ملاحظات موجز البيانات

الحصول على دفتر الملاحظات