إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات من طرف إلى طرف في Databricks

توضح هذه المقالة كيفية إنشاء ونشر مسار معالجة البيانات من طرف إلى طرف، بما في ذلك كيفية استيعاب البيانات الأولية وتحويل البيانات وتشغيل التحليلات على البيانات المعالجة.

إشعار

على الرغم من أن هذه المقالة توضح كيفية إنشاء مسار بيانات كامل باستخدام دفاتر ملاحظات Databricks ووظيفة Azure Databricks لتنسيق سير عمل، توصي Databricks باستخدام Delta Live Tables، وهي واجهة تعريفية لإنشاء مسارات معالجة بيانات موثوقة وقابلة للصيانة وقابلة للاختبار.

ما هو مسار البيانات؟

تنفذ البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات الخطوات المطلوبة لنقل البيانات من أنظمة المصدر، وتحويل تلك البيانات استنادا إلى المتطلبات، وتخزين البيانات في نظام مستهدف. يتضمن مسار البيانات جميع العمليات اللازمة لتحويل البيانات الأولية إلى بيانات معدة يمكن للمستخدمين استهلاكها. على سبيل المثال، قد يقوم مسار البيانات بإعداد البيانات حتى يتمكن محللو البيانات وعلماء البيانات من استخراج القيمة من البيانات من خلال التحليل وإعداد التقارير.

يعد سير عمل الاستخراج والتحويل والتحميل (ETL) مثالا شائعا لمسار البيانات. في معالجة ETL، يتم استيعاب البيانات من أنظمة المصدر وكتابتها إلى منطقة التقسيم المرحلي، وتحويلها استنادا إلى المتطلبات (ضمان جودة البيانات، وإلغاء تكرار السجلات، وما إلى ذلك)، ثم كتابتها إلى نظام مستهدف مثل مستودع البيانات أو مستودع البيانات.

خطوات البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات

لمساعدتك على البدء في إنشاء مسارات البيانات على Azure Databricks، يوضح المثال المضمن في هذه المقالة إنشاء سير عمل لمعالجة البيانات:

  • استخدم ميزات Azure Databricks لاستكشاف مجموعة بيانات أولية.
  • إنشاء دفتر ملاحظات Databricks لاستيعاب بيانات المصدر الخام وكتابة البيانات الأولية إلى جدول هدف.
  • إنشاء دفتر ملاحظات Databricks لتحويل بيانات المصدر الخام وكتابة البيانات المحولة إلى جدول هدف.
  • إنشاء دفتر ملاحظات Databricks للاستعلام عن البيانات المحولة.
  • أتمتة مسار البيانات باستخدام مهمة Azure Databricks.

المتطلبات

مثال: مجموعة بيانات مليون أغنية

مجموعة البيانات المستخدمة في هذا المثال هي مجموعة فرعية من مجموعة بيانات مليون أغنية، وهي مجموعة من الميزات وبيانات التعريف لمسارات الموسيقى المعاصرة. تتوفر مجموعة البيانات هذه في نماذج مجموعات البيانات المضمنة في مساحة عمل Azure Databricks.

الخطوة 1: إنشاء نظام مجموعة

لتنفيذ معالجة البيانات وتحليلها في هذا المثال، قم بإنشاء نظام مجموعة لتوفير موارد الحوسبة اللازمة لتشغيل الأوامر.

إشعار

نظرا لأن هذا المثال يستخدم عينة من مجموعة البيانات المخزنة في DBFS ويوصي بالجداول الدائمة إلى كتالوج Unity، يمكنك إنشاء نظام مجموعة تم تكوينه باستخدام وضع وصول مستخدم واحد. يوفر وضع وصول المستخدم الفردي وصولا كاملا إلى DBFS مع تمكين الوصول إلى كتالوج Unity أيضا. راجع أفضل الممارسات ل DBFS وUnity Catalog.

  1. انقر فوق حساب في الشريط الجانبي.
  2. في صفحة Compute، انقر فوق Create Cluster.
  3. في صفحة New Cluster، أدخل اسما فريدا للمجموعة.
  4. في وضع الوصول، حدد مستخدم واحد.
  5. في وصول مستخدم واحد أو كيان الخدمة، حدد اسم المستخدم الخاص بك.
  6. اترك القيم المتبقية في حالتها الافتراضية، وانقر فوق إنشاء نظام مجموعة.

لمعرفة المزيد حول مجموعات Databricks، راجع الحساب.

الخطوة 2: استكشاف البيانات المصدر

لمعرفة كيفية استخدام واجهة Azure Databricks لاستكشاف بيانات المصدر الأولية، راجع استكشاف البيانات المصدر لمسار البيانات. إذا كنت تريد الانتقال مباشرة إلى استيعاب البيانات وإعدادها، فتابع إلى الخطوة 3: استيعاب البيانات الأولية.

الخطوة 3: استيعاب البيانات الأولية

في هذه الخطوة، يمكنك تحميل البيانات الأولية في جدول لجعلها متاحة لمزيد من المعالجة. لإدارة أصول البيانات على النظام الأساسي Databricks مثل الجداول، توصي Databricks ب Unity Catalog. ومع ذلك، إذا لم يكن لديك أذونات لإنشاء الكتالوج والمخطط المطلوبين لنشر الجداول إلى كتالوج Unity، فلا يزال بإمكانك إكمال الخطوات التالية عن طريق نشر الجداول إلى Hive metastore.

لاستيعاب البيانات، توصي Databricks باستخدام أداة التحميل التلقائي. يقوم "التحميل التلقائي" تلقائيا بالكشف عن الملفات الجديدة ومعالجتها عند وصولها إلى تخزين كائن السحابة.

يمكنك تكوين "المحمل التلقائي" للكشف تلقائيا عن مخطط البيانات المحملة، ما يسمح لك بتهيئة الجداول دون الإعلان صراحة عن مخطط البيانات وتطوير مخطط الجدول مع تقديم أعمدة جديدة. وهذا يلغي الحاجة إلى تعقب تغييرات المخطط وتطبيقها يدويا بمرور الوقت. توصي Databricks باستنتاج المخطط عند استخدام "المحمل التلقائي". ومع ذلك، كما هو الحال في خطوة استكشاف البيانات، لا تحتوي بيانات الأغاني على معلومات العنوان. نظرا لعدم تخزين العنوان مع البيانات، ستحتاج إلى تعريف المخطط بشكل صريح، كما هو موضح في المثال التالي.

  1. في الشريط الجانبي، انقر فوق أيقونة جديدةجديد وحدد دفتر الملاحظات من القائمة. يظهر مربع الحوار إنشاء دفتر ملاحظات.

  2. أدخل اسما لدفتر الملاحظات، على سبيل المثال، Ingest songs data. بشكل افتراضي:

    • Python هي اللغة المحددة.
    • يتم إرفاق دفتر الملاحظات بآخر مجموعة استخدمتها. في هذه الحالة، المجموعة التي قمت بإنشائها في الخطوة 1: إنشاء نظام مجموعة.
  3. أدخل ما يلي في الخلية الأولى من دفتر الملاحظات:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    إذا كنت تستخدم كتالوج Unity، فاستبدل <table-name> باسم كتالوج ومخطط وجدول لاحتواء السجلات التي تم إدخالها (على سبيل المثال، data_pipelines.songs_data.raw_song_data). وإلا، استبدل <table-name> باسم جدول ليحتوي على السجلات التي تم استيعابها، على سبيل المثال، raw_song_data.

    استبدل <checkpoint-path> بمسار إلى دليل في DBFS للحفاظ على ملفات نقاط التحقق، على سبيل المثال، /tmp/pipeline_get_started/_checkpoint/song_data.

  4. انقر فوق قائمة التشغيل، وحدد Run Cell. يعرف هذا المثال مخطط البيانات باستخدام المعلومات من README، وي استيعاب بيانات الأغاني من جميع الملفات الموجودة في file_path، ويكتب البيانات إلى الجدول المحدد بواسطة table_name.

الخطوة 4: إعداد البيانات الأولية

لإعداد البيانات الأولية للتحليل، تقوم الخطوات التالية بتحويل بيانات الأغاني الأولية عن طريق تصفية الأعمدة غير الضرورية وإضافة حقل جديد يحتوي على طابع زمني لإنشاء السجل الجديد.

  1. في الشريط الجانبي، انقر فوق أيقونة جديدةجديد وحدد دفتر الملاحظات من القائمة. يظهر مربع الحوار إنشاء دفتر ملاحظات.

  2. أدخل اسما لدفتر الملاحظات. على سبيل المثال، Prepare songs data غير اللغة الافتراضية إلى SQL.

  3. أدخل ما يلي في الخلية الأولى من دفتر الملاحظات:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    إذا كنت تستخدم كتالوج Unity، فاستبدله <table-name> بكتالوج ومخطط واسم جدول لاحتواء السجلات المصفاة والمحولة (على سبيل المثال، data_pipelines.songs_data.prepared_song_data). وإلا، استبدل <table-name> باسم جدول ليحتوي على السجلات المصفاة والمحولة (على سبيل المثال، prepared_song_data).

    استبدل <raw-songs-table-name> باسم الجدول الذي يحتوي على سجلات الأغاني الأولية التي تم تناولها في الخطوة السابقة.

  4. انقر فوق قائمة التشغيل، وحدد Run Cell.

الخطوة 5: الاستعلام عن البيانات المحولة

في هذه الخطوة، يمكنك توسيع مسار المعالجة عن طريق إضافة استعلامات لتحليل بيانات الأغاني. تستخدم هذه الاستعلامات السجلات المعدة التي تم إنشاؤها في الخطوة السابقة.

  1. في الشريط الجانبي، انقر فوق أيقونة جديدةجديد وحدد دفتر الملاحظات من القائمة. يظهر مربع الحوار إنشاء دفتر ملاحظات.

  2. أدخل اسما لدفتر الملاحظات. على سبيل المثال، Analyze songs data غير اللغة الافتراضية إلى SQL.

  3. أدخل ما يلي في الخلية الأولى من دفتر الملاحظات:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    استبدل <prepared-songs-table-name> باسم الجدول الذي يحتوي على بيانات معدة. على سبيل المثال، data_pipelines.songs_data.prepared_song_data

  4. انقر علامة الإقهار لأسفل في قائمة إجراءات الخلية، وحدد إضافة خلية أدناه وأدخل ما يلي في الخلية الجديدة:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    استبدل <prepared-songs-table-name> باسم الجدول المعد الذي تم إنشاؤه في الخطوة السابقة. على سبيل المثال، data_pipelines.songs_data.prepared_song_data

  5. لتشغيل الاستعلامات وعرض الإخراج، انقر فوق تشغيل الكل.

الخطوة 6: إنشاء مهمة Azure Databricks لتشغيل البنية الأساسية لبرنامج ربط العمليات التجارية

يمكنك إنشاء سير عمل لأتمتة تشغيل خطوات استيعاب البيانات ومعالجتها وتحليلها باستخدام مهمة Azure Databricks.

  1. في مساحة عمل علوم البيانات والهندسة، قم بأحد الإجراءات التالية:
    • انقر فوق أيقونة مهام سير العملمهام سير العمل في الشريط الجانبي وانقر فوق .الزر
    • في الشريط الجانبي، انقر فوق أيقونة جديدةجديد وحدد مهمة.
  2. في مربع حوار المهمة على علامة التبويب المهام ، استبدل إضافة اسم لمهمتك... باسم وظيفتك. على سبيل المثال، "سير عمل الأغاني".
  3. في اسم المهمة، أدخل اسما للمهمة الأولى، على سبيل المثال، Ingest_songs_data.
  4. في النوع، حدد نوع مهمة دفتر الملاحظات.
  5. في المصدر، حدد مساحة العمل.
  6. استخدم مستعرض الملفات للعثور على دفتر ملاحظات استيعاب البيانات، وانقر فوق اسم دفتر الملاحظات، وانقر فوق تأكيد.
  7. في نظام المجموعة، حدد Shared_job_cluster أو المجموعة التي أنشأتها في Create a cluster الخطوة.
  8. انقر فوق Create.
  9. انقر الزر أسفل المهمة التي أنشأتها للتو وحدد دفتر الملاحظات.
  10. في اسم المهمة، أدخل اسما للمهمة، على سبيل المثال، Prepare_songs_data.
  11. في النوع، حدد نوع مهمة دفتر الملاحظات.
  12. في المصدر، حدد مساحة العمل.
  13. استخدم مستعرض الملفات للعثور على دفتر ملاحظات إعداد البيانات، وانقر فوق اسم دفتر الملاحظات، ثم انقر فوق تأكيد.
  14. في نظام المجموعة، حدد Shared_job_cluster أو المجموعة التي أنشأتها في Create a cluster الخطوة.
  15. انقر فوق Create.
  16. انقر الزر أسفل المهمة التي أنشأتها للتو وحدد دفتر الملاحظات.
  17. في اسم المهمة، أدخل اسما للمهمة، على سبيل المثال، Analyze_songs_data.
  18. في النوع، حدد نوع مهمة دفتر الملاحظات.
  19. في المصدر، حدد مساحة العمل.
  20. استخدم مستعرض الملفات للعثور على دفتر ملاحظات تحليل البيانات، وانقر فوق اسم دفتر الملاحظات، ثم انقر فوق تأكيد.
  21. في نظام المجموعة، حدد Shared_job_cluster أو المجموعة التي أنشأتها في Create a cluster الخطوة.
  22. انقر فوق Create.
  23. لتشغيل سير العمل، انقر فوق الزر . لعرض تفاصيل التشغيل، انقر فوق الارتباط في عمود وقت البدء للتشغيل في طريقة عرض تشغيل المهمة. انقر فوق كل مهمة لعرض تفاصيل تشغيل المهمة.
  24. لعرض النتائج عند اكتمال سير العمل، انقر فوق مهمة تحليل البيانات النهائية. تظهر صفحة الإخراج وتعرض نتائج الاستعلام.

الخطوة 7: جدولة مهمة مسار البيانات

إشعار

لإثبات استخدام مهمة Azure Databricks لتنسيق سير عمل مجدول، يفصل مثال البدء هذا خطوات الاستيعاب والتحضير والتحليل في دفاتر ملاحظات منفصلة، ثم يتم استخدام كل دفتر ملاحظات لإنشاء مهمة في الوظيفة. إذا كانت جميع المعالجات مضمنة في دفتر ملاحظات واحد، يمكنك بسهولة جدولة دفتر الملاحظات مباشرة من واجهة مستخدم دفتر ملاحظات Azure Databricks. راجع إنشاء مهام دفتر الملاحظات المجدولة وإدارتها.

أحد المتطلبات الشائعة هو تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية للبيانات على أساس مجدول. لتحديد جدول للوظيفة التي تقوم بتشغيل المسار:

  1. انقر فوق أيقونة مهام سير العملمهام سير العمل في الشريط الجانبي.
  2. في العمود الاسم ، انقر فوق اسم المهمة. تعرض اللوحة الجانبية تفاصيل الوظيفة.
  3. انقر فوق Add trigger في لوحة Job details وحدد Scheduled in Trigger type.
  4. حدد الفترة ووقت البدء والمنطقة الزمنية. حدد اختياريا خانة الاختيار Show Cron Syntax لعرض الجدول وتحريره في بناء جملة Quartz Cron.
  5. انقر فوق حفظ.

معرفة المزيد