البرنامج التعليمي: تشغيل مسار تحليلات مستودع شامل

يوضح لك هذا البرنامج التعليمي كيفية إعداد مسار تحليلات شاملة ل Azure Databricks lakehouse.

هام

يستخدم هذا البرنامج التعليمي دفاتر الملاحظات التفاعلية لإكمال مهام ETL الشائعة في مجموعات تمكين Python على كتالوج Unity. إذا كنت لا تستخدم كتالوج Unity، فشاهد تشغيل حمل عمل ETL الأول على Azure Databricks.

المهام في هذا البرنامج التعليمي

بنهاية هذه المقالة، سوف تشعر بالراحة:

  1. تشغيل مجموعة حساب ممكنة في كتالوج Unity.
  2. إنشاء دفتر ملاحظات Databricks.
  3. كتابة البيانات وقراءتها من موقع خارجي كتالوج Unity.
  4. تكوين استيعاب البيانات التزايدية إلى جدول كتالوج Unity باستخدام أداة التحميل التلقائي.
  5. تنفيذ خلايا دفتر الملاحظات لمعالجة البيانات والاستعلام عنها ومعاينة البيانات.
  6. جدولة دفتر ملاحظات كوظيفة Databricks.
  7. الاستعلام عن جداول كتالوج Unity من Databricks SQL

يوفر Azure Databricks مجموعة من الأدوات الجاهزة للإنتاج التي تسمح لمتخصصي البيانات بتطوير ونشر مسارات الاستخراج والتحويل والتحميل (ETL) بسرعة. يسمح كتالوج Unity لوكلاء البيانات بتكوين بيانات اعتماد التخزين والمواقع الخارجية وعناصر قاعدة البيانات وتأمينها للمستخدمين في جميع أنحاء المؤسسة. يسمح Databricks SQL للمحللين بتشغيل استعلامات SQL مقابل نفس الجداول المستخدمة في إنتاج أحمال عمل ETL، ما يسمح للمعلومات المهنية في الوقت الحقيقي على نطاق واسع.

يمكنك أيضا استخدام Delta Live Tables لإنشاء مسارات ETL. أنشأت Databricks Delta Live Tables لتقليل تعقيد بناء وتوزيع وصيانة خطوط أنابيب ETL للإنتاج. راجع البرنامج التعليمي: تشغيل خط أنابيب Delta Live Tables الأول.

المتطلبات

إشعار

إذا لم يكن لديك امتيازات التحكم في نظام المجموعة، فلا يزال بإمكانك إكمال معظم الخطوات أدناه طالما لديك حق الوصول إلى نظام مجموعة.

الخطوة 1: إنشاء نظام مجموعة

للقيام بتحليل البيانات الاستكشافية وهندسة البيانات، قم بإنشاء نظام مجموعة لتوفير موارد الحوسبة اللازمة لتنفيذ الأوامر.

  1. انقر فوق أيقونة الحسابحساب في الشريط الجانبي.
  2. انقر فوق أيقونة جديدةجديد في الشريط الجانبي، ثم حدد نظام المجموعة. يؤدي ذلك إلى فتح صفحة New Cluster/Compute.
  3. حدد اسما فريدا للمجموعة.
  4. حدد الزر التبادلي للعقدة الواحدة.
  5. حدد Single User من القائمة المنسدلة Access mode .
  6. تأكد من ظهور عنوان بريدك الإلكتروني في حقل المستخدم الفردي.
  7. حدد إصدار وقت تشغيل Databricks المطلوب، 11.1 أو أعلى لاستخدام كتالوج Unity.
  8. انقر فوق Create compute لإنشاء نظام المجموعة.

لمعرفة المزيد حول مجموعات Databricks، راجع الحساب.

الخطوة 2: إنشاء دفتر ملاحظات Databricks

