مرجع لغة Delta Live Tables Python

تحتوي هذه المقالة على تفاصيل لواجهة برمجة Delta Live Tables Python.

للحصول على معلومات حول واجهة برمجة تطبيقات SQL، راجع مرجع لغة Delta Live Tables SQL.

للحصول على تفاصيل خاصة بتكوين "المحمل التلقائي"، راجع ما هو المحمل التلقائي؟.

قبل البدء

فيما يلي اعتبارات مهمة عند تنفيذ البنية الأساسية لبرنامج ربط العمليات التجارية باستخدام واجهة Delta Live Tables Python:

  • نظرا لأنه يتم استدعاء Python table() والوظائف view() عدة مرات أثناء تخطيط تحديث البنية الأساسية لبرنامج ربط العمليات التجارية وتشغيله، فلا تقم بتضمين التعليمات البرمجية في إحدى هذه الدالات التي قد يكون لها آثار جانبية (على سبيل المثال، التعليمات البرمجية التي تقوم بتعديل البيانات أو إرسال بريد إلكتروني). لتجنب السلوك غير المتوقع، يجب أن تتضمن وظائف Python التي تحدد مجموعات البيانات التعليمات البرمجية المطلوبة لتعريف الجدول أو العرض فقط.
  • لتنفيذ عمليات مثل إرسال رسائل البريد الإلكتروني أو التكامل مع خدمة مراقبة خارجية، خاصة في الوظائف التي تحدد مجموعات البيانات، استخدم خطافات الأحداث. سيؤدي تنفيذ هذه العمليات في الوظائف التي تحدد مجموعات البيانات الخاصة بك إلى سلوك غير متوقع.
  • يجب أن ترجع Python table والدالات view DataFrame. لا ترجع بعض الدالات التي تعمل على DataFrames DataFrames ولا يجب استخدامها. تتضمن هذه العمليات وظائف مثل collect()وcount()save()toPandas()saveAsTable(). نظرا لتنفيذ تحويلات DataFrame بعد حل الرسم البياني الكامل لتدفق البيانات، قد يكون لاستخدام مثل هذه العمليات آثار جانبية غير مقصودة.

dlt استيراد وحدة Python

يتم تعريف دالات Delta Live Tables Python في الوحدة النمطية dlt . يجب أن تستورد البنية الأساسية لبرنامج ربط العمليات التجارية التي تم تنفيذها باستخدام واجهة برمجة تطبيقات Python هذه الوحدة النمطية:

import dlt

إنشاء طريقة عرض أو جدول دفق مجسد في Delta Live Tables

في Python، تحدد Delta Live Tables ما إذا كان يجب تحديث مجموعة بيانات كعرض مجسد أو جدول دفق استنادا إلى الاستعلام التعريفي. @table يمكن استخدام مصمم الديكور لتحديد كل من طرق العرض المجسدة وجداول الدفق.

لتعريف طريقة عرض مجسدة في Python، قم بتطبيق @table على استعلام يقوم بإجراء قراءة ثابتة مقابل مصدر بيانات. لتعريف جدول دفق، قم بتطبيقه @table على استعلام يقوم بإجراء قراءة دفق مقابل مصدر بيانات أو استخدام الدالة create_streaming_table(). يحتوي كلا النوعين من مجموعات البيانات على نفس مواصفات بناء الجملة كما يلي:

إشعار

لاستخدام الوسيطة cluster_by لتمكين التجميع السائل، يجب تكوين البنية الأساسية لبرنامج ربط العمليات التجارية لاستخدام قناة المعاينة.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

إنشاء طريقة عرض Delta Live Tables

لتعريف طريقة عرض في Python، قم بتطبيق @view مصمم الديكور. @table مثل مصمم الديكور، يمكنك استخدام طرق العرض في Delta Live Tables لمجموعات البيانات الثابتة أو المتدفقة. فيما يلي بناء الجملة لتعريف طرق العرض باستخدام Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

