العمل مع محفوظات جدول Delta Lake
تنشئ كل عملية تقوم بتعديل جدول Delta Lake إصدار جدول جديد. يمكنك استخدام معلومات المحفوظات لتدقيق العمليات أو التراجع عن جدول أو الاستعلام عن جدول في نقطة زمنية معينة باستخدام السفر عبر الزمن.
ملاحظة
لا توصي Databricks باستخدام محفوظات جدول Delta Lake كحل نسخ احتياطي طويل الأجل لأرشفة البيانات. توصي Databricks باستخدام الأيام السبعة الماضية فقط لعمليات السفر عبر الوقت ما لم تقم بتعيين كل من تكوينات استبقاء البيانات والسجلات إلى قيمة أكبر.
يمكنك استرداد المعلومات بما في ذلك العمليات والمستخدم والطابع الزمني لكل كتابة إلى جدول Delta عن طريق تشغيل history
الأمر . يتم إرجاع العمليات بترتيب زمني عكسي.
يتم تحديد استبقاء محفوظات الجدول بواسطة إعداد delta.logRetentionDuration
الجدول ، وهو 30 يوما بشكل افتراضي.
ملاحظة
يتم التحكم في السفر عبر الزمن ومحفوظات الجدول من خلال حدود استبقاء مختلفة. راجع ما هو السفر عبر الزمن Delta Lake؟.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
للحصول على تفاصيل بناء جملة Spark SQL، راجع وصف محفوظات.
راجع وثائق واجهة برمجة تطبيقات Delta Lake للحصول على تفاصيل بناء جملة Scala/Java/Python.
يوفر مستكشف الكتالوج طريقة عرض مرئية لمعلومات الجدول التفصيلية هذه ومحفوظات جداول دلتا. بالإضافة إلى مخطط الجدول وعينة البيانات، يمكنك النقر فوق علامة التبويب محفوظات لمشاهدة محفوظات الجدول التي يتم عرضها باستخدام DESCRIBE HISTORY
.
يحتوي إخراج العملية على history
الأعمدة التالية.
Column | نوع | الوصف |
---|---|---|
إصدار | طويل | إصدار الجدول الذي تم إنشاؤه بواسطة العملية. |
الطابع الزمني | الطابع الزمني | عندما تم تثبيت هذا الإصدار. |
userId | سلسلة | معرف المستخدم الذي قام بتشغيل العملية. |
userName | سلسلة | اسم المستخدم الذي قام بتشغيل العملية. |
التشغيل | سلسلة | اسم العملية. |
operationParameters | map | معلمات العملية (على سبيل المثال، دالات التقييم.) |
المهمة | بنية | تفاصيل المهمة التي شغلت العملية. |
notebook | بنية | تفاصيل دفتر الملاحظات الذي تم تشغيل العملية منه. |
clusterId | سلسلة | معرف نظام المجموعة الذي تم تشغيل العملية عليه. |
readVersion | طويل | إصدار الجدول الذي تمت قراءته لتنفيذ عملية الكتابة. |
isolationLevel | سلسلة | مستوى العزل المستخدم لهذه العملية. |
isBlindAppend | boolean | ما إذا كانت هذه العملية قد ألحقت البيانات أم لا. |
operationMetrics | map | مقاييس العملية (على سبيل المثال، عدد الصفوف والملفات المعدلة.) |
بيانات تعريف المستخدم | سلسلة | بيانات تعريف التثبيت المعرفة من قبل المستخدم إذا تم تحديدها |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
ملاحظة
- لا تتوفر بعض الأعمدة الأخرى إذا كتبت في جدول Delta باستخدام الطرق التالية:
- ستتم دائما إضافة الأعمدة المضافة في المستقبل بعد العمود الأخير.
ترجع history
العملية مجموعة من مقاييس العمليات في operationMetrics
مخطط العمود.
تسرد الجداول التالية تعريفات مفتاح الخريطة حسب العملية.
العملية | اسم قياسي | الوصف |
---|---|---|
كتابة، إنشاء جدول كحدد، استبدال جدول كحدد، نسخ إلى | ||
ملفات numFiles | عدد الملفات المكتوبة. | |
numOutputBytes | الحجم بالبايت للمحتويات المكتوبة. | |
numOutputRows | عدد الصفوف المكتوبة. | |
تحديث الدفق | ||
numAddedFiles | عدد الملفات المضافة. | |
numRemovedFiles | عدد الملفات التي تمت إزالتها. | |
numOutputRows | عدد الصفوف المكتوبة. | |
numOutputBytes | حجم الكتابة بالبايت. | |
حذف | ||
numAddedFiles | عدد الملفات المضافة. لا يتم توفيره عند حذف أقسام الجدول. | |
numRemovedFiles | عدد الملفات التي تمت إزالتها. | |
numDeletedRows | عدد الصفوف التي تمت إزالتها. لا يتم توفيره عند حذف أقسام الجدول. | |
numCopiedRows | عدد الصفوف المنسخة في عملية حذف الملفات. | |
executionTimeMs | الوقت المستغرق لتنفيذ العملية بأكملها. | |
scanTimeMs | الوقت المستغرق لفحص الملفات بحثا عن التطابقات. | |
إعادة كتابةTimeMs | الوقت المستغرق لإعادة كتابة الملفات المتطابقة. | |
اقتطاع | ||
numRemovedFiles | عدد الملفات التي تمت إزالتها. | |
executionTimeMs | الوقت المستغرق لتنفيذ العملية بأكملها. | |
دمج | ||
numSourceRows | عدد الصفوف في DataFrame المصدر. | |
numTargetRowsInserted | عدد الصفوف المدرجة في الجدول الهدف. | |
numTargetRowsUpdated | عدد الصفوف التي تم تحديثها في الجدول الهدف. | |
numTargetRowsDeleted | عدد الصفوف المحذوفة في الجدول الهدف. | |
numTargetRowsCopied | عدد الصفوف الهدف المنسخة. | |
numOutputRows | إجمالي عدد الصفوف المكتوبة. | |
تمت إضافة numTargetFiles | عدد الملفات المضافة إلى المتلقي (الهدف). | |
numTargetFilesRemoved | عدد الملفات التي تمت إزالتها من المتلقي (الهدف). | |
executionTimeMs | الوقت المستغرق لتنفيذ العملية بأكملها. | |
scanTimeMs | الوقت المستغرق لفحص الملفات بحثا عن التطابقات. | |
إعادة كتابةTimeMs | الوقت المستغرق لإعادة كتابة الملفات المتطابقة. | |
تحديث | ||
numAddedFiles | عدد الملفات المضافة. | |
numRemovedFiles | عدد الملفات التي تمت إزالتها. | |
numUpdatedRows | عدد الصفوف المحدثة. | |
numCopiedRows | عدد الصفوف التي تم نسخها للتو في عملية تحديث الملفات. | |
executionTimeMs | الوقت المستغرق لتنفيذ العملية بأكملها. | |
scanTimeMs | الوقت المستغرق لفحص الملفات بحثا عن التطابقات. | |
إعادة كتابةTimeMs | الوقت المستغرق لإعادة كتابة الملفات المتطابقة. | |
FSCK | numRemovedFiles | عدد الملفات التي تمت إزالتها. |
CONVERT | numConvertedFiles | عدد ملفات Parquet التي تم تحويلها. |
تحسين | ||
numAddedFiles | عدد الملفات المضافة. | |
numRemovedFiles | عدد الملفات المحسنة. | |
numAddedBytes | عدد وحدات البايت المضافة بعد تحسين الجدول. | |
numRemovedBytes | عدد وحدات البايت التي تمت إزالتها. | |
minFileSize | حجم أصغر ملف بعد تحسين الجدول. | |
p25FileSize | حجم الملف 25 بالمائة بعد تحسين الجدول. | |
p50FileSize | متوسط حجم الملف بعد تحسين الجدول. | |
p75FileSize | حجم الملف 75 بالمائة بعد تحسين الجدول. | |
maxFileSize | حجم أكبر ملف بعد تحسين الجدول. | |
استنساخ | ||
حجم sourceTableSize | الحجم بالبايت للجدول المصدر في الإصدار المستنسخ. | |
sourceNumOfFiles | عدد الملفات في الجدول المصدر في الإصدار المستنسخ. | |
numRemovedFiles | عدد الملفات التي تمت إزالتها من الجدول الهدف إذا تم استبدال جدول Delta سابق. | |
تمت إزالةFilesSize | الحجم الإجمالي بالبايت للملفات التي تمت إزالتها من الجدول الهدف إذا تم استبدال جدول Delta سابق. | |
ملفات numCopied | عدد الملفات التي تم نسخها إلى الموقع الجديد. 0 للنسخ الضحلة. | |
حجم ملفات النسخ | الحجم الإجمالي بالبايت للملفات التي تم نسخها إلى الموقع الجديد. 0 للنسخ الضحلة. | |
الاستعادة | ||
tableSizeAfterRestore | حجم الجدول بالبايت بعد الاستعادة. | |
numOfFilesAfterRestore | عدد الملفات في الجدول بعد الاستعادة. | |
numRemovedFiles | عدد الملفات التي تمت إزالتها بواسطة عملية الاستعادة. | |
numRestoredFiles | عدد الملفات التي تمت إضافتها نتيجة للاستعادة. | |
تمت إزالةFilesSize | الحجم بالبايت من الملفات التي تمت إزالتها بواسطة الاستعادة. | |
حجم الملفات المستعادة | الحجم بالبايت من الملفات المضافة بواسطة الاستعادة. | |
مِكْنَسَة كَهْرَبَائِيَّة | ||
numDeletedFiles | عدد الملفات المحذوفة. | |
numVacuumedDirectories | عدد الدلائل المنسغة. | |
numFilesToDelete | عدد الملفات المراد حذفها. |
يدعم Delta Lake time travel الاستعلام عن إصدارات الجدول السابقة استنادا إلى الطابع الزمني أو إصدار الجدول (كما هو مسجل في سجل المعاملات). يمكنك استخدام السفر عبر الزمن لتطبيقات مثل ما يلي:
- إعادة إنشاء التحليلات أو التقارير أو المخرجات (على سبيل المثال" إخراج نموذج التعلم الآلي). وقد يكون ذلك مفيدًا في تصحيح الأخطاء أو التدقيق، لا سيما في الصناعات الخاضعة للتنظيم.
- كتابة الاستعلامات المؤقتة المعقدة.
- إصلاح الأخطاء في بياناتك.
- توفير عزل اللقطات لمجموعة من الاستعلامات للجداول سريعة التغير.
هام
يتم تحديد إصدارات الجدول التي يمكن الوصول إليها مع السفر عبر الوقت من خلال مجموعة من حد الاستبقاء لملفات سجل المعاملات وتكرار العمليات واستبقاءها VACUUM
المحدد. إذا قمت بتشغيل VACUUM
يوميا بالقيم الافتراضية، فستتوفر 7 أيام من البيانات للسفر عبر الزمن.
يمكنك الاستعلام عن جدول Delta مع السفر عبر الوقت عن طريق إضافة عبارة بعد مواصفات اسم الجدول.
timestamp_expression
يمكن أن يكون أي واحد من:'2018-10-18T22:15:12.013Z'
، أي سلسلة يمكن تحويلها إلى طابع زمنيcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
، أي سلسلة تاريخcurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- أي تعبير آخر يتم تحويله أو يمكن تحويله إلى طابع زمني
version
هي قيمة طويلة يمكن الحصول عليها من إخراجDESCRIBE HISTORY table_spec
.
لا timestamp_expression
يمكن ولا version
الاستعلامات الفرعية.
يتم قبول سلاسل التاريخ أو الطابع الزمني فقط. على سبيل المثال، "2019-01-01"
و "2019-01-01T00:00:00.000Z"
. راجع التعليمات البرمجية التالية على سبيل المثال بناء الجملة:
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
يمكنك أيضا استخدام بناء الجملة @
لتحديد الطابع الزمني أو الإصدار كجزء من اسم الجدول. يجب أن يكون الطابع الزمني بالتنسيق yyyyMMddHHmmssSSS
. يمكنك تحديد إصدار بعد @
عن طريق إلحاق بالإصدار v
. راجع التعليمات البرمجية التالية على سبيل المثال بناء الجملة:
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
يسجل Delta Lake إصدارات الجدول كملفات JSON داخل _delta_log
الدليل، والذي يتم تخزينه جنبا إلى جنب مع بيانات الجدول. لتحسين الاستعلام عن نقاط التحقق، تقوم Delta Lake بتجميع إصدارات الجدول إلى ملفات نقطة التحقق Parquet، مما يمنع الحاجة إلى قراءة جميع إصدارات JSON من محفوظات الجدول. يحسن Azure Databricks تكرار نقاط التفتيش لحجم البيانات وعبء العمل. يجب ألا يحتاج المستخدمون إلى التفاعل مع نقاط التحقق مباشرة. يخضع تكرار نقطة التحقق للتغيير دون إشعار.
للاستعلام عن إصدار جدول سابق، يجب الاحتفاظ بكل من السجل وملفات البيانات لهذا الإصدار.
يتم حذف ملفات البيانات عند VACUUM
التشغيل مقابل جدول. تدير Delta Lake إزالة ملف السجل تلقائيا بعد التحقق من إصدارات الجدول.
نظرا لأن معظم جداول Delta قد VACUUM
تم تشغيلها عليها بانتظام، يجب أن تحترم الاستعلامات في نقطة زمنية حد الاستبقاء ل VACUUM
، وهو 7 أيام بشكل افتراضي.
لزيادة حد استبقاء البيانات لجداول Delta، يجب تكوين خصائص الجدول التالية:
delta.logRetentionDuration = "interval <interval>"
: يتحكم في المدة التي يتم فيها الاحتفاظ بمحفوظات الجدول. الافتراضي هوinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: يحدد الحدVACUUM
الذي يستخدم لإزالة ملفات البيانات التي لم تعد مشار إليها في إصدار الجدول الحالي. الافتراضي هوinterval 7 days
.
يمكنك تحديد خصائص Delta أثناء إنشاء الجدول أو تعيينها باستخدام عبارة ALTER TABLE
. راجع مرجع خصائص جدول Delta.
ملاحظة
يجب تعيين هاتين الخاصيتين لضمان الاحتفاظ بمحفوظات الجدول لمدة أطول للجداول ذات العمليات المتكررة VACUUM
. على سبيل المثال، للوصول إلى 30 يوما من البيانات التاريخية، قم بتعيين delta.deletedFileRetentionDuration = "interval 30 days"
(الذي يطابق الإعداد الافتراضي ل delta.logRetentionDuration
).
يمكن أن تؤدي زيادة حد استبقاء البيانات إلى ارتفاع تكاليف التخزين الخاصة بك، حيث يتم الاحتفاظ بمزيد من ملفات البيانات.
يمكنك استعادة جدول Delta إلى حالته السابقة باستخدام RESTORE
الأمر . يحتفظ جدول Delta داخليا بالإصدارات التاريخية من الجدول التي تمكنه من استعادته إلى حالة سابقة.
يتم دعم الإصدار المقابل للحالة السابقة أو الطابع الزمني لوقت إنشاء الحالة السابقة كخيارات RESTORE
بواسطة الأمر .
هام
- يمكنك استعادة جدول تمت استعادته بالفعل.
- يمكنك استعادة جدول مستنسخ .
- يجب أن يكون لديك
MODIFY
إذن على الجدول الذي يتم استعادته. - لا يمكنك استعادة جدول إلى إصدار أقدم حيث تم حذف ملفات البيانات يدويا أو بواسطة
vacuum
. لا يزال الاستعادة إلى هذا الإصدار ممكنا جزئيا إذاspark.sql.files.ignoreMissingFiles
تم تعيين إلىtrue
. - تنسيق الطابع الزمني للاستعادة إلى حالة سابقة هو
yyyy-MM-dd HH:mm:ss
. يتم أيضا دعم توفير سلسلة تاريخ (yyyy-MM-dd
) تاريخ فقط.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
للحصول على تفاصيل بناء الجملة، راجع استعادة.
هام
تعتبر الاستعادة عملية تغيير البيانات. تحتوي إدخالات سجل Delta Lake المضافة بواسطة RESTORE
الأمر على dataChange المعينة إلى true. إذا كان هناك تطبيق انتقال البيانات من الخادم، مثل مهمة التدفق المنظم التي تعالج التحديثات إلى جدول Delta Lake، فإن إدخالات سجل تغيير البيانات التي تمت إضافتها بواسطة عملية الاستعادة تعتبر تحديثات بيانات جديدة، وقد تؤدي معالجتها إلى بيانات مكررة.
على سبيل المثال:
إصدار الجدول | العملية | تحديثات سجل دلتا | السجلات في تحديثات سجل تغيير البيانات |
---|---|---|---|
0 | إدراج | AddFile(/path/to/file-1, dataChange = true) | (الاسم = فيكتور، العمر = 29، (الاسم = جورج، العمر = 55) |
1 | إدراج | AddFile(/path/to/file-2, dataChange = true) | (الاسم = جورج، العمر = 39) |
2 | تحسين | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (لا توجد سجلات مثل تحسين الضغط لا يغير البيانات في الجدول) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (الاسم = فيكتور، العمر = 29)، (الاسم = جورج، العمر = 55)، (الاسم = جورج، العمر = 39) |
في المثال السابق، RESTORE
ينتج عن الأمر تحديثات تمت رؤيتها بالفعل عند قراءة إصدار جدول Delta 0 و1. إذا كان الاستعلام المتدفق يقرأ هذا الجدول، اعتبار هذه الملفات كبيانات تمت إضافتها حديثا وستتم معالجتها مرة أخرى.
RESTORE
يبلغ عن المقاييس التالية ك DataFrame صف واحد بمجرد اكتمال العملية:
table_size_after_restore
: حجم الجدول بعد الاستعادة.num_of_files_after_restore
: عدد الملفات في الجدول بعد الاستعادة.num_removed_files
: عدد الملفات التي تمت إزالتها (تم حذفها منطقيا) من الجدول.num_restored_files
: عدد الملفات التي تمت استعادتها بسبب التراجع.removed_files_size
: الحجم الإجمالي بالبايت للملفات التي تمت إزالتها من الجدول.restored_files_size
: الحجم الإجمالي بالبايت للملفات التي تمت استعادتها.
إصلاح عمليات الحذف العرضية إلى جدول للمستخدم
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
إصلاح التحديثات غير الصحيحة العرضية لجدول:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
الاستعلام عن عدد العملاء الجدد الذين تمت إضافتهم خلال الأسبوع الماضي.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
للحصول على رقم إصدار آخر تثبيت مكتوب بواسطة الحالي SparkSession
عبر جميع مؤشرات الترابط وجميع الجداول، استعلم عن تكوين spark.databricks.delta.lastCommitVersionInSession
SQL .
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
إذا لم يتم إجراء أي عمليات تثبيت بواسطة SparkSession
، فإن الاستعلام عن المفتاح يرجع قيمة فارغة.
ملاحظة
إذا كنت تشارك نفس الشيء SparkSession
عبر مؤشرات ترابط متعددة، فإنه يشبه مشاركة متغير عبر مؤشرات ترابط متعددة؛ قد تصل إلى شروط السباق حيث يتم تحديث قيمة التكوين بشكل متزامن.