تنسيق وظائف Azure Databricks باستخدام Apache Airflow

توضح هذه المقالة دعم Apache Airflow لتنسيق مسارات البيانات باستخدام Azure Databricks، وتحتوي على إرشادات لتثبيت وتكوين Airflow محليا، وتوفر مثالا لنشر وتشغيل سير عمل Azure Databricks مع Airflow.

تزامن الوظيفة في مسار بيانات

غالبا ما يتطلب تطوير ونشر مسار معالجة البيانات إدارة التبعيات المعقدة بين المهام. على سبيل المثال، قد يقرأ المسار البيانات من مصدر، وتنظيف البيانات، وتحويل البيانات التي تم تنظيفها، وكتابة البيانات المحولة إلى هدف. تحتاج أيضا إلى دعم لاختبار الأخطاء وجدولتها واستكشاف الأخطاء وإصلاحها عند تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.

تعالج أنظمة سير العمل هذه التحديات من خلال السماح لك بتحديد التبعيات بين المهام، والجدول الزمني عند تشغيل المسارات، ومراقبة مهام سير العمل. Apache Airflow هو حل مصدر مفتوح لإدارة وجدولة مسارات البيانات. يمثل تدفق الهواء خطوط أنابيب البيانات كرسوم بيانية دورةية موجهة (DAGs) للعمليات. يمكنك تحديد سير عمل في ملف Python، ويدير Airflow الجدولة والتنفيذ. يتيح لك اتصال Airflow Azure Databricks الاستفادة من محرك Spark المحسن الذي تقدمه Azure Databricks مع ميزات الجدولة ل Airflow.

المتطلبات

  • يتطلب التكامل بين Airflow وAzure Databricks الإصدار 2.5.0 من Airflow والإصدارات الأحدث. يتم اختبار الأمثلة في هذه المقالة مع Airflow الإصدار 2.6.1.
  • يتطلب تدفق الهواء Python 3.8 أو 3.9 أو 3.10 أو 3.11. يتم اختبار الأمثلة في هذه المقالة باستخدام Python 3.8.
  • تتطلب الإرشادات الواردة في هذه المقالة لتثبيت وتشغيل Airflow pipenv لإنشاء بيئة Python الظاهرية.

مشغلو تدفق الهواء ل Databricks

يتكون Airflow DAG من مهام، حيث تقوم كل مهمة بتشغيل مشغل تدفق الهواء. يتم تنفيذ مشغلي تدفق الهواء الذين يدعمون التكامل مع Databricks في موفر Databricks.

يتضمن موفر Databricks عوامل تشغيل لتشغيل عدد من المهام مقابل مساحة عمل Azure Databricks، بما في ذلك استيراد البيانات إلى جدول، وتشغيل استعلامات SQL، والعمل مع مجلدات Databricks Git.

يقوم موفر Databricks بتنفيذ عاملي تشغيل لتشغيل المهام:

لإنشاء مهمة Azure Databricks جديدة أو إعادة تعيين مهمة موجودة، يقوم موفر Databricks بتنفيذ DatabricksCreateJobsOperator. DatabricksCreateJobsOperator يستخدم POST /api/2.1/jobs/create وPOST /api/2.1/jobs/reset API requests. يمكنك استخدام DatabricksCreateJobsOperator مع DatabricksRunNowOperator لإنشاء وظيفة وتشغيلها.

إشعار

يتطلب استخدام عوامل تشغيل Databricks لتشغيل مهمة توفير بيانات اعتماد في تكوين اتصال Databricks. راجع إنشاء رمز مميز للوصول الشخصي ل Azure Databricks ل Airflow.

يكتب مشغلو Databricks Airflow عنوان URL لصفحة تشغيل المهمة إلى سجلات Airflow كل polling_period_seconds (الافتراضي هو 30 ثانية). لمزيد من المعلومات، راجع صفحة حزمة apache-airflow-providers-databricks على موقع Airflow على الويب.

تثبيت تكامل Airflow Azure Databricks محليا