مثال: تعريف الجداول وطرق العرض

لتعريف جدول أو طريقة عرض في Python، قم بتطبيق @dlt.view أو @dlt.table مصمم الديكور على دالة. يمكنك استخدام اسم الدالة أو المعلمة name لتعيين اسم الجدول أو العرض. يحدد المثال التالي مجموعتي بيانات مختلفتين: طريقة عرض تسمى taxi_raw تأخذ ملف JSON كمصدر إدخال وجدول يسمى filtered_data يأخذ taxi_raw طريقة العرض كمدخل:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

مثال: الوصول إلى مجموعة بيانات معرفة في نفس المسار

بالإضافة إلى القراءة من مصادر البيانات الخارجية، يمكنك الوصول إلى مجموعات البيانات المعرفة في نفس المسار باستخدام دالة Delta Live Tables read() . يوضح المثال التالي إنشاء مجموعة customers_filtered بيانات باستخدام الدالة read() :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

يمكنك أيضا استخدام الدالة spark.table() للوصول إلى مجموعة بيانات معرفة في نفس المسار. عند استخدام الدالة spark.table() للوصول إلى مجموعة بيانات معرفة في البنية الأساسية لبرنامج ربط العمليات التجارية، في وسيطة الدالة LIVE قبل الكلمة الأساسية إلى اسم مجموعة البيانات:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

مثال: القراءة من جدول مسجل في metastore

لقراءة البيانات من جدول مسجل في Hive metastore، في وسيطة الدالة، احذف LIVE الكلمة الأساسية وقم بتأهيل اسم الجدول اختياريا باسم قاعدة البيانات:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

للحصول على مثال للقراءة من جدول كتالوج Unity، راجع استيعاب البيانات في مسار كتالوج Unity.

مثال: الوصول إلى مجموعة بيانات باستخدام spark.sql

يمكنك أيضا إرجاع مجموعة بيانات باستخدام spark.sql تعبير في دالة استعلام. للقراءة من مجموعة بيانات داخلية، تابع LIVE. إلى اسم مجموعة البيانات:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

إنشاء جدول لاستخدامه كهدف لعمليات الدفق

استخدم الدالة create_streaming_table() لإنشاء جدول هدف لإخراج السجلات بواسطة عمليات الدفق، بما في ذلك apply_changes()، apply_changes_from_snapshot()، وسجلات الإخراج @append_flow .

إشعار

create_target_table() يتم إهمال الدالتين وcreate_streaming_live_table(). توصي Databricks بتحديث التعليمات البرمجية الموجودة لاستخدام الدالة create_streaming_table() .

إشعار

لاستخدام الوسيطة cluster_by لتمكين التجميع السائل، يجب تكوين البنية الأساسية لبرنامج ربط العمليات التجارية لاستخدام قناة المعاينة.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
الوسيطات
name

النوع: str

اسم الجدول.

هذه المعلمة مطلوبة.
comment

النوع: str

وصف اختياري للجدول.
spark_conf

النوع: dict

قائمة اختيارية من تكوينات Spark لتنفيذ هذا الاستعلام.
table_properties

النوع: dict

قائمة اختيارية بخصائص الجدول للجدول.
partition_cols

النوع: array

قائمة اختيارية مكونة من عمود واحد أو أكثر لاستخدامها في تقسيم الجدول.
cluster_by

النوع: array

تمكين التجميع السائل على الجدول بشكل اختياري وتحديد الأعمدة لاستخدامها كمفاتيح تجميع.

راجع استخدام التجميع السائل لجداول Delta.
path

النوع: str

موقع تخزين اختياري لبيانات الجدول. إذا لم يتم تعيينه، تعيين النظام افتراضيا إلى موقع تخزين البنية الأساسية لبرنامج ربط العمليات التجارية.
schema

النوع: str أو StructType