لبدء كتابة التعليمات البرمجية التفاعلية وتنفيذها على Azure Databricks، قم بإنشاء دفتر ملاحظات.

  1. انقر فوق أيقونة جديدةجديد في الشريط الجانبي، ثم انقر فوق دفتر الملاحظات.
  2. في صفحة إنشاء دفتر ملاحظات:
    • حدد اسما فريدا لدفتر الملاحظات.
    • تأكد من تعيين اللغة الافتراضية إلى Python.
    • استخدم القائمة المنسدلة الاتصال لتحديد المجموعة التي قمت بإنشائها في الخطوة 1 من القائمة المنسدلة Cluster.

يتم فتح دفتر الملاحظات بخلية واحدة فارغة.

لمعرفة المزيد حول إنشاء دفاتر الملاحظات وإدارتها، راجع إدارة دفاتر الملاحظات.

الخطوة 3: كتابة البيانات وقراءتها من موقع خارجي يديره كتالوج Unity

توصي Databricks باستخدام أداة التحميل التلقائي لاستيعاب البيانات المتزايدة. يقوم "التحميل التلقائي" تلقائيا بالكشف عن الملفات الجديدة ومعالجتها عند وصولها إلى تخزين كائن السحابة.

استخدم كتالوج Unity لإدارة الوصول الآمن إلى المواقع الخارجية. يمكن للمستخدمين أو كيانات الخدمة الذين لديهم READ FILES أذونات على موقع خارجي استخدام "المحمل التلقائي" لاستيعاب البيانات.

عادة، ستصل البيانات إلى موقع خارجي بسبب عمليات الكتابة من أنظمة أخرى. في هذا العرض التوضيحي، يمكنك محاكاة وصول البيانات عن طريق كتابة ملفات JSON إلى موقع خارجي.

انسخ التعليمات البرمجية أدناه إلى خلية دفتر ملاحظات. استبدل قيمة السلسلة باسم catalog كتالوج ب CREATE CATALOG وأذونات USE CATALOG . استبدل قيمة السلسلة ل external_location بمسار موقع خارجي بأذونات READ FILESWRITE FILESو وCREATE EXTERNAL TABLE.

يمكن تعريف المواقع الخارجية كحاوية تخزين بأكملها، ولكن غالبا ما تشير إلى دليل متداخل في حاوية.

التنسيق الصحيح لمسار موقع خارجي هو "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

يجب أن يطبع تنفيذ هذه الخلية سطرا يقرأ 12 بايت، ويطبع السلسلة "Hello world!"، ويعرض جميع قواعد البيانات الموجودة في الكتالوج المتوفر. إذا لم تتمكن من تشغيل هذه الخلية، فتأكد من أنك في مساحة عمل ممكنة لكتالوج Unity واطلب أذونات مناسبة من مسؤول مساحة العمل لإكمال هذا البرنامج التعليمي.

يستخدم رمز Python أدناه عنوان بريدك الإلكتروني لإنشاء قاعدة بيانات فريدة في الكتالوج المتوفر وموقع تخزين فريد في الموقع الخارجي المقدم. سيؤدي تنفيذ هذه الخلية إلى إزالة جميع البيانات المقترنة بهذا البرنامج التعليمي، مما يسمح لك بتنفيذ هذا المثال بشكل متكرر. يتم تعريف فئة وإنشاء مثيل لها ستستخدمه لمحاكاة دفعات البيانات التي تصل من نظام متصل إلى الموقع الخارجي المصدر.

انسخ هذه التعليمة البرمجية إلى خلية جديدة في دفتر الملاحظات وقم بتنفيذها لتكوين بيئتك.

إشعار

يجب أن تسمح لك المتغيرات المحددة في هذه التعليمة البرمجية بتنفيذها بأمان دون خطر التعارض مع أصول مساحة العمل الموجودة أو المستخدمين الآخرين. ستؤدي أذونات الشبكة أو التخزين المقيدة إلى حدوث أخطاء عند تنفيذ هذه التعليمة البرمجية؛ اتصل بمسؤول مساحة العمل لاستكشاف هذه القيود وإصلاحها.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

يمكنك الآن إرسال دفعة من البيانات عن طريق نسخ التعليمات البرمجية التالية إلى خلية وتنفيذها. يمكنك تنفيذ هذه الخلية يدويا حتى 60 مرة لتشغيل وصول بيانات جديدة.

