تحويل البيانات في السحابة باستخدام نشاط Spark في Azure Data Factory

ينطبق على: Azure Data Factory Azure Synapse Analytics

تلميح

Data Factory في Microsoft Fabric هو الجيل القادم من Azure Data Factory، مع بنية أبسط، وذكاء اصطناعي مدمج، وميزات جديدة. إذا كنت جديدا في تكامل البيانات، ابدأ مع Fabric Data Factory. يمكن لأعباء عمل ADF الحالية الترقية إلى Fabric للوصول إلى قدرات جديدة في علوم البيانات، والتحليلات اللحظية، والتقارير.

في هذا الدرس، تستخدم Azure PowerShell لإنشاء خط أنابيب Data Factory الذي يحول البيانات باستخدام Spark Activity وخدمة HDInsight المرتبطة عند الطلب. نفذ الخطوات التالية في هذا البرنامج التعليمي:

  • إنشاء data factory.
  • تأليف خدمات مرتبطة ونشرها.
  • تأليف تدفق ونشره.
  • ابدأ تشغيل تدفق.
  • مراقبة تشغيل المسار.

إذا لم يكن لديك اشتراك Azure، أنشئ حسابا free قبل أن تبدأ.

المتطلبات الأساسية

إشعار

نوصي باستخدام وحدة Azure Az PowerShell للتفاعل مع Azure. للبدء، راجع تثبيت Azure PowerShell. لتعلم كيفية الترحيل إلى وحدة Az PowerShell، راجع Migrationate Azure PowerShell من AzureRM إلى Az.

  • تخزين Azure حساب. تقوم بإنشاء سكريبت Python وملف إدخال، ثم ترفع هذه الملفات إلى تخزين Azure. يتم تخزين الإخراج الناتج من برنامج Spark في حساب التخزين هذا. تستخدم مجموعة Spark حسب الطلب حساب التخزين نفسه كمساحة تخزين أساسية لها.
  • Azure PowerShell. اتبع التعليمات في كيفية تثبيت وتكوين Azure PowerShell.

ارفع سكريبت Python إلى حسابك في مخزن البيانات الثنائية الكبيرة

  1. أنشئ ملف Python باسم WordCount_Spark.py يحتوي على المحتوى التالي:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. استبدل <storageAccountName> باسم حسابك تخزين Azure. ثم حفظ الملف.

  3. في مساحة تخزين Azure Blob الخاص بك، أنشئ حاوية باسم adftutorial إذا لم تكن موجودة.

  4. أنشئ مجلدًا باسم spark.

  5. أنشئ مجلداً فرعياً باسم script ضمن مجلد spark.

  6. حمّل الملف WordCount_Spark.py إلى المجلد الفرعي script.

قم بتحميل ملف الإدخال

  1. أنشئ ملفاً باسم minecraftstory.txt يتضمن نصاً. يتولى برنامج Spark حساب عدد الكلمات في هذا النص.
  2. أنشئ مجلداً فرعياً باسم inputfiles في مجلد spark.
  3. حمّل minecraftstory.txt إلى المجلد الفرعي inputfiles.

خدمات مرتبطة بالتأليف

يمكنك تأليف خدمتين مرتبطتين في هذا القسم:

  • خدمة تخزين Azure Linked Service التي تربط حساب تخزين Azure بمصنع البيانات. يتم استخدام هذا التخزين بواسطة مجموعة HDInsight عند الطلب. كما تحتوي على برنامج Spark النصي المطلوب تنفيذه.
  • خدمة مرتبطة حسب الطلب في HDInsight. يقوم Azure Data Factory تلقائيا بإنشاء عنقود HDInsight، ويشغل برنامج Spark، ثم يحذف عنقود HDInsight بعد أن يكون في وضع الخمول لفترة محددة مسبقا.

تخزين Azure linked service

أنشئ ملف JSON باستخدام محررك المفضل، وانسخ التعريف التالي لخدمة مرتبطة تخزين Azure، ثم احفظ الملف ك MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

قم بتحديث <storageAccountName> و <storageAccountKey> باسم ومفتاح حسابك تخزين Azure.

خدمة مرتبطة حسب الطلب في HDInsight

أنشئ ملف JSON باستخدام محررك المفضل، وانسخ التعريف التالي لخدمة مرتبطة Azure HDInsight، واحفظ الملف ك MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

حدّث قيم الخصائص التالية في تعريف الخدمة المرتبطة:

  • hostSubscriptionId. استبدل <subscriptionID> بمعرف اشتراكك Azure. تُنشأ مجموعة HDInsight حسب الطلب في هذا الاشتراك.
  • tenant. استبدل <tenantID> برقم مستأجر Azure الخاص بك.
  • servicePrincipalId وservicePrincipalKey. استبدل <servicePrincipalID> و <servicePrincipalKey> بمعرف ومفتاح موكل الخدمة الخاص بك في Microsoft Entra ID. يجب أن يكون أساس الخدمة هذا عضوا بدور "مساهم" في الاشتراك أو مجموعة الموارد التي تُنشأ فيها المجموعة. راجع إنشاء Microsoft Entra التطبيق وخدمة المبدأ للتفاصيل. معرّف أساس الخدمة هو المكافئ لمعرّف التطبيق، ومفتاح أساس الخدمة هو المكافئ لقيمة البيانات السرية للعميل.
  • clusterResourceGroup. استبدل <resourceGroupOfHDICluster> باسم مجموعة الموارد التي يجب إنشاء مجموعة HDInsight فيها.

إشعار

Azure HDInsight لديه حد على العدد الإجمالي للنوى التي يمكنك استخدامها في كل منطقة Azure يدعمها. بالنسبة لخدمة HDInsight المرتبطة عند الطلب، سيتم إنشاء عنقود HDInsight في نفس موقع تخزين Azure المستخدم كمخزن رئيسي. تأكد من أن لديك عددًا من حصص النوى كافيًا لإنشاء المجموعة بنجاح. لمزيد من المعلومات، راجع إعداد المجموعات في HDInsight باستخدام Hadoop وSpark وKafka وغير ذلك.

تأليف تدفق

في هذه الخطوة، تنشئ تدفقاً جديداً باستخدام نشاط Spark. يستخدم النشاط عينة عدد الكلمات. نزّل المحتويات من هذا الموقع إذا لم تكن قد نزّلتها بالفعل.

أنشئ ملف JSON في المحرر الذي تفضله، وانسخ تعريف JSON التالي الخاص بتعريف تدفق، ثم احفظه بهذا الاسم MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

لاحظ النقاط التالية:

  • يشير rootPath إلى مجلد Spark للحاوية adftutorial.
  • يشير entryFilePath يشير إلى ملف WordCount_Spark.py في المجلد الفرعي script من مجلد spark.

إنشاء مصدرًا للبيانات

لقد ألّفت تعريفات الخدمة المرتبطة والتدفق في ملفات JSON. الآن، دَعنا ننشئ مصنع بيانات، وننشر ملفات JSON الخاصة بالخدمة المرتبطة والتدفق باستخدام PowerShell cmdlets. شغّل أوامر PowerShell التالية واحداً تلو الآخر:

  1. عيّن المتغيرات واحداً تلو الآخر.

    اسم مجموعة الموارد

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    اسم Data Factory. يجب أن تكون فريدة عالميًا

    $dataFactoryName = "MyDataFactory09102017"
    

    اسم التدفق

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. شغّل PowerShell. Keep Azure PowerShell مفتوحا حتى نهاية هذا البدء السريع. في حال قمت بإغلاق وإعادة فتح، تحتاج إلى تشغيل الأوامر مرة أخرى. للحصول على قائمة بالمناطق Azure التي يتوفر فيها Data Factory حاليا، اختر المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع Analytics لتحديد موقع Data Factory: المنتجات المتاحة حسب المنطقة. مخازن البيانات (تخزين Azure، قاعدة بيانات Azure SQL، إلخ) والحسابات (HDInsight، إلخ) المستخدمة في Data Factory يمكن أن تكون في مناطق أخرى.

    شغل الأمر التالي، وأدخل اسم المستخدم وكلمة المرور التي تستخدمها لتسجيل الدخول إلى بوابة Azure:

    Connect-AzAccount
    

    شغّل الأمر التالي لعرض جميع الاشتراكات لهذا الحساب:

    Get-AzSubscription
    

    شغّل الأمر التالي لتحديد الاشتراك الذي تريد العمل معه. استبدل SubscriptionId بمعرف اشتراكك Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. أنشئ مجموعة الموارد: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. أنشئ مصنع البيانات.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    نفذ الأمر التالي للاطلاع على الإخراج الناتج:

    $df
    
  5. انتقل إلى المجلد الذي أنشأت فيه ملفات JSON، وشغل الأمر التالي لنشر خدمة مرتبطة ب تخزين Azure:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. شغّل الأمر التالي لنشر خدمة مرتبطة حسب الطلب في Spark:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. شغّل الأمر التالي لنشر تدفق:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

بدء تشغيل تدفق ومراقبته

  1. ابدأ تشغيل تدفق. تؤدي هذه الخطوة أيضًا إلى التقاط معرّف تشغيل التدفق للمراقبة في المستقبل.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. شغّل البرنامج النصي التالي للتحقق باستمرار من حالة تشغيل التدفق حتى ينتهي.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. إليك ناتج تشغيل العينة:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. تأكد من إنشاء مجلد باسم outputfiles في مجلد spark الخاص بالحاوية adftutorial يحتوي على الإخراج الناتج من برنامج Spark.

ينسخ خط الأنابيب في هذه العينة البيانات من موقع إلى آخر في مخزن كتلة Azure. لقد تعرفت على كيفية:

  • إنشاء data factory.
  • تأليف خدمات مرتبطة ونشرها.
  • تأليف تدفق ونشره.
  • ابدأ تشغيل تدفق.
  • مراقبة تشغيل المسار.

تقدم إلى الدرس التالي لتتعلم كيفية تحويل البيانات عن طريق تشغيل سكريبت Hive على عنقود Azure HDInsight موجود في شبكة افتراضية.