تعريف مخطط اختياري للجدول. يمكن تعريف المخططات كسلسلة SQL DDL أو باستخدام Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

النوع: dict

قيود جودة البيانات الاختيارية للجدول. راجع توقعات متعددة.
row_filter (معاينة عامة)

النوع: str

عبارة تصفية صف اختيارية للجدول. راجع نشر الجداول باستخدام عوامل تصفية الصفوف وأقنعة الأعمدة.

التحكم في كيفية ترجمة الجداول

توفر الجداول أيضا تحكما إضافيا في تجسيدها:

  • حدد كيفية تقسيم الجداول باستخدام partition_cols. يمكنك استخدام التقسيم لتسريع الاستعلامات.
  • يمكنك تعيين خصائص الجدول عند تعريف طريقة عرض أو جدول. راجع خصائص جدول Delta Live Tables.
  • تعيين موقع تخزين لبيانات الجدول باستخدام path الإعداد . بشكل افتراضي، يتم تخزين بيانات الجدول في موقع تخزين البنية الأساسية لبرنامج ربط العمليات التجارية إذا path لم يتم تعيينها.
  • يمكنك استخدام الأعمدة التي تم إنشاؤها في تعريف المخطط. راجع مثال: تحديد مخطط وأعمدة قسم.

إشعار

بالنسبة للجداول الأقل من 1 تيرابايت، توصي Databricks بالسماح ل Delta Live Tables بالتحكم في تنظيم البيانات. يجب عدم تحديد أعمدة القسم إلا إذا كنت تتوقع أن يزيد الجدول عن تيرابايت.

مثال: تحديد مخطط وأعمدة قسم

يمكنك اختياريا تحديد مخطط جدول باستخدام Python StructType أو سلسلة SQL DDL. عند تحديده باستخدام سلسلة DDL، يمكن أن يتضمن التعريف أعمدة تم إنشاؤها.

ينشئ المثال التالي جدولا يسمى sales بمخطط محدد باستخدام Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

يحدد المثال التالي مخطط جدول باستخدام سلسلة DDL، ويحدد عمودا تم إنشاؤه، ويعرف عمود قسم:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

بشكل افتراضي، تستنتج Delta Live Tables المخطط من table التعريف إذا لم تحدد مخططا.

تكوين جدول دفق لتجاهل التغييرات في جدول تدفق المصدر

إشعار

  • تعمل العلامة skipChangeCommits فقط مع spark.readStream استخدام الدالة option() . لا يمكنك استخدام هذه العلامة في دالة dlt.read_stream() .
  • لا يمكنك استخدام العلامة skipChangeCommits عند تعريف جدول دفق المصدر كهدف لدالة apply_changes().

بشكل افتراضي، تتطلب جداول الدفق مصادر إلحاق فقط. عندما يستخدم جدول دفق جدول دفق آخر كمصدر، ويتطلب جدول دفق المصدر تحديثات أو حذف، على سبيل المثال، معالجة القانون العام لحماية البيانات (GDPR) "الحق في النسيان"، skipChangeCommits يمكن تعيين العلامة عند قراءة جدول دفق المصدر لتجاهل هذه التغييرات. لمزيد من المعلومات حول هذه العلامة، راجع تجاهل التحديثات والحذف.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

مثال: تعريف قيود الجدول

هام

قيود الجدول موجودة في المعاينة العامة.

عند تحديد مخطط، يمكنك تعريف المفاتيح الأساسية والخارجية. القيود إعلامية ولا يتم فرضها. راجع عبارة CONSTRAINT في مرجع لغة SQL.

يعرف المثال التالي جدولا مع قيد مفتاح أساسي ومفتاح خارجي:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

مثال: تعريف عامل تصفية صف وقناع عمود

هام

توجد عوامل تصفية الصفوف وأقنعة الأعمدة في المعاينة العامة.