RawData.land_batch()

الخطوة 4: تكوين أداة التحميل التلقائي لاستيعاب البيانات في كتالوج Unity

توصي Databricks بتخزين البيانات باستخدام Delta Lake. Delta Lake هي طبقة تخزين مصدر مفتوح توفر معاملات ACID وتمكن مستودع البيانات. Delta Lake هو التنسيق الافتراضي للجداول التي تم إنشاؤها في Databricks.

لتكوين المحمل التلقائي لاستيعاب البيانات في جدول كتالوج Unity، انسخ التعليمات البرمجية التالية والصقها في خلية فارغة في دفتر ملاحظاتك:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

لمعرفة المزيد حول "المحمل التلقائي"، راجع ما هو المحمل التلقائي؟.

لمعرفة المزيد حول الدفق المنظم باستخدام كتالوج Unity، راجع استخدام كتالوج Unity مع الدفق المنظم.

الخطوة 5: معالجة البيانات والتفاعل معها

تنفذ دفاتر الملاحظات خلية منطقية بخلية. استخدم هذه الخطوات لتنفيذ المنطق في الخلية:

  1. لتشغيل الخلية التي أكملتها في الخطوة السابقة، حدد الخلية واضغط على SHIFT+ENTER.

  2. للاستعلام عن الجدول الذي أنشأته للتو، انسخ التعليمات البرمجية التالية والصقها في خلية فارغة، ثم اضغط على SHIFT+ENTER لتشغيل الخلية.

    df = spark.read.table(table_name)
    
  3. لمعاينة البيانات في DataFrame، انسخ التعليمات البرمجية التالية والصقها في خلية فارغة، ثم اضغط على SHIFT+ENTER لتشغيل الخلية.

    display(df)
    

لمعرفة المزيد حول الخيارات التفاعلية لتصور البيانات، راجع المرئيات في دفاتر ملاحظات Databricks.

الخطوة 6: جدولة وظيفة

يمكنك تشغيل دفاتر ملاحظات Databricks كنصوص إنتاج عن طريق إضافتها كمهمة في مهمة Databricks. في هذه الخطوة، ستقوم بإنشاء وظيفة جديدة يمكنك تشغيلها يدويا.

لجدولة دفتر الملاحظات كمهمة:

  1. انقر فوق جدولة على الجانب الأيسر من شريط الرأس.
  2. أدخل اسما فريدا لاسم الوظيفة.
  3. انقر فوق يدوي.
  4. في القائمة المنسدلة Cluster ، حدد نظام المجموعة الذي أنشأته في الخطوة 1.
  5. انقر فوق Create.
  6. في النافذة التي تظهر، انقر فوق تشغيل الآن.
  7. لمشاهدة نتائج تشغيل المهمة، انقر فوق الأيقونة ارتباط خارجي الموجودة بجانب الطابع الزمني آخر تشغيل .

لمزيد من المعلومات حول الوظائف، راجع ما هي وظائف Azure Databricks؟.

الخطوة 7: جدول الاستعلام من Databricks SQL

يمكن لأي شخص لديه USE CATALOG إذن في الكتالوج الحالي، USE SCHEMA والإذن على المخطط الحالي، والأذونات SELECT على الجدول الاستعلام عن محتويات الجدول من واجهة برمجة تطبيقات Databricks المفضلة لديه.

تحتاج إلى الوصول إلى مستودع SQL قيد التشغيل لتنفيذ الاستعلامات في Databricks SQL.

الجدول الذي قمت بإنشائه سابقا في هذا البرنامج التعليمي له اسم target_table. يمكنك الاستعلام عنه باستخدام الكتالوج الذي قدمته في الخلية الأولى وقاعدة البيانات مع الأب e2e_lakehouse_<your-username>. يمكنك استخدام مستكشف الكتالوج للعثور على كائنات البيانات التي قمت بإنشائها.

عمليات تكامل إضافية

تعرف على المزيد حول عمليات التكامل والأدوات لهندسة البيانات مع Azure Databricks: