تحويل البيانات في السحابة باستخدام نشاط Spark في Azure Data Factory
ينطبق على: Azure Data Factory Azure Synapse Analytics
تلميح
جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!
في هذا البرنامج التعليمي، تستخدم Azure PowerShell لإنشاء مسار في Data Factory يحوّل البيانات باستخدام نشاط Spark وخدمة مرتبطة حسب الطلب في HDInsight. نفذ الخطوات التالية في هذا البرنامج التعليمي:
- إنشاء data factory.
- تأليف خدمات مرتبطة ونشرها.
- تأليف تدفق ونشره.
- ابدأ تشغيل تدفق.
- مراقبة تشغيل المسار.
في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
المتطلبات الأساسية
إشعار
نوصي باستخدام الوحدة النمطية Azure Az PowerShell للتفاعل مع Azure. للبدء، راجع تثبيت Azure PowerShell. لمعرفة كيفية الترحيل إلى الوحدة النمطية Az PowerShell، راجع ترحيل Azure PowerShell من AzureRM إلى Az.
- حساب في مساحة تخزين Azure. قم بإنشاء نص Python وملف إدخال، وتحميلها إلى تخزين Azure. يتم تخزين الإخراج الناتج من برنامج Spark في حساب التخزين هذا. تستخدم مجموعة Spark حسب الطلب حساب التخزين نفسه كمساحة تخزين أساسية لها.
- Azure PowerShell. اتبع الإرشادات الموجودة في كيفية تثبيت وتكوين Azure PowerShell.
تحميل برنامج Python النصي إلى حساب التخزين الخاص بك في Blob
أنشئ ملف Python باسم WordCount_Spark.py يحتوي على المحتوى التالي:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
استبدل <storageAccountName> باسم حسابك في مساحة تخزين Azure. ثم حفظ الملف.
في مساحة تخزين Azure Blob، أنشئ حاوية باسم adftutorial إذا لم تكن موجودة بالفعل.
أنشئ مجلدًا باسم spark.
أنشئ مجلداً فرعياً باسم script ضمن مجلد spark.
حمّل الملف WordCount_Spark.py إلى المجلد الفرعي script.
قم بتحميل ملف الإدخال
- أنشئ ملفاً باسم minecraftstory.txt يتضمن نصاً. يتولى برنامج Spark حساب عدد الكلمات في هذا النص.
- أنشئ مجلداً فرعياً باسم
inputfiles
في مجلدspark
. - حمّل
minecraftstory.txt
إلى المجلد الفرعيinputfiles
.
خدمات مرتبطة بالتأليف
يمكنك تأليف خدمتين مرتبطتين في هذا القسم:
- خدمة مرتبطة بتخزين Azure تربط حساب Azure Storage بمُنشئ البيانات. يتم استخدام هذا التخزين بواسطة مجموعة HDInsight عند الطلب. كما تحتوي على برنامج Spark النصي المطلوب تنفيذه.
- خدمة مرتبطة حسب الطلب في HDInsight. ينشئ Azure Data Factory مجموعة HDInsight تلقائياً، ويشغّل برنامج Spark ثم يحذف مجموعة HDInsight بعدما تكون خاملة لوقت تم تكوينه مسبقاً.
خدمة Azure Storage المرتبطة
أنشئ ملف JSON باستخدام المحرر الذي تفضله، وانسخ تعريف JSON التالي لخدمة مرتبطة في مساحة تخزين Azure، ثم احفظ الملف بهذا الاسم MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
حدّث <storageAccountName> و<storageAccountKey> بإضافة اسم حسابك في مساحة تخزين Azure ومفتاح الحساب.
خدمة مرتبطة حسب الطلب في HDInsight
أنشئ ملف JSON باستخدام المحرر الذي تفضله، وانسخ تعريف JSON التالي لخدمة مرتبطة في Azure HDInsight، ثم احفظ الملف بهذا الاسم MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
حدّث قيم الخصائص التالية في تعريف الخدمة المرتبطة:
- hostSubscriptionId. استبدل <subscriptionID> بالمعرّف الخاص باشتراكك في Azure. تُنشأ مجموعة HDInsight حسب الطلب في هذا الاشتراك.
- tenant. استبدل <tenantID> بمعرّف المستأجر الخاص بك في Azure.
- servicePrincipalId وservicePrincipalKey. استبدل <servicePrincipalID> وservicePrincipalKey <> بمعرف ومفتاح كيان الخدمة في معرف Microsoft Entra. يجب أن يكون أساس الخدمة هذا عضوا بدور "مساهم" في الاشتراك أو مجموعة الموارد التي تُنشأ فيها المجموعة. راجع إنشاء تطبيق Microsoft Entra ومدير الخدمة للحصول على التفاصيل. معرّف أساس الخدمة هو المكافئ لمعرّف التطبيق، ومفتاح أساس الخدمة هو المكافئ لقيمة البيانات السرية للعميل.
- clusterResourceGroup. استبدل <resourceGroupOfHDICluster> باسم مجموعة الموارد التي يجب إنشاء مجموعة HDInsight فيها.
إشعار
يتضمن Azure HDInsight قيوداً على العدد الإجمالي للنوى التي يمكنك استخدامها في كل منطقة Azure تدعمها. بالنسبة لخدمة HDInsight المرتبطة عند الطلب، تُنشأ مجموعة HDInsight في نفس الموقع بمساحة تخزين Azure الذي تستخدمه كمساحة تخزين أساسية. تأكد من أن لديك عددًا من حصص النوى كافيًا لإنشاء المجموعة بنجاح. لمزيد من المعلومات، راجع إعداد المجموعات في HDInsight باستخدام Hadoop وSpark وKafka وغير ذلك.
تأليف تدفق
في هذه الخطوة، تنشئ تدفقاً جديداً باستخدام نشاط Spark. يستخدم النشاط عينة عدد الكلمات. نزّل المحتويات من هذا الموقع إذا لم تكن قد نزّلتها بالفعل.
أنشئ ملف JSON في المحرر الذي تفضله، وانسخ تعريف JSON التالي الخاص بتعريف تدفق، ثم احفظه بهذا الاسم MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
لاحظ النقاط التالية:
- يشير rootPath إلى مجلد Spark للحاوية adftutorial.
- يشير entryFilePath يشير إلى ملف WordCount_Spark.py في المجلد الفرعي script من مجلد spark.
إنشاء مصدرًا للبيانات
لقد ألّفت تعريفات الخدمة المرتبطة والتدفق في ملفات JSON. الآن، دَعنا ننشئ مصنع بيانات، وننشر ملفات JSON الخاصة بالخدمة المرتبطة والتدفق باستخدام PowerShell cmdlets. شغّل أوامر PowerShell التالية واحداً تلو الآخر:
عيّن المتغيرات واحداً تلو الآخر.
اسم مجموعة الموارد
$resourceGroupName = "ADFTutorialResourceGroup"
اسم Data Factory. يجب أن تكون فريدة عالميًا
$dataFactoryName = "MyDataFactory09102017"
اسم التدفق
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
شغّل PowerShell. اترك Azure PowerShell مفتوحًا حتى نهاية هذه البداية السريعة. في حال قمت بإغلاق وإعادة فتح، تحتاج إلى تشغيل الأوامر مرة أخرى. للحصول على قائمة بمناطق Azure التي يتوفر فيها حالياً Data Factory، حدد المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع "Analytics" لتحديد موقع Data Factory: "Products available by region". تخزن البيانات (Azure Storage، وAzure SQL Database، وما إلى ذلك) وتحسب (HDInsight، وما إلى ذلك) التي يستخدمها مصنع البيانات في مناطق أخرى.
شغّل الأمر التالي، وأدخل اسم المستخدم وكلمة المرور اللذين تستخدمهما لتسجيل الدخول إلى مدخل Microsoft Azure:
Connect-AzAccount
شغّل الأمر التالي لعرض جميع الاشتراكات لهذا الحساب:
Get-AzSubscription
شغّل الأمر التالي لتحديد الاشتراك الذي تريد العمل معه. استبدل SubscriptionId بـ ID الخاص باشتراك Azure الخاص بك:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
أنشئ مجموعة الموارد: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
أنشئ مصنع البيانات.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
نفذ الأمر التالي للاطلاع على الإخراج الناتج:
$df
بدّل إلى المجلد الذي أنشأت فيه ملفات JSON، ثم شغّل الأمر التالي لنشر خدمة مرتبطة في مساحة تخزين Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
شغّل الأمر التالي لنشر خدمة مرتبطة حسب الطلب في Spark:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
شغّل الأمر التالي لنشر تدفق:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
بدء تشغيل تدفق ومراقبته
ابدأ تشغيل تدفق. تؤدي هذه الخطوة أيضًا إلى التقاط معرّف تشغيل التدفق للمراقبة في المستقبل.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
شغّل البرنامج النصي التالي للتحقق باستمرار من حالة تشغيل التدفق حتى ينتهي.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
إليك ناتج تشغيل العينة:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
تأكد من إنشاء مجلد باسم
outputfiles
في مجلدspark
الخاص بالحاوية adftutorial يحتوي على الإخراج الناتج من برنامج Spark.
المحتوى ذو الصلة
تنسخ البنية الأساسية في هذه العينة البيانات من موقع إلى موقع آخر في تخزين Azure blob. لقد تعرفت على كيفية:
- إنشاء data factory.
- تأليف خدمات مرتبطة ونشرها.
- تأليف تدفق ونشره.
- ابدأ تشغيل تدفق.
- مراقبة تشغيل المسار.
تقدَّم إلى البرنامج التعليمي التالي للتعرّف على كيفية تحويل البيانات عن طريق تشغيل برنامج Hive نصي على مجوعة Azure HDInsight موجودة في شبكة ظاهرية.