لإنشاء طريقة عرض مجسدة أو جدول دفق مع عامل تصفية صف وقناع عمود، استخدم عبارة ROW FILTER وعبارة MASK. يوضح المثال التالي كيفية تعريف طريقة عرض مجسدة وجدول دفق مع كل من عامل تصفية الصف وقناع العمود:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

لمزيد من المعلومات حول عوامل تصفية الصفوف وأقنعة الأعمدة، راجع نشر الجداول باستخدام عوامل تصفية الصفوف وأقنعة الأعمدة.

خصائص جداول Python Delta Live

تصف الجداول التالية الخيارات والخصائص التي يمكنك تحديدها أثناء تعريف الجداول وطرق العرض باستخدام Delta Live Tables:

إشعار

لاستخدام الوسيطة cluster_by لتمكين التجميع السائل، يجب تكوين البنية الأساسية لبرنامج ربط العمليات التجارية لاستخدام قناة المعاينة.

@table أو @view
name

النوع: str

اسم اختياري للجدول أو طريقة العرض. إذا لم يتم تعريفه، يتم استخدام اسم الدالة كاسم الجدول أو العرض.
comment

النوع: str

وصف اختياري للجدول.
spark_conf

النوع: dict

قائمة اختيارية من تكوينات Spark لتنفيذ هذا الاستعلام.
table_properties

النوع: dict

قائمة اختيارية بخصائص الجدول للجدول.
path

النوع: str

موقع تخزين اختياري لبيانات الجدول. إذا لم يتم تعيينه، تعيين النظام افتراضيا إلى موقع تخزين البنية الأساسية لبرنامج ربط العمليات التجارية.
partition_cols

النوع: a collection of str

مجموعة اختيارية، على سبيل المثال، عمود list واحد أو أكثر لاستخدامه لتقسيم الجدول.
cluster_by

النوع: array

تمكين التجميع السائل على الجدول بشكل اختياري وتحديد الأعمدة لاستخدامها كمفاتيح تجميع.

راجع استخدام التجميع السائل لجداول Delta.
schema

النوع: str أو StructType

تعريف مخطط اختياري للجدول. يمكن تعريف المخططات كسلسلة SQL DDL أو باستخدام Python StructType.
temporary

النوع: bool

إنشاء جدول، ولكن لا تنشر بيانات التعريف للجدول. ترشد temporary الكلمة الأساسية Delta Live Tables لإنشاء جدول متوفر للبنية الأساسية لبرنامج ربط العمليات التجارية ولكن لا يجب الوصول إليه خارج البنية الأساسية لبرنامج ربط العمليات التجارية. لتقليل وقت المعالجة، يستمر جدول مؤقت طوال مدة بقاء المسار الذي يقوم بإنشائه، وليس فقط تحديث واحد.

الإعداد الافتراضي هو "خطأ".
row_filter (معاينة عامة)

النوع: str

عبارة تصفية صف اختيارية للجدول. راجع نشر الجداول باستخدام عوامل تصفية الصفوف وأقنعة الأعمدة.
تعريف الجدول أو العرض
def <function-name>()

دالة Python التي تحدد مجموعة البيانات. إذا لم يتم تعيين المعلمة name ، <function-name> استخدامها كاسم مجموعة البيانات الهدف.
query

عبارة Spark SQL التي ترجع Spark Dataset أو Koalas DataFrame.

استخدم dlt.read() أو spark.table() لتنفيذ قراءة كاملة من مجموعة بيانات معرفة في نفس المسار. عند استخدام الدالة للقراءة spark.table() من مجموعة بيانات معرفة في نفس المسار، قم بإيقاف LIVE الكلمة الأساسية مسبقا إلى اسم مجموعة البيانات في وسيطة الدالة. على سبيل المثال، للقراءة من مجموعة بيانات تسمى customers:

spark.table("LIVE.customers")