لتثبيت Airflow وموفر Databricks محليا للاختبار والتطوير، استخدم الخطوات التالية. للحصول على خيارات تثبيت Airflow الأخرى، بما في ذلك إنشاء تثبيت إنتاج، راجع التثبيت في وثائق Airflow.

افتح محطة طرفية ثم قم بتشغيل الأوامر التالية:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

استبدل <firstname>و <lastname>و <email> باسم المستخدم والبريد الإلكتروني. ستتم مطالبتك بإدخال كلمة مرور للمستخدم المسؤول. تأكد من حفظ كلمة المرور هذه لأنها مطلوبة لتسجيل الدخول إلى واجهة مستخدم Airflow.

ينفذ هذا البرنامج النصي الخطوات التالية:

  1. إنشاء دليل باسم airflow والتغييرات في هذا الدليل.
  2. يستخدم pipenv لإنشاء بيئة Python الظاهرية وإنشاءها. توصي Databricks باستخدام بيئة Python الظاهرية لعزل إصدارات الحزمة وتبعيات التعليمات البرمجية لتلك البيئة. يساعد هذا العزل على تقليل عدم تطابق إصدار الحزمة غير المتوقع وتضارب تبعية التعليمات البرمجية.
  3. تهيئة متغير بيئة يسمى AIRFLOW_HOME تعيين إلى مسار airflow الدليل.
  4. تثبيت Airflow وحزم موفر Airflow Databricks.
  5. airflow/dags إنشاء دليل. يستخدم dags Airflow الدليل لتخزين تعريفات DAG.
  6. تهيئة قاعدة بيانات SQLite التي يستخدمها Airflow لتعقب بيانات التعريف. في توزيع تدفق الهواء للإنتاج، يمكنك تكوين Airflow مع قاعدة بيانات قياسية. تتم تهيئة قاعدة بيانات SQLite والتكوين الافتراضي لنشر Airflow في airflow الدليل.
  7. إنشاء مستخدم مسؤول ل Airflow.

تلميح

لتأكيد تثبيت موفر Databricks، قم بتشغيل الأمر التالي في دليل تثبيت Airflow:

airflow providers list

بدء تشغيل خادم ويب Airflow والمجدول

مطلوب خادم ويب Airflow لعرض واجهة مستخدم Airflow. لبدء تشغيل خادم الويب، افتح محطة طرفية في دليل تثبيت Airflow وقم بتشغيل الأوامر التالية:

إشعار

إذا فشل خادم ويب Airflow في البدء بسبب تعارض في المنفذ، يمكنك تغيير المنفذ الافتراضي في تكوين Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

المجدول هو مكون Airflow الذي يقوم بجدولة DAGs. لبدء المجدول، افتح محطة طرفية جديدة في دليل تثبيت Airflow وقم بتشغيل الأوامر التالية:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

اختبار تثبيت Airflow

للتحقق من تثبيت Airflow، يمكنك تشغيل أحد أمثلة DAGs المضمنة في Airflow:

  1. في نافذة المستعرض، افتح http://localhost:8080/home. سجل الدخول إلى واجهة مستخدم Airflow باستخدام اسم المستخدم وكلمة المرور التي أنشأتها عند تثبيت Airflow. تظهر صفحة Airflow DAGs .
  2. انقر فوق مفتاح التبديل Pause/Unpause DAG لإلغاء استخدام أحد أمثلة DAGs، على سبيل المثال، example_python_operator.
  3. قم بتشغيل مثال DAG بالنقر فوق الزر Trigger DAG .
  4. انقر فوق اسم DAG لعرض التفاصيل، بما في ذلك حالة تشغيل DAG.

إنشاء رمز مميز للوصول الشخصي ل Azure Databricks ل Airflow

