使用 Azure Data Factory 中的 Spark 活動來轉換雲端中的資料
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
在本教學課程中,您會使用 Azure PowerShell 建立 Data Factory 管道,以使用 Spark 活動和隨選 HDInsight 連結服務來轉換資料。 您會在本教學課程中執行下列步驟:
- 建立資料處理站。
- 撰寫和部署連結服務。
- 撰寫和部署管道。
- 啟動管線執行。
- 監視管道執行。
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
注意
建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱 安裝 Azure PowerShell。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az。
- Azure 儲存體帳戶。 您需要建立 Python 指令碼和輸入檔案,並上傳至 Azure 儲存體。 spark 程式的輸出會儲存在這個儲存體帳戶中。 隨選 Spark 叢集與其主要儲存體是使用相同的儲存體帳戶。
- Azure PowerShell。 遵循如何安裝並設定 Azure PowerShell 中的指示。
將 Python 指令碼上傳至 Blob 儲存體帳戶
使用下列內容建立名為 WordCount_Spark.py 的 Python 檔案:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
以您的 Azure 儲存體帳戶名稱取代 <storageAccountName>。 然後儲存檔案。
在 Azure Blob 儲存體中,建立名為 adftutorial 的容器 (如果不存在)。
建立名為 spark 的資料夾。
在 spark 資料夾下,建立名為 script 的子資料夾。
將 WordCount_Spark.py 檔案上傳至 script 子資料夾。
上傳輸入檔案
- 建立名為 minecraftstory.txt 的檔案並填入一些文字。 Spark 程式會計算這段文字中的字數。
- 在
spark
資料夾中建立名為inputfiles
的子資料夾。 - 將
minecraftstory.txt
上傳至inputfiles
子資料夾。
撰寫連結服務
在本節中,您會撰寫兩個連結服務:
- 將 Azure 儲存體帳戶連結至資料處理站的 Azure 儲存體連結服務。 隨選 HDInsight 叢集會使用此儲存體。 它也包含要執行的 Spark 指令碼。
- 隨選 HDInsight 連結服務。 Azure Data Factory 會自動建立 HDInsight 叢集、執行 Spark 程式,然後刪除已閒置一段預先設定時間的 HDInsight 叢集。
Azure 儲存體連結服務
使用您慣用的編輯器建立 JSON 檔案、複製下列 Azure 儲存體連結服務的 JSON 定義,然後將檔案儲存為 MyStorageLinkedService.json。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
以您的 Azure 儲存體帳戶名稱和金鑰更新 <storageAccountName> 和 <storageAccountKey>。
隨選 HDInsight 連結服務
使用您慣用的編輯器建立 JSON 檔案、複製下列 Azure HDInsight 連結服務的 JSON 定義,然後將檔案儲存為 MyOnDemandSparkLinkedService.json。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
在連結服務定義中更新下列屬性的值:
- hostSubscriptionId。 以您的 Azure 訂用帳戶識別碼取代 <subscriptionID>。 隨選 HDInsight 叢集會在此訂用帳戶中建立。
- tenant. 以您的 Azure 租用戶識別碼取代 <tenantID>。
- servicePrincipalId、servicePrincipalKey。 以您在 Microsoft Entra ID 中的服務主體識別碼與金鑰,取代 servicePrincipalID<> 和 servicePrincipalKey<>。 此服務主體必須是訂用帳戶之參與者角色的成員,或其中建立叢集之資源群組的成員。 如需詳細資訊,請參閱建立 Microsoft Entra 應用程式和服務主體。 服務主體識別碼相當於「應用程式識別碼」,而服務主體金鑰則相當於「用戶端密碼」的值。
- clusterResourceGroup。 以需要在其中建立 HDInsight 叢集的資源群組名稱,取代<resourceGroupOfHDICluster>。
注意
Azure HDInsight 對您在其支援的每個 Azure 區域中可使用的核心總數有所限制。 對於隨選 HDInsight 連結服務,建立 HDInsight 叢集的位置與作為其主要儲存體之 Azure 儲存體的位置相同。 請確定您有足夠的核心配額,才能成功建立叢集。 如需詳細資訊,請參閱使用 Hadoop、Spark 及 Kafka 等在 HDInsight 中設定叢集。
撰寫管道
在此步驟中,您會建立具有 Spark 活動的新管道。 此活動使用字數統計範例。 從這個位置下載內容 (如果尚未這樣做)。
在您慣用的編輯器中建立 JSON 檔案、複製下列管道定義 JSON 定義,然後儲存為 MySparkOnDemandPipeline.json。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
請注意下列幾點:
- rootPath 指向 adftutorial 容器的 spark 資料夾。
- entryFilePath 指向 spark 資料夾的 script 子資料夾中的 WordCount_Spark.py 檔案。
建立資料處理站
您已在 JSON 檔案中撰寫連結服務和管道定義。 現在,讓我們使用 PowerShell Cmdlet 來建立資料處理站,並部署連結服務和管道 JSON 檔案。 逐一執行下列 PowerShell 命令:
逐一設定變數。
資源群組名稱
$resourceGroupName = "ADFTutorialResourceGroup"
Data Factory 名稱。 必須是全域唯一的
$dataFactoryName = "MyDataFactory09102017"
管線名稱
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
啟動 PowerShell。 保持開啟 Azure PowerShell,直到本快速入門結束為止。 如果您關閉並重新開啟,則需要再次執行這些命令。 如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory]:依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database 等) 和計算 (HDInsight 等) 可位於其他區域。
執行下列命令,並輸入您用來登入 Azure 入口網站的使用者名稱和密碼:
Connect-AzAccount
執行下列命令以檢視此帳戶的所有訂用帳戶:
Get-AzSubscription
執行下列命令以選取您要使用的訂用帳戶。 以您的 Azure 訂用帳戶識別碼取代 SubscriptionId:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
建立資源群組:ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
建立資料處理站。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
執行下列命令來查看輸出:
$df
切換至您建立 JSON 檔案的資料夾,然後執行下列命令來部署 Azure 儲存體連結服務:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
執行下列命令來部署隨選 Spark 連結服務:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
執行下列命令來部署管道:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
啟動及監視管道執行
啟動管線執行。 它也會擷取管線執行識別碼,方便後續監視。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
執行下列程式碼以持續檢查管道執行狀態,直到完成為止。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
執行範例的輸出如下:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
根據 spark 程式的輸出,確認 adftutorial 容器的
spark
資料夾中已建立名為outputfiles
的資料夾。
相關內容
在此範例中的管線會將資料從 Azure Blob 儲存體中的一個位置複製到其他位置。 您已了解如何︰
- 建立資料處理站。
- 撰寫和部署連結服務。
- 撰寫和部署管道。
- 啟動管線執行。
- 監視管道執行。
進入下一個教學課程,以了解如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 指令碼,以轉換資料。