إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على:
Azure Data Factory
Azure Synapse Analytics
تلميح
Data Factory في Microsoft Fabric هو الجيل القادم من Azure Data Factory، مع بنية أبسط، وذكاء اصطناعي مدمج، وميزات جديدة. إذا كنت جديدا في تكامل البيانات، ابدأ مع Fabric Data Factory. يمكن لأعباء عمل ADF الحالية الترقية إلى Fabric للوصول إلى قدرات جديدة في علوم البيانات، والتحليلات اللحظية، والتقارير.
في هذا الدرس، تستخدم Azure PowerShell لإنشاء خط أنابيب Data Factory الذي يحول البيانات باستخدام Spark Activity وخدمة HDInsight المرتبطة عند الطلب. نفذ الخطوات التالية في هذا البرنامج التعليمي:
- إنشاء data factory.
- تأليف خدمات مرتبطة ونشرها.
- تأليف تدفق ونشره.
- ابدأ تشغيل تدفق.
- مراقبة تشغيل المسار.
إذا لم يكن لديك اشتراك Azure، أنشئ حسابا free قبل أن تبدأ.
المتطلبات الأساسية
إشعار
نوصي باستخدام وحدة Azure Az PowerShell للتفاعل مع Azure. للبدء، راجع تثبيت Azure PowerShell. لتعلم كيفية الترحيل إلى وحدة Az PowerShell، راجع Migrationate Azure PowerShell من AzureRM إلى Az.
- تخزين Azure حساب. تقوم بإنشاء سكريبت Python وملف إدخال، ثم ترفع هذه الملفات إلى تخزين Azure. يتم تخزين الإخراج الناتج من برنامج Spark في حساب التخزين هذا. تستخدم مجموعة Spark حسب الطلب حساب التخزين نفسه كمساحة تخزين أساسية لها.
- Azure PowerShell. اتبع التعليمات في كيفية تثبيت وتكوين Azure PowerShell.
ارفع سكريبت Python إلى حسابك في مخزن البيانات الثنائية الكبيرة
أنشئ ملف Python باسم WordCount_Spark.py يحتوي على المحتوى التالي:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()استبدل <storageAccountName> باسم حسابك تخزين Azure. ثم حفظ الملف.
في مساحة تخزين Azure Blob الخاص بك، أنشئ حاوية باسم adftutorial إذا لم تكن موجودة.
أنشئ مجلدًا باسم spark.
أنشئ مجلداً فرعياً باسم script ضمن مجلد spark.
حمّل الملف WordCount_Spark.py إلى المجلد الفرعي script.
قم بتحميل ملف الإدخال
- أنشئ ملفاً باسم minecraftstory.txt يتضمن نصاً. يتولى برنامج Spark حساب عدد الكلمات في هذا النص.
- أنشئ مجلداً فرعياً باسم
inputfilesفي مجلدspark. - حمّل
minecraftstory.txtإلى المجلد الفرعيinputfiles.
خدمات مرتبطة بالتأليف
يمكنك تأليف خدمتين مرتبطتين في هذا القسم:
- خدمة تخزين Azure Linked Service التي تربط حساب تخزين Azure بمصنع البيانات. يتم استخدام هذا التخزين بواسطة مجموعة HDInsight عند الطلب. كما تحتوي على برنامج Spark النصي المطلوب تنفيذه.
- خدمة مرتبطة حسب الطلب في HDInsight. يقوم Azure Data Factory تلقائيا بإنشاء عنقود HDInsight، ويشغل برنامج Spark، ثم يحذف عنقود HDInsight بعد أن يكون في وضع الخمول لفترة محددة مسبقا.
تخزين Azure linked service
أنشئ ملف JSON باستخدام محررك المفضل، وانسخ التعريف التالي لخدمة مرتبطة تخزين Azure، ثم احفظ الملف ك MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
قم بتحديث <storageAccountName> و <storageAccountKey> باسم ومفتاح حسابك تخزين Azure.
خدمة مرتبطة حسب الطلب في HDInsight
أنشئ ملف JSON باستخدام محررك المفضل، وانسخ التعريف التالي لخدمة مرتبطة Azure HDInsight، واحفظ الملف ك MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
حدّث قيم الخصائص التالية في تعريف الخدمة المرتبطة:
- hostSubscriptionId. استبدل <subscriptionID> بمعرف اشتراكك Azure. تُنشأ مجموعة HDInsight حسب الطلب في هذا الاشتراك.
- tenant. استبدل <tenantID> برقم مستأجر Azure الخاص بك.
- servicePrincipalId وservicePrincipalKey. استبدل <servicePrincipalID> و <servicePrincipalKey> بمعرف ومفتاح موكل الخدمة الخاص بك في Microsoft Entra ID. يجب أن يكون أساس الخدمة هذا عضوا بدور "مساهم" في الاشتراك أو مجموعة الموارد التي تُنشأ فيها المجموعة. راجع إنشاء Microsoft Entra التطبيق وخدمة المبدأ للتفاصيل. معرّف أساس الخدمة هو المكافئ لمعرّف التطبيق، ومفتاح أساس الخدمة هو المكافئ لقيمة البيانات السرية للعميل.
- clusterResourceGroup. استبدل <resourceGroupOfHDICluster> باسم مجموعة الموارد التي يجب إنشاء مجموعة HDInsight فيها.
إشعار
Azure HDInsight لديه حد على العدد الإجمالي للنوى التي يمكنك استخدامها في كل منطقة Azure يدعمها. بالنسبة لخدمة HDInsight المرتبطة عند الطلب، سيتم إنشاء عنقود HDInsight في نفس موقع تخزين Azure المستخدم كمخزن رئيسي. تأكد من أن لديك عددًا من حصص النوى كافيًا لإنشاء المجموعة بنجاح. لمزيد من المعلومات، راجع إعداد المجموعات في HDInsight باستخدام Hadoop وSpark وKafka وغير ذلك.
تأليف تدفق
في هذه الخطوة، تنشئ تدفقاً جديداً باستخدام نشاط Spark. يستخدم النشاط عينة عدد الكلمات. نزّل المحتويات من هذا الموقع إذا لم تكن قد نزّلتها بالفعل.
أنشئ ملف JSON في المحرر الذي تفضله، وانسخ تعريف JSON التالي الخاص بتعريف تدفق، ثم احفظه بهذا الاسم MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
لاحظ النقاط التالية:
- يشير rootPath إلى مجلد Spark للحاوية adftutorial.
- يشير entryFilePath يشير إلى ملف WordCount_Spark.py في المجلد الفرعي script من مجلد spark.
إنشاء مصدرًا للبيانات
لقد ألّفت تعريفات الخدمة المرتبطة والتدفق في ملفات JSON. الآن، دَعنا ننشئ مصنع بيانات، وننشر ملفات JSON الخاصة بالخدمة المرتبطة والتدفق باستخدام PowerShell cmdlets. شغّل أوامر PowerShell التالية واحداً تلو الآخر:
عيّن المتغيرات واحداً تلو الآخر.
اسم مجموعة الموارد
$resourceGroupName = "ADFTutorialResourceGroup"اسم Data Factory. يجب أن تكون فريدة عالميًا
$dataFactoryName = "MyDataFactory09102017"اسم التدفق
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineشغّل PowerShell. Keep Azure PowerShell مفتوحا حتى نهاية هذا البدء السريع. في حال قمت بإغلاق وإعادة فتح، تحتاج إلى تشغيل الأوامر مرة أخرى. للحصول على قائمة بالمناطق Azure التي يتوفر فيها Data Factory حاليا، اختر المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع Analytics لتحديد موقع Data Factory: المنتجات المتاحة حسب المنطقة. مخازن البيانات (تخزين Azure، قاعدة بيانات Azure SQL، إلخ) والحسابات (HDInsight، إلخ) المستخدمة في Data Factory يمكن أن تكون في مناطق أخرى.
شغل الأمر التالي، وأدخل اسم المستخدم وكلمة المرور التي تستخدمها لتسجيل الدخول إلى بوابة Azure:
Connect-AzAccountشغّل الأمر التالي لعرض جميع الاشتراكات لهذا الحساب:
Get-AzSubscriptionشغّل الأمر التالي لتحديد الاشتراك الذي تريد العمل معه. استبدل SubscriptionId بمعرف اشتراكك Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"أنشئ مجموعة الموارد: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"أنشئ مصنع البيانات.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameنفذ الأمر التالي للاطلاع على الإخراج الناتج:
$dfانتقل إلى المجلد الذي أنشأت فيه ملفات JSON، وشغل الأمر التالي لنشر خدمة مرتبطة ب تخزين Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"شغّل الأمر التالي لنشر خدمة مرتبطة حسب الطلب في Spark:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"شغّل الأمر التالي لنشر تدفق:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
بدء تشغيل تدفق ومراقبته
ابدأ تشغيل تدفق. تؤدي هذه الخطوة أيضًا إلى التقاط معرّف تشغيل التدفق للمراقبة في المستقبل.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameشغّل البرنامج النصي التالي للتحقق باستمرار من حالة تشغيل التدفق حتى ينتهي.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"إليك ناتج تشغيل العينة:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"تأكد من إنشاء مجلد باسم
outputfilesفي مجلدsparkالخاص بالحاوية adftutorial يحتوي على الإخراج الناتج من برنامج Spark.
المحتوى ذو الصلة
ينسخ خط الأنابيب في هذه العينة البيانات من موقع إلى آخر في مخزن كتلة Azure. لقد تعرفت على كيفية:
- إنشاء data factory.
- تأليف خدمات مرتبطة ونشرها.
- تأليف تدفق ونشره.
- ابدأ تشغيل تدفق.
- مراقبة تشغيل المسار.
تقدم إلى الدرس التالي لتتعلم كيفية تحويل البيانات عن طريق تشغيل سكريبت Hive على عنقود Azure HDInsight موجود في شبكة افتراضية.