تحميل البيانات باستخدام جداول Delta Live

يمكنك تحميل البيانات من أي مصدر بيانات يدعمه Apache Spark على Azure Databricks باستخدام Delta Live Tables. يمكنك تعريف مجموعات البيانات (الجداول وطرق العرض) في Delta Live Tables مقابل أي استعلام يقوم بإرجاع Spark DataFrame، بما في ذلك دفق DataFrames وPandas ل Spark DataFrames. بالنسبة لمهام استيعاب البيانات، توصي Databricks باستخدام جداول الدفق لمعظم حالات الاستخدام. تعد جداول الدفق جيدة لاستيعاب البيانات من تخزين الكائنات السحابية باستخدام أداة التحميل التلقائي أو من حافلات الرسائل مثل Kafka. توضح الأمثلة أدناه بعض الأنماط الشائعة.

هام

لا تحتوي جميع مصادر البيانات على دعم SQL. يمكنك خلط دفاتر ملاحظات SQL وPython في مسار Delta Live Tables لاستخدام SQL لجميع العمليات التي تتجاوز الاستيعاب.

للحصول على تفاصيل حول العمل مع المكتبات غير المجمعة في جداول Delta Live بشكل افتراضي، راجع إدارة تبعيات Python لخطوط أنابيب Delta Live Tables.

تحميل الملفات من تخزين كائن السحابة

توصي Databricks باستخدام أداة التحميل التلقائي مع Delta Live Tables لمعظم مهام استيعاب البيانات من تخزين الكائنات السحابية. تم تصميم أداة التحميل التلقائي وجداول Delta Live لتحميل البيانات المتزايدة بشكل متزايد وغير متكرر عند وصولها إلى التخزين السحابي. تستخدم الأمثلة التالية Auto Loader لإنشاء مجموعات بيانات من ملفات CSV وJSON:

إشعار

لتحميل الملفات باستخدام أداة التحميل التلقائي في البنية الأساسية لبرنامج ربط العمليات التجارية الممكنة لكتالوج Unity، يجب استخدام مواقع خارجية. لمعرفة المزيد حول استخدام كتالوج Unity مع Delta Live Tables، راجع استخدام كتالوج Unity مع خطوط أنابيب Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

راجع ما هو المحمل التلقائي؟ وبناء جملة SQL للتحميل التلقائي.

تحذير

إذا كنت تستخدم أداة التحميل التلقائي مع إعلامات الملفات وقمت بتشغيل تحديث كامل للبنية الأساسية لبرنامج ربط العمليات التجارية أو جدول البث، يجب تنظيف الموارد يدويا. يمكنك استخدام CloudFilesResourceManager في دفتر ملاحظات لإجراء التنظيف.

تحميل البيانات من ناقل رسائل

يمكنك تكوين خطوط أنابيب Delta Live Tables لاستيعاب البيانات من حافلات الرسائل باستخدام جداول الدفق. توصي Databricks بدمج جداول الدفق مع التنفيذ المستمر والتحجيم التلقائي المحسن لتوفير الاستيعاب الأكثر كفاءة لتحميل زمن الانتقال المنخفض من حافلات الرسائل. راجع تحسين استخدام نظام المجموعة لخطوط أنابيب Delta Live Tables باستخدام التحجيم التلقائي المحسن.

على سبيل المثال، تقوم التعليمات البرمجية التالية بتكوين جدول تدفق لاستيعاب البيانات من Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

يمكنك كتابة عمليات انتقال البيانات من الخادم في SQL فقط لإجراء تحويلات دفق على هذه البيانات، كما في المثال التالي:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

للحصول على مثال للعمل مع مراكز الأحداث، راجع استخدام Azure Event Hubs كمصدر بيانات Delta Live Tables.

راجع تكوين مصادر بيانات الدفق.

تحميل البيانات من الأنظمة الخارجية