يتصل Airflow ب Databricks باستخدام رمز الوصول الشخصي Azure Databricks (PAT). لإنشاء PAT:

  1. في مساحة عمل Azure Databricks، انقر فوق اسم مستخدم Azure Databricks في الشريط العلوي، ثم حدد الإعدادات من القائمة المنسدلة.
  2. انقر فوق المطور.
  3. إلى جانب رموز الوصول المميزة، انقر فوق إدارة.
  4. النقر على Generate new token.
  5. (اختياري) أدخل تعليقا يساعدك على تحديد هذا الرمز المميز في المستقبل، وتغيير العمر الافتراضي للرمز المميز وهو 90 يوما. لإنشاء رمز مميز بدون مدة بقاء (غير مستحسن)، اترك مربع مدة البقاء (أيام) فارغا (فارغ).
  6. انقر فوق "Generate".
  7. انسخ الرمز المميز المعروض إلى موقع آمن، ثم انقر فوق تم.

إشعار

تأكد من حفظ الرمز المميز المنسخ في موقع آمن. لا تشارك الرمز المميز المنسخ مع الآخرين. إذا فقدت الرمز المميز المنسخ، فلا يمكنك إعادة إنشاء نفس الرمز المميز بالضبط. بدلا من ذلك، يجب تكرار هذا الإجراء لإنشاء رمز مميز جديد. إذا فقدت الرمز المميز الذي تم نسخه، أو كنت تعتقد أنه تم اختراق الرمز المميز، فإن Databricks يوصي بشدة بحذف هذا الرمز المميز على الفور من مساحة العمل الخاصة بك عن طريق النقر فوق أيقونة سلة المهملات (إبطال) بجوار الرمز المميز في صفحة رموز Access المميزة .

إذا لم تتمكن من إنشاء الرموز المميزة أو استخدامها في مساحة العمل الخاصة بك، فقد يرجع ذلك إلى قيام مسؤول مساحة العمل بتعطيل الرموز المميزة أو عدم منحك الإذن لإنشاء الرموز المميزة أو استخدامها. راجع مسؤول مساحة العمل أو المواضيع التالية:

إشعار

كأفضل ممارسة أمان، عند المصادقة باستخدام الأدوات والأنظمة والبرامج النصية والتطبيقات التلقائية، توصي Databricks باستخدام رموز الوصول الشخصية التي تنتمي إلى كيانات الخدمة بدلا من مستخدمي مساحة العمل. لإنشاء رموز مميزة لكيانات الخدمة، راجع إدارة الرموز المميزة لكيان الخدمة.

يمكنك أيضا المصادقة على Azure Databricks باستخدام رمز مميز لمعرف Microsoft Entra. راجع اتصال Databricks في وثائق Airflow.

تكوين اتصال Azure Databricks

يحتوي تثبيت Airflow على اتصال افتراضي ل Azure Databricks. لتحديث الاتصال للاتصال بمساحة العمل باستخدام رمز الوصول الشخصي الذي أنشأته أعلاه:

  1. في نافذة المستعرض، افتح http://localhost:8080/connection/list/. إذا تمت مطالبتك بتسجيل الدخول، أدخل اسم المستخدم وكلمة المرور للمسؤول.
  2. ضمن معرف Conn، حدد موقع databricks_default وانقر فوق الزر تحرير السجل .
  3. استبدل القيمة في حقل المضيف باسم مثيل مساحة العمل لنشر Azure Databricks، على سبيل المثال، https://adb-123456789.cloud.databricks.com.
  4. في حقل كلمة المرور ، أدخل رمز الوصول الشخصي الخاص بك في Azure Databricks.
  5. انقر فوق حفظ.

إذا كنت تستخدم رمزا مميزا لمعرف Microsoft Entra، فشاهد اتصال Databricks في وثائق Airflow للحصول على معلومات حول تكوين المصادقة.

مثال: إنشاء Airflow DAG لتشغيل وظيفة Azure Databricks

يوضح المثال التالي كيفية إنشاء نشر Airflow بسيط يعمل على جهازك المحلي وينشر مثالا على DAG لتشغيل عمليات التشغيل في Azure Databricks. في هذا المثال، سوف:

  1. إنشاء دفتر ملاحظات جديد وإضافة تعليمة برمجية لطباعة ترحيب استنادا إلى معلمة مكونة.
  2. إنشاء مهمة Azure Databricks مع مهمة واحدة تقوم بتشغيل دفتر الملاحظات.
  3. تكوين اتصال Airflow بمساحة عمل Azure Databricks.
  4. إنشاء Airflow DAG لتشغيل مهمة دفتر الملاحظات. يمكنك تعريف DAG في برنامج نصي Python باستخدام DatabricksRunNowOperator.
  5. استخدم واجهة مستخدم Airflow لتشغيل DAG وعرض حالة التشغيل.

إنشاء دفتر ملاحظات

يستخدم هذا المثال دفتر ملاحظات يحتوي على خليتين:

  • تحتوي الخلية الأولى على عنصر واجهة مستخدم نص Databricks Utilities يحدد متغيرا يسمى greeting معينا إلى القيمة worldالافتراضية .
  • تطبع الخلية الثانية قيمة greeting المتغير مسبوقا ب hello.

لإنشاء دفتر الملاحظات:

  1. انتقل إلى مساحة عمل Azure Databricks، وانقر فوق أيقونة جديدة جديد في الشريط الجانبي، وحدد دفتر الملاحظات.

  2. امنح دفتر ملاحظاتك اسما، مثل Hello Airflow، وتأكد من تعيين اللغة الافتراضية إلى Python.

  3. انسخ التعليمة البرمجية ل Python التالية والصقها في الخلية الأولى من دفتر الملاحظات.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. أضف خلية جديدة أسفل الخلية الأولى وانسخ التعليمة البرمجية ل Python التالية والصقها في الخلية الجديدة:

    print("hello {}".format(greeting))
    

قم بإنشاء وظيفة

  1. انقر فوق أيقونة مهام سير العمل مهام سير العمل في الشريط الجانبي.

  2. انقر فوق الزر .

    تظهر علامة التبويب المهام مع مربع الحوار إنشاء مهمة.

    إنشاء مربع حوار المهمة الأولى

  3. استبدل إضافة اسم لمهمتك... باسم وظيفتك.

  4. في حقل اسم المهمة، أدخل اسما للمهمة، على سبيل المثال، greeting-task.

  5. في القائمة المنسدلة النوع ، حدد دفتر الملاحظات.

  6. في القائمة المنسدلة Source ، حدد Workspace.

  7. انقر فوق مربع النص المسار واستخدم مستعرض الملفات للعثور على دفتر الملاحظات الذي أنشأته، وانقر فوق اسم دفتر الملاحظات، ثم انقر فوق تأكيد.

  8. انقر فوق Add ضمن Parameters. في حقل المفتاح ، أدخل greeting. في حقل القيمة ، أدخل Airflow user.

  9. انقر فوق إنشاء مهمة.

في لوحة Job details ، انسخ قيمة Job ID . هذه القيمة مطلوبة لتشغيل المهمة من Airflow.

تشغيل المهمة

لاختبار وظيفتك الجديدة في واجهة مستخدم وظائف Azure Databricks، انقر في الزر الزاوية العلوية اليسرى. عند اكتمال التشغيل، يمكنك التحقق من الإخراج عن طريق عرض تفاصيل تشغيل الوظيفة.

إنشاء Airflow DAG جديد

يمكنك تعريف Airflow DAG في ملف Python. لإنشاء DAG لتشغيل مهمة دفتر الملاحظات المثال:

  1. في محرر نص أو IDE، أنشئ ملفا جديدا باسم databricks_dag.py بالمحتويات التالية:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    استبدل JOB_ID بقيمة معرف المهمة المحفوظة سابقا.

  2. احفظ الملف في airflow/dags الدليل. يقوم Airflow تلقائيا بقراءة ملفات DAG المخزنة في airflow/dags/.

تثبيت وتحقق من DAG في Airflow

لتشغيل DAG والتحقق منها في واجهة مستخدم Airflow:

  1. في نافذة المستعرض، افتح http://localhost:8080/home. تظهر شاشة Airflow DAGs .
  2. حدد موقع databricks_dag زر التبديل إيقاف مؤقت/إلغاء استخدام DAG وانقر فوقه لإلغاء إيقاف DAG.
  3. قم بتشغيل DAG بالنقر فوق الزر Trigger DAG .
  4. انقر فوق تشغيل في العمود Run لعرض حالة التشغيل وتفاصيله.