يمكنك أيضا استخدام الدالة spark.table() للقراءة من جدول مسجل في metastore عن طريق حذف LIVE الكلمة الأساسية وتأهيل اسم الجدول اختياريا باسم قاعدة البيانات:

spark.table("sales.customers")

يستخدم dlt.read_stream() لإجراء قراءة دفق من مجموعة بيانات محددة في نفس المسار.

استخدم الدالة spark.sql لتعريف استعلام SQL لإنشاء مجموعة بيانات الإرجاع.

استخدم بناء جملة PySpark لتعريف استعلامات Delta Live Tables باستخدام Python.
التوقعات
@expect("description", "constraint")

الإعلان عن قيد جودة البيانات الذي تم تحديده بواسطة
description. إذا انتهك صف التوقعات، فضمن الصف في مجموعة البيانات الهدف.
@expect_or_drop("description", "constraint")

الإعلان عن قيد جودة البيانات الذي تم تحديده بواسطة
description. إذا انتهك صف التوقعات، فسقط الصف من مجموعة البيانات الهدف.
@expect_or_fail("description", "constraint")

الإعلان عن قيد جودة البيانات الذي تم تحديده بواسطة
description. إذا انتهك صف التوقعات، فتوقف عن التنفيذ على الفور.
@expect_all(expectations)

الإعلان عن قيد واحد أو أكثر من قيود جودة البيانات.
expectations هو قاموس Python، حيث المفتاح هو وصف التوقع والقيمة هي قيد التوقع. إذا انتهك صف أيا من التوقعات، فبادر بتضمين الصف في مجموعة البيانات الهدف.
@expect_all_or_drop(expectations)

الإعلان عن قيد واحد أو أكثر من قيود جودة البيانات.
expectations هو قاموس Python، حيث المفتاح هو وصف التوقع والقيمة هي قيد التوقع. إذا انتهك صف أيا من التوقعات، فسقط الصف من مجموعة البيانات الهدف.
@expect_all_or_fail(expectations)

الإعلان عن قيد واحد أو أكثر من قيود جودة البيانات.
expectations هو قاموس Python، حيث المفتاح هو وصف التوقع والقيمة هي قيد التوقع. إذا انتهك صف أيا من التوقعات، فتوقف عن التنفيذ على الفور.

تغيير التقاط البيانات من موجز تغيير باستخدام Python في Delta Live Tables

استخدم الدالة apply_changes() في Python API لاستخدام وظيفة التقاط بيانات تغيير جداول Delta Live (CDC) لمعالجة بيانات المصدر من موجز بيانات التغيير (CDF).

هام

يجب الإعلان عن جدول تدفق هدف لتطبيق التغييرات فيه. يمكنك اختياريا تحديد المخطط للجدول الهدف. عند تحديد مخطط apply_changes() الجدول الهدف، يجب تضمين __START_AT العمودين و __END_AT بنفس نوع البيانات مثل sequence_by الحقول.

لإنشاء الجدول الهدف المطلوب، يمكنك استخدام الدالة create_streaming_table() في واجهة Delta Live Tables Python.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

إشعار

للمعالجة APPLY CHANGES ، السلوك الافتراضي للأحداث INSERT و UPDATE هو رفع أحداث CDC من المصدر: تحديث أي صفوف في الجدول الهدف تتطابق مع المفتاح (المفاتيح) المحددة أو إدراج صف جديد عندما لا يوجد سجل مطابق في الجدول الهدف. يمكن تحديد معالجة DELETE الأحداث مع APPLY AS DELETE WHEN الشرط .

لمعرفة المزيد حول معالجة التقاط بيانات التغيير باستخدام موجز التغيير، راجع واجهات برمجة التطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables. للحصول على مثال لاستخدام الدالة apply_changes() ، راجع مثال: معالجة SCD من النوع 1 وSCD من النوع 2 مع بيانات مصدر CDF.

هام

يجب الإعلان عن جدول تدفق هدف لتطبيق التغييرات فيه. يمكنك اختياريا تحديد المخطط للجدول الهدف. عند تحديد apply_changes مخطط الجدول الهدف، يجب تضمين __START_AT العمودين و __END_AT بنفس نوع البيانات مثل sequence_by الحقل.

راجع واجهات برمجة تطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables.

الوسيطات
target

النوع: str

اسم الجدول الذي سيتم تحديثه. يمكنك استخدام الدالة create_streaming_table() لإنشاء الجدول الهدف قبل تنفيذ الدالة apply_changes() .

هذه المعلمة مطلوبة.
source

النوع: str

مصدر البيانات الذي يحتوي على سجلات التقاط بيانات التغيير.

هذه المعلمة مطلوبة.
keys

النوع: list

العمود أو مجموعة الأعمدة التي تعرف صفا في البيانات المصدر بشكل فريد. يتم استخدام هذا لتحديد أحداث التقاط بيانات التغيير التي تنطبق على سجلات معينة في الجدول الهدف.

يمكنك تحديد إما:

- قائمة بالسلاسل: ["userId", "orderId"]
- قائمة بوظائف Spark SQL col() : [col("userId"), col("orderId"]

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة مطلوبة.
sequence_by

النوع: str أو col()

اسم العمود الذي يحدد الترتيب المنطقي لأحداث التقاط بيانات التغيير في البيانات المصدر. تستخدم Delta Live Tables هذا التسلسل لمعالجة أحداث التغيير التي تصل خارج الترتيب.

يمكنك تحديد إما:

- سلسلة: "sequenceNum"
- دالة Spark SQL col() : col("sequenceNum")

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة مطلوبة.
ignore_null_updates

النوع: bool

السماح باستيعاب التحديثات التي تحتوي على مجموعة فرعية من الأعمدة الهدف. عندما يتطابق حدث CDC مع صف موجود وهو ignore_null_updates True، تحتفظ الأعمدة null بقيمها الموجودة في الهدف. ينطبق هذا أيضا على الأعمدة المتداخلة بقيمة null. عندما ignore_null_updates يكون ، Falseتتم الكتابة فوق القيم الموجودة بالقيم null .

هذه المعلمة اختيارية.

الافتراضي هو False.
apply_as_deletes

النوع: str أو expr()

يحدد متى يجب التعامل مع حدث CDC على DELETE أنه upsert بدلا من أن يتم التعامل معه. لمعالجة البيانات خارج الترتيب، يتم الاحتفاظ بالصف المحذوف مؤقتا كحصة في جدول Delta الأساسي، ويتم إنشاء طريقة عرض في metastore الذي يقوم بتصفية علامات الحذف هذه. يمكن تكوين الفاصل الزمني للاحتفاظ باستخدام
pipelines.cdc.tombstoneGCThresholdInSecondsخاصية الجدول.

يمكنك تحديد إما:

- سلسلة: "Operation = 'DELETE'"
- دالة Spark SQL expr() : expr("Operation = 'DELETE'")

هذه المعلمة اختيارية.
apply_as_truncates

النوع: str أو expr()

يحدد متى يجب التعامل مع حدث التقاط بيانات التغيير كجدول TRUNCATEكامل . نظرا لأن هذه العبارة تؤدي إلى اقتطاع كامل للجدول الهدف، يجب استخدامها فقط لحالات استخدام معينة تتطلب هذه الوظيفة.

apply_as_truncates المعلمة معتمدة فقط لنوع SCD 1. لا يدعم SCD النوع 2 عمليات الاقتطاع.

يمكنك تحديد إما:

- سلسلة: "Operation = 'TRUNCATE'"
- دالة Spark SQL expr() : expr("Operation = 'TRUNCATE'")

هذه المعلمة اختيارية.
column_list

except_column_list

النوع: list

مجموعة فرعية من الأعمدة لتضمينها في الجدول الهدف. استخدم column_list لتحديد القائمة الكاملة للأعمدة المراد تضمينها. استخدم except_column_list لتحديد الأعمدة المراد استبعادها. يمكنك تعريف إما القيمة كقائمة سلاسل أو كدالات Spark SQL col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة اختيارية.

الإعداد الافتراضي هو تضمين كافة الأعمدة في الجدول الهدف عند تمرير لا column_list أو except_column_list وسيطة إلى الدالة .
stored_as_scd_type

النوع: str أو int

ما إذا كنت تريد تخزين السجلات كنوع SCD 1 أو SCD من النوع 2.

قم بتعيين إلى 1 لنوع SCD 1 أو 2 ل SCD من النوع 2.

هذه العبارة اختيارية.

الإعداد الافتراضي هو SCD من النوع 1.
track_history_column_list

track_history_except_column_list

النوع: list

مجموعة فرعية من أعمدة الإخراج المراد تعقبها للمحفوظات في الجدول الهدف. استخدم track_history_column_list لتحديد القائمة الكاملة للأعمدة التي سيتم تعقبها. استخدام
track_history_except_column_list لتحديد الأعمدة التي سيتم استبعادها من التعقب. يمكنك تعريف إما القيمة كقائمة سلاسل أو كدالات Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة اختيارية.

الإعداد الافتراضي هو تضمين كافة الأعمدة في الجدول الهدف عندما لا أو track_history_column_list
track_history_except_column_list يتم تمرير الوسيطة إلى الدالة .

تغيير التقاط البيانات من لقطات قاعدة البيانات باستخدام Python في Delta Live Tables

هام

APPLY CHANGES FROM SNAPSHOT واجهة برمجة التطبيقات في المعاينة العامة.

استخدم الدالة apply_changes_from_snapshot() في واجهة برمجة تطبيقات Python لاستخدام وظيفة التقاط بيانات تغيير جداول Delta Live (CDC) لمعالجة بيانات المصدر من لقطات قاعدة البيانات.

هام

يجب الإعلان عن جدول تدفق هدف لتطبيق التغييرات فيه. يمكنك اختياريا تحديد المخطط للجدول الهدف. عند تحديد مخطط apply_changes_from_snapshot() الجدول الهدف، يجب أيضا تضمين __START_AT العمودين و __END_AT بنفس نوع البيانات مثل sequence_by الحقل.

لإنشاء الجدول الهدف المطلوب، يمكنك استخدام الدالة create_streaming_table() في واجهة Delta Live Tables Python.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

إشعار

للمعالجة APPLY CHANGES FROM SNAPSHOT ، السلوك الافتراضي هو إدراج صف جديد عندما لا يوجد سجل مطابق بنفس المفتاح (المفاتيح) في الهدف. إذا كان هناك سجل مطابق، يتم تحديثه فقط إذا تغيرت أي من القيم الموجودة في الصف. يتم حذف الصفوف ذات المفاتيح الموجودة في الهدف ولكن لم تعد موجودة في المصدر.

لمعرفة المزيد حول معالجة التقاط بيانات التغيير باستخدام اللقطات، راجع واجهات برمجة التطبيقات APPLY CHANGES: تبسيط التقاط بيانات التغيير باستخدام Delta Live Tables. للحصول على أمثلة لاستخدام الدالة apply_changes_from_snapshot() ، راجع أمثلة استيعاب اللقطات الدورية واستيعاب اللقطات التاريخية.

الوسيطات
target

النوع: str

اسم الجدول الذي سيتم تحديثه. يمكنك استخدام الدالة create_streaming_table() لإنشاء الجدول الهدف قبل تشغيل الدالة apply_changes() .

هذه المعلمة مطلوبة.
source

النوع: str أو lambda function

إما اسم جدول أو طريقة عرض للقطة بشكل دوري أو دالة Python lambda التي ترجع لقطة DataFrame لمعالجتها وإصدار اللقطة. راجع تنفيذ الوسيطة المصدر.

هذه المعلمة مطلوبة.
keys

النوع: list

العمود أو مجموعة الأعمدة التي تعرف صفا في البيانات المصدر بشكل فريد. يتم استخدام هذا لتحديد أحداث التقاط بيانات التغيير التي تنطبق على سجلات معينة في الجدول الهدف.

يمكنك تحديد إما:

- قائمة بالسلاسل: ["userId", "orderId"]
- قائمة بوظائف Spark SQL col() : [col("userId"), col("orderId"]

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة مطلوبة.
stored_as_scd_type

النوع: str أو int

ما إذا كنت تريد تخزين السجلات كنوع SCD 1 أو SCD من النوع 2.

قم بتعيين إلى 1 لنوع SCD 1 أو 2 ل SCD من النوع 2.

هذه العبارة اختيارية.

الإعداد الافتراضي هو SCD من النوع 1.
track_history_column_list

track_history_except_column_list

النوع: list

مجموعة فرعية من أعمدة الإخراج المراد تعقبها للمحفوظات في الجدول الهدف. استخدم track_history_column_list لتحديد القائمة الكاملة للأعمدة التي سيتم تعقبها. استخدام
track_history_except_column_list لتحديد الأعمدة التي سيتم استبعادها من التعقب. يمكنك تعريف إما القيمة كقائمة سلاسل أو كدالات Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

لا يمكن أن col() تتضمن وسيطات الدالات المؤهلات. على سبيل المثال، يمكنك استخدام col(userId)، ولكن لا يمكنك استخدام col(source.userId).

هذه المعلمة اختيارية.

الإعداد الافتراضي هو تضمين كافة الأعمدة في الجدول الهدف عندما لا أو track_history_column_list
track_history_except_column_list يتم تمرير الوسيطة إلى الدالة .

تنفيذ الوسيطة source

apply_changes_from_snapshot() تتضمن الدالة الوسيطة source . لمعالجة اللقطات التاريخية، source من المتوقع أن تكون الوسيطة دالة Python lambda التي ترجع قيمتين إلى apply_changes_from_snapshot() الدالة: Python DataFrame يحتوي على بيانات اللقطة التي ستتم معالجتها وإصدار لقطة.

فيما يلي توقيع الدالة lambda:

lambda Any => Optional[(DataFrame, Any)]
  • الوسيطة إلى دالة lambda هي أحدث إصدار لقطة تمت معالجته.
  • القيمة المرجعة لدالة lambda هي None أو مجموعة من قيمتين: القيمة الأولى للمجموعة هي DataFrame تحتوي على اللقطة التي ستتم معالجتها. القيمة الثانية للمجموعة هي إصدار اللقطة الذي يمثل الترتيب المنطقي للقطة.

مثال ينفذ ويستدعي دالة lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

ينفذ وقت تشغيل Delta Live Tables الخطوات التالية في كل مرة يتم فيها تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية التي تحتوي على الدالة apply_changes_from_snapshot() :

  1. تشغيل الدالة next_snapshot_and_version لتحميل اللقطة التالية DataFrame وإصدار اللقطة المقابل.
  2. إذا لم يتم إرجاع DataFrame، يتم إنهاء التشغيل ويتم وضع علامة على تحديث البنية الأساسية لبرنامج ربط العمليات التجارية كمكتمل.
  3. يكتشف التغييرات في اللقطة الجديدة ويطبقها بشكل متزايد على الجدول الهدف.
  4. إرجاع إلى الخطوة رقم 1 لتحميل اللقطة التالية وإصدارها.

القيود

تحتوي واجهة Delta Live Tables Python على القيد التالي:

الدالة pivot() غير معتمدة. pivot تتطلب العملية في Spark تحميلا حريصا لبيانات الإدخال لحساب مخطط الإخراج. هذه الإمكانية غير معتمدة في Delta Live Tables.