تدعم Delta Live Tables تحميل البيانات من أي مصدر بيانات يدعمه Azure Databricks. راجع الاتصال لمصادر البيانات. يمكنك أيضا تحميل البيانات الخارجية باستخدام Lakehouse Federation لمصادر البيانات المدعومة. لأن Lakehouse Federation يتطلب Databricks Runtime 13.3 LTS أو أعلى، لاستخدام Lakehouse Federation، يجب تكوين البنية الأساسية لبرنامج ربط العمليات التجارية لاستخدام قناة المعاينة.

لا تحتوي بعض مصادر البيانات على دعم مكافئ في SQL. إذا لم تتمكن من استخدام Lakehouse Federation مع أحد مصادر البيانات هذه، يمكنك استخدام دفتر ملاحظات Python مستقل لاستيعاب البيانات من المصدر. يمكن بعد ذلك إضافة دفتر الملاحظات هذا كمكتبة مصدر مع دفاتر ملاحظات SQL لإنشاء مسار Delta Live Tables. يوضح المثال التالي طريقة عرض مجسدة للوصول إلى الحالة الحالية للبيانات في جدول PostgreSQL بعيد:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

تحميل مجموعات البيانات الصغيرة أو الثابتة من تخزين كائن السحابة

يمكنك تحميل مجموعات بيانات صغيرة أو ثابتة باستخدام بناء جملة تحميل Apache Spark. تدعم Delta Live Tables جميع تنسيقات الملفات التي يدعمها Apache Spark على Azure Databricks. للحصول على قائمة كاملة، راجع خيارات تنسيق البيانات.

توضح الأمثلة التالية تحميل JSON لإنشاء جداول Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

إشعار

تعد SELECT * FROM format.`path`; بنية SQL شائعة لجميع بيئات SQL على Azure Databricks. وهو النمط الموصى به للوصول المباشر إلى الملفات باستخدام SQL مع Delta Live Tables.

الوصول بأمان إلى بيانات اعتماد التخزين مع البيانات السرية في البنية الأساسية لبرنامج ربط العمليات التجارية

يمكنك استخدام أسرار Azure Databricks لتخزين بيانات الاعتماد مثل مفاتيح الوصول أو كلمات المرور. لتكوين البيانات السرية في البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك، استخدم خاصية Spark في تكوين مجموعة إعدادات البنية الأساسية لبرنامج ربط العمليات التجارية. راجع تكوين إعدادات الحساب.

يستخدم المثال التالي سرا لتخزين مفتاح وصول مطلوب لقراءة بيانات الإدخال من حساب تخزين Azure Data Lake Storage Gen2 (ADLS Gen2) باستخدام المحمل التلقائي. يمكنك استخدام هذا الأسلوب نفسه لتكوين أي سر مطلوب بواسطة البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك، على سبيل المثال، مفاتيح AWS للوصول إلى S3، أو كلمة المرور إلى Apache Hive metastore.

لمعرفة المزيد حول العمل مع Azure Data Lake Storage Gen2، راجع الاتصال إلى Azure Data Lake Storage Gen2 وBlob Storage.

إشعار

يجب إضافة البادئة spark.hadoop. إلى spark_conf مفتاح التكوين الذي يعين القيمة السرية.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

الاستبدال

  • <storage-account-name> مع اسم حساب تخزين ADLS Gen2.
  • <scope-name> مع اسم النطاق السري Azure Databricks.
  • <secret-name> باسم المفتاح الذي يحتوي على مفتاح الوصول إلى حساب تخزين Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

الاستبدال

  • <container-name> باسم حاوية حساب تخزين Azure التي تخزن بيانات الإدخال.
  • <storage-account-name> مع اسم حساب تخزين ADLS Gen2.
  • <path-to-input-dataset> مع المسار إلى مجموعة بيانات الإدخال.

تحميل البيانات من Azure Event Hubs

Azure Event Hubs هي خدمة تدفق البيانات التي توفر واجهة متوافقة مع Apache Kafka. يمكنك استخدام موصل Structured Streaming Kafka، المضمن في وقت تشغيل Delta Live Tables، لتحميل الرسائل من Azure Event Hubs. لمعرفة المزيد حول تحميل الرسائل ومعالجتها من Azure Event Hubs، راجع استخدام Azure Event Hubs كمصدر بيانات Delta Live Tables.