نظرة عامة Delta Lake منLinux Foundation

تمت ملاءمة هذه المقالة لتكون أكثر وضوحاً مقارنة بنظيرتها الأصلية هنا. تساعدك هذه المقالة على استكشاف الميزات الرئيسة لـDelta Lake بصورة سريعة. توفر المقالة أجزاءً من التعليمات البرمجية التي توضح كيفية قراءة جداول Delta Lake والكتابة فيها من الاستعلامات التفاعلية والمتدفقة والمقدمة على دفعات. وتتوفر أجزاء التعليمات البرمجية أيضاً في مجموعة من المفكرات PySpark here وScala hereا وC# here

إليك ما سنتناوله:

  • إنشاء جدول
  • قراءة البيانات
  • تحديث بيانات الجدول
  • استبدال بيانات الجدول
  • تحديث شرطي دون استبدال
  • قراءة الإصدارات القديمة من البيانات باستخدام Time Travel
  • كتابة دفق بيانات في الجدول
  • قراءة دفق تغييرات من جدول
  • دعم SQL

التكوين

تأكد من تعديل القيم الموضحة أدناه لما يناسب بيئتك.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

النتائج:

'/delta/delta-table-335323'

إنشاء جدول

لإنشاء جدولDelta Lake، اكتب إطار بيانات خارج إطار بيانات بتنسيق delta. يمكنك تغيير التنسيق من Parquet وCSV وJSON، وهكذا، إلى delta.

توضح لك التعليمة البرمجية الآتية كيفية إنشاء جدول Delta Lake جديد باستخدام المخطط المستدل عليه من إطار بياناتك.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

النتائج:

المعرف
0
1
2
3
4

قراءة البيانات

يمكنك قراءة البيانات في جدولDelta Lake لديك بتحديد مسار الملفات وتنسيق delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

النتائج:

المعرف
1
3
4
0
2

يختلف ترتيب النتائج عن النحو الموضح أعلاه نظراً إلى عدم وجود ترتيب موضح بشكلٍ صريح قبل إخراج النتائج.

تحديث بيانات الجدول

يدعم Delta Lake العديد من العمليات لتعديل الجداول باستخدام واجهات برمجة تطبيقات DataFrame القياسية. هذه العمليات هي واحدة من التحسينات التي يضيفها تنسيق دلتا. في المثال الآتي، يتم تشغيل وظيفة دفعية لاستبدال البيانات الموجودة في الجدول.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

النتائج:

المعرف
7
8
5
9
6

هنا يمكنك أن ترى أنه تم تحديث السجلات الخمسة كلها لاستيعاب قيم جديدة.

الحفظ في صورة جداول كتالوج

يمكن لـDelta Lake الكتابة على جداول الكتالوج المُدارة أو الخارجية.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

النتائج:

قاعدة البيانات اسم الجدول مؤقت
افتراضي externaldeltatable false
افتراضي manageddeltatable false

بهذه التعليمة البرمجية، أنشأت جدولاً جديداً في الكتالوج من إطار بيانات موجود، يُشار إليه بالجدول المُدار. ثم حددت جدولاً خارجياً جديداً في الكتالوج الذي يستخدم موقعاً موجوداً، يُشار إليه بالجدول الخارجي. في المخرجات، يمكنك رؤية كلا الجدولين مُدرجين في الكتالوج بغض النظر عن طريقة إنشائهما.

الآن يمكنك عرض الخصائص الموسعة لكل من هذين الجدولين

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

النتائج:

اسم العمود نوع البيانات تعليق
المعرف Bigint خالٍ
معلومات الجدول المفصلة
قاعدة البيانات افتراضي
الجدول manageddeltatable
مالك فرد موثوق به من مستخدمي الخدمة
وقت الإنشاء السبت 25 أبريل 2020 الساعة 00:35:34 بالتوقيت العالمي
آخر وصول الخميس 01 يناير 1970 الساعة 00:00:00 بالتوقيت العالمي
جهة الإنشاء Spark 2.4.4.2.6.99.201-11401300
النوع مُدار
⁧⁩"الموفر"⁧⁩ delta
خصائص الجدول [transient_lastDdlTime=1587774934]
الإحصائيات‬ 2407 بايت
‏‏الموقع abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
معلومات المدخلات org.apache.hadoop.mapred.SequenceFileInputFormat
معلومات المخرجات org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
خصائص التخزين [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

النتائج:

اسم العمود نوع البيانات تعليق
المعرف Bigint خالٍ
معلومات الجدول المفصلة
قاعدة البيانات افتراضي
الجدول externaldeltatable
مالك فرد موثوق به من مستخدمي الخدمة
وقت الإنشاء السبت 25 أبريل 2020 الساعة 12:35:38 ص بالتوقيت العالمي
آخر وصول الخميس 01 يناير 1970 الساعة 00:00:00 بالتوقيت العالمي
جهة الإنشاء Spark 2.4.4.2.6.99.201-11401300
النوع خارجي
⁧⁩"الموفر"⁧⁩ دلتا
خصائص الجدول [transient_lastDdlTime=1587774938]
‏‏الموقع abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
معلومات المدخلات org.apache.hadoop.mapred.SequenceFileInputFormat
معلومات المخرجات org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
خصائص التخزين [serialization.format=1]

تحديث شرطي دون استبدال

يوفر Delta Lake واجهات برمجة تطبيقات برمجية لبيانات التحديث الشرطي والحذف والدمج (يشار إلى هذا الأمر عادة باسم upsert) في الجداول.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

النتائج:

المعرف
106
108
5
7
9

هنا أضفت 100 فقط لكل معرف زوجي.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

النتائج:

المعرف
5
7
9

لاحظ أنه تم حذف كل الصفوف.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

النتائج:

المعرف
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

هنا لديك مزيج من البيانات الموجودة. تم تعيين القيمة -1للبيانات الموجودة في مسار التعليمات البرمجية للتحديث (عند المطابقة). تمت إضافة البيانات الجديدة أيضاً التي تم إنشاؤها في الجزء العلوي من جزء التعليمة البرمجية وتمت إضافتها عبر إدراج مسار التعليمة البرمجية (عند المطابقة).

المحفوظات

يمتلك Delta Lake صلاحية السماح بالنظر في سجل الجدول. أي التغييرات التي تم إجراؤها على جدول Delta الأساسي. توضح الخلية أدناه مدى بساطة فحص المحفوظات.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

النتائج:

الإصدار طابع زمني معرّف المستخدم userName ‏‏العملية operationParameters المهمة notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 خالٍ خالٍ دمج [predicate -> (oldData.ID = newData.ID)] خالٍ خالٍ خالٍ 3 خالٍ false
3 2020-04-25 12:36:08 ص خالٍ خالٍ DELETE [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] خالٍ خالٍ خالٍ 2 خالٍ false
2 2020-04-25 12:35:51 ص خالٍ خالٍ UPDATE [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] خالٍ خالٍ خالٍ 1 خالٍ false
1 2020-04-25 12:35:05 ص خالٍ خالٍ كتابة [mode -> Overwrite, partitionBy -> []] خالٍ خالٍ خالٍ 0 خالٍ false
0 2020-04-25 12:34:34 ص خالٍ خالٍ كتابة [mode -> ErrorIfExists, partitionBy -> []] خالٍ خالٍ خالٍ خالٍ خالٍ صواب

يمكنك أن ترى هنا كل التعديلات التي أجريت على أجزاء التعليمات البرمجية أعلاه.

قراءة الإصدارات القديمة من البيانات باستخدام Time Travel

يمكن الاستعلام عن اللقطات السابقة لجدول Delta Lake باستخدام ميزة تسمى Time Travel. إذا كنت ترغب في الوصول إلى البيانات التي قمت بالكتابة فوقها، فيمكنك الاستعلام عن لقطة للجدول قبل الكتابة فوق المجموعة الأولى من البيانات باستخدام الخيار versionAsOf.

بمجرد تشغيل الخلية أدناه، يجب أن ترى المجموعة الأولى من البيانات قبل الكتابة فوقها. Time Travel هي ميزة قوية تستفيد من قوة سجل معاملات Delta Lake للوصول إلى البيانات التي لم تعد موجودة في الجدول. تتيح لك إزالة خيار الإصدار 0 (أو تحديد الإصدار 1) الاطلاع على أحدث البيانات مرة أخرى. لمزيد من المعلومات، راجع الاستعلام عن لقطة قديمة لجدول.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

النتائج:

المعرف
0
1
4
3
2

هنا يمكنك أن ترى أنك عدت إلى أقرب إصدار من البيانات.

كتابة دفق بيانات في الجدول

يمكنك أيضاً الكتابة إلى جدول Delta Lake باستخدام الدفق المنظم من Spark. يضمن سجل معاملات Delta Lake المعالجة مرة واحدة بالضبط، حتى عندما توجد عمليات دفق أخرى أو استعلامات دفعات تعمل بشكل متزامن في الجدول. يتم تشغيل عمليات الدفق بشكل افتراضي في وضع الإلحاق، الذي يضيف سجلات جديدة إلى الجدول.

لمزيد من المعلومات حول تكامل Delta Lake مع دفق منظم، راجع جدول دفق القراءات والكتابات.

إليك ما نقوم به في الخلايا أدناه:

  • تُظهر الخلية 30 البيانات الملحقة حديثاً
  • تفحص الخلية 31 المحفوظات
  • توقف الخلية 32 مهمة الدفق المنظم
  • تفحص الخلية 33 المحفوظات <--ستلاحظ توقف عمليات الإلحاق

أولا، ستقوم بإعداد مهمة Spark Streaming بسيطة لإنشاء تسلسل وجعل المهمة تكتب إلى Delta Table الخاص بك.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

قراءة دفق تغييرات من جدول

في أثناء كتابة الدفق إلى جدول Delta Lake، يمكنك أيضاً القراءة من هذا الجدول كمصدر دفق. على سبيل المثال، يمكنك بدء تشغيل استعلام دفق آخر يطبع جميع التغييرات التي تم إجراؤها على جدول Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

النتائج:

المعرف
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

النتائج:

الإصدار طابع زمني ‏‏العملية operationParameters readVersion
5 2020-04-25 12:37:09 ص تحديث الدفق [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 دمج [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 12:36:08 ص DELETE [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 12:35:51 ص UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 12:35:05 ص كتابة [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 12:34:34 ص كتابة [mode -> ErrorIfExists, partitionBy -> []] خالٍ

هنا تقوم بإسقاط بعض الأعمدة الأقل إثارة للاهتمام لتبسيط تجربة عرض طريقة عرض المحفوظات.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

النتائج:

الإصدار طابع زمني ‏‏العملية operationParameters readVersion
5 2020-04-25 12:37:09 ص تحديث الدفق [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 دمج [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 12:36:08 ص DELETE [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 12:35:51 ص UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 12:35:05 ص كتابة [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 12:34:34 ص كتابة [mode -> ErrorIfExists, partitionBy -> []] خالٍ

تحويل Parquet إلى Delta

يمكنك إجراء تحويل في الموقع نفسه من تنسيق Parquet إلى Delta.

هنا ستقوم باختبار ما إذا كان الجدول الموجود بتنسيق دلتا أم لا.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

النتائج:

خطأ

الآن ستقوم بتحويل البيانات إلى تنسيق دلتا والتحقق من أنها تعمل.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

النتائج:

صواب

دعم SQL

يدعم Delta أوامر الأداة المساعدة للجدول خلال SQL. يمكنك استخدام SQL من أجل:

  • الحصول على محفوظات DeltaTable
  • تفريغ DeltaTable
  • تحويل ملف Parquet إلى Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

النتائج:

الإصدار طابع زمني معرّف المستخدم userName ‏‏العملية operationParameters المهمة notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 12:37:09 ص خالٍ خالٍ تحديث الدفق [outputMode -> Ap... خالٍ خالٍ خالٍ 4 خالٍ صواب
4 2020-04-25 00:36:27 خالٍ خالٍ دمج [predicate -> (ol... خالٍ خالٍ خالٍ 3 خالٍ false
3 2020-04-25 12:36:08 ص خالٍ خالٍ DELETE [predicate -> ["(... خالٍ خالٍ خالٍ 2 خالٍ false
2 2020-04-25 12:35:51 ص خالٍ خالٍ UPDATE [predicate -> ((i... خالٍ خالٍ خالٍ 1 خالٍ false
1 2020-04-25 12:35:05 ص خالٍ خالٍ كتابة [mode -> Overwrit... خالٍ خالٍ خالٍ 0 خالٍ false
0 2020-04-25 12:34:34 ص خالٍ خالٍ كتابة [mode -> ErrorIfE... خالٍ خالٍ خالٍ خالٍ خالٍ صواب
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

النتائج:

مسار
abfss://data@arca...

الآن، ستتحقق من أن الجدول ليس جدول تنسيق دلتا. بعد ذلك، ستقوم بتحويل الجدول إلى تنسيق دلتا باستخدام Spark SQL والتأكد من تحويله بشكل صحيح.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

النتائج:

صواب

للحصول على الوثائق الكاملة، راجع صفحة وثائق Delta Lake

لمزيد من المعلومات، راجع مشروع Delta Lake.

الخطوات التالية