البرنامج التعليمي: Delta Lake

يقدم هذا البرنامج التعليمي عمليات Delta Lake الشائعة على Azure Databricks، بما في ذلك ما يلي:

يمكنك تشغيل مثال التعليمات البرمجية Python وSc scala وSQL في هذه المقالة من داخل دفتر ملاحظات مرفق بمورد حساب Azure Databricks مثل نظام مجموعة. يمكنك أيضا تشغيل التعليمات البرمجية SQL في هذه المقالة من داخل استعلام مقترن بمستودع SQL في Databricks SQL.

إعداد بيانات المصدر

يعتمد هذا البرنامج التعليمي على مجموعة بيانات تسمى People 10 M. يحتوي على 10 ملايين سجل وهمي يحتوي على حقائق عن الناس، مثل الأسماء الأولى والأخيرة وتاريخ الميلاد والراتب. يفترض هذا البرنامج التعليمي أن مجموعة البيانات هذه موجودة في وحدة تخزين كتالوج Unity المقترنة بمساحة عمل Azure Databricks المستهدفة.

للحصول على مجموعة بيانات People 10 M لهذا البرنامج التعليمي، قم بما يلي:

  1. انتقل إلى صفحة الأشخاص 10 M في Kaggle.
  2. انقر فوق تنزيل لتنزيل ملف باسم archive.zip إلى جهازك المحلي.
  3. استخراج الملف المسمى export.csv من archive.zip الملف. export.csv يحتوي الملف على بيانات هذا البرنامج التعليمي.

لتحميل export.csv الملف إلى وحدة التخزين، قم بما يلي:

  1. على الشريط الجانبي، انقر فوق كتالوج.
  2. في مستكشف الكتالوج، استعرض وصولا إلى وحدة التخزين وافتحها حيث تريد تحميل export.csv الملف.
  3. انقر فوق تحميل إلى وحدة التخزين هذه.
  4. اسحب الملف الموجود على جهازك المحلي وأفلته أو استعرض وصولا إليه وحدده export.csv .
  5. انقر فوق تحميل.

في أمثلة التعليمات البرمجية التالية، استبدل /Volumes/main/default/my-volume/export.csv بالمسار إلى export.csv الملف في وحدة التخزين الهدف.

إنشاء جدول

تستخدم جميع الجداول التي تم إنشاؤها على Azure Databricks Delta Lake بشكل افتراضي. توصي Databricks باستخدام الجداول المدارة في كتالوج Unity.

في مثال التعليمات البرمجية السابق وأمثلة التعليمات البرمجية التالية، استبدل اسم main.default.people_10m الجدول بالكتالوج والمخطط واسم الجدول المكون من ثلاثة أجزاء الهدف في كتالوج Unity.

إشعار

Delta Lake هو الافتراضي لجميع أوامر القراءة والكتابة وإنشاء الجدول Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

تنشئ العمليات السابقة جدولا مدارا جديدا. للحصول على معلومات حول الخيارات المتوفرة عند إنشاء جدول دلتا، راجع إنشاء جدول.

في Databricks Runtime 13.3 LTS والإصدارات الأحدث، يمكنك استخدام CREATE TABLE LIKE لإنشاء جدول Delta فارغ جديد يكرر خصائص المخطط والجدول لجدول Delta المصدر. يمكن أن يكون هذا مفيدا بشكل خاص عند ترقية الجداول من بيئة تطوير إلى إنتاج، كما هو موضح في مثال التعليمات البرمجية التالي:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

لإنشاء جدول فارغ، يمكنك أيضا استخدام DeltaTableBuilder واجهة برمجة التطبيقات في Delta Lake ل Python وSc scala. مقارنة بواجهات برمجة تطبيقات DataFrameWriter المكافئة، تسهل واجهات برمجة التطبيقات هذه تحديد معلومات إضافية مثل تعليقات الأعمدة وخصائص الجدول والأعمدة التي تم إنشاؤها.

هام

هذه الميزة في المعاينة العامة.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert إلى جدول

لدمج مجموعة من التحديثات والإدخالات في جدول Delta موجود، يمكنك استخدام DeltaTable.merge الأسلوب ل Python وSc scala وعلامتي MERGE INTO ل SQL. على سبيل المثال، يأخذ المثال التالي البيانات من الجدول المصدر ويدمجها في جدول Delta الهدف. عند وجود صف مطابق في كلا الجدولين، يقوم Delta Lake بتحديث عمود البيانات باستخدام التعبير المحدد. عندما لا يوجد صف مطابق، يضيف Delta Lake صفا جديدا. تعرف هذه العملية باسم upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

في SQL، إذا قمت بتحديد *، يقوم هذا بتحديث أو إدراج كافة الأعمدة في الجدول الهدف، على افتراض أن الجدول المصدر يحتوي على نفس الأعمدة مثل الجدول الهدف. إذا لم يكن الجدول الهدف يحتوي على نفس الأعمدة، يطرح الاستعلام خطأ تحليل.

يجب تحديد قيمة لكل عمود في الجدول عند تنفيذ عملية إدراج (على سبيل المثال، عند عدم وجود صف مطابق في مجموعة البيانات الموجودة). ومع ذلك، لا تحتاج إلى تحديث كافة القيم.

لمشاهدة النتائج، استعلم عن الجدول.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

قراءة جدول

يمكنك الوصول إلى البيانات في جداول Delta حسب اسم الجدول أو مسار الجدول، كما هو موضح في الأمثلة التالية:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

الكتابة إلى جدول

يستخدم Delta Lake بناء الجملة القياسي لكتابة البيانات إلى الجداول.

لإضافة بيانات جديدة تلقائيا إلى جدول Delta موجود، استخدم وضع الإلحاق كما هو موضح في الأمثلة التالية:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

لاستبدال كافة البيانات في جدول، استخدم وضع الكتابة فوق كما في الأمثلة التالية:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

تحديث جدول

يمكنك تحديث البيانات التي تطابق دالة تقييم في جدول Delta. على سبيل المثال، في جدول المثال people_10m ، لتغيير اختصار في gender العمود من M أو F إلى Male أو Female، يمكنك تشغيل ما يلي:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

حذف من جدول

يمكنك إزالة البيانات التي تطابق دالة تقييم من جدول Delta. على سبيل المثال، في جدول المثال people_10m ، لحذف كافة الصفوف المقابلة للأشخاص الذين يعانون من قيمة في birthDate العمود من قبل 1955، يمكنك تشغيل ما يلي:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

هام

يزيل الحذف البيانات من أحدث إصدار من جدول Delta ولكنه لا يزيلها من التخزين الفعلي حتى يتم تفريغ الإصدارات القديمة بشكل صريح. راجع الفراغ للحصول على التفاصيل.

عرض محفوظات الجدول

لعرض محفوظات جدول، يمكنك استخدام DeltaTable.history الأسلوب ل Python وSc scala، وبيان وصف المحفوظات في SQL، الذي يوفر معلومات المصدر، بما في ذلك إصدار الجدول والعملية والمستخدم وما إلى ذلك، لكل كتابة إلى جدول.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

الاستعلام عن إصدار سابق من الجدول (السفر عبر الزمن)

يسمح لك السفر عبر الوقت في Delta Lake بالاستعلام عن لقطة قديمة لجدول Delta.

للاستعلام عن إصدار أقدم من جدول، حدد إصدار الجدول أو الطابع الزمني. على سبيل المثال، للاستعلام عن الإصدار 0 أو الطابع 2024-05-15T22:43:15.000+00:00Z الزمني من المحفوظات السابقة، استخدم ما يلي:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

بالنسبة للطوابع الزمنية، يتم قبول سلاسل التاريخ أو الطابع الزمني فقط، على سبيل المثال، "2024-05-15T22:43:15.000+00:00" أو "2024-05-15 22:43:15".

تسمح لك خيارات DataFrameReader بإنشاء DataFrame من جدول Delta تم إصلاحه إلى إصدار أو طابع زمني معين للجدول، على سبيل المثال:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

للحصول على التفاصيل، راجع العمل مع محفوظات جدول Delta Lake.

تحسين جدول

بعد إجراء تغييرات متعددة على جدول، قد يكون لديك الكثير من الملفات الصغيرة. لتحسين سرعة قراءة الاستعلامات، يمكنك استخدام عملية التحسين لطي الملفات الصغيرة إلى ملفات أكبر:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

SQL

OPTIMIZE main.default.people_10m

ترتيب Z حسب الأعمدة

لتحسين أداء القراءة بشكل أكبر، يمكنك تجميع المعلومات ذات الصلة في نفس مجموعة الملفات حسب ترتيب z. تستخدم خوارزميات تخطي بيانات Delta Lake هذا الترتيب لتقليل كمية البيانات التي تحتاج إلى قراءة بشكل كبير. ل z-order data، يمكنك تحديد الأعمدة المراد ترتيبها في ترتيب z حسب العملية. على سبيل المثال، للترشيح حسب gender، قم بتشغيل:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

للحصول على المجموعة الكاملة من الخيارات المتوفرة عند تشغيل عملية التحسين، راجع تحسين تخطيط ملف البيانات.

تنظيف اللقطات باستخدام VACUUM

يوفر Delta Lake عزل اللقطة للقراءات، ما يعني أنه من الآمن تشغيل عملية تحسين حتى أثناء قيام المستخدمين أو الوظائف الأخرى بالاستعلام عن الجدول. ومع ذلك، في النهاية، يجب تنظيف اللقطات القديمة. يمكنك القيام بذلك عن طريق تشغيل عملية التفريغ:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

للحصول على تفاصيل حول استخدام عملية التفريغ بشكل فعال، راجع إزالة ملفات البيانات غير المستخدمة باستخدام الفراغ.