適用於:
Azure Data Factory
Azure Synapse Analytics
秘訣
Data Factory in Microsoft Fabric 是下一代的 Azure Data Factory,擁有更簡單的架構、內建 AI 及新功能。 如果你是資料整合新手,建議先從 Fabric Data Factory 開始。 現有的 ADF 工作負載可升級至 Fabric,以存取資料科學、即時分析與報告等新能力。
在這個教學中,你會使用 Azure PowerShell 建立一個資料工廠管線,透過 Spark Activity 和按需的 HDInsight 連結服務來轉換資料。 您會在本教學課程中執行下列步驟:
- 建立資料處理站。
- 撰寫和部署連結服務。
- 撰寫和部署管道。
- 啟動管道運行。
- 監視管道執行。
如果你沒有Azure訂閱,請在開始前先建立一個free帳號。
必要條件
注意
我們建議你使用 Azure Az PowerShell 模組來與 Azure 互動。 要開始,請參考 安裝 Azure PowerShell。 想了解如何遷移到 Az PowerShell 模組,請參考 Migrate Azure PowerShell from AzureRM to Az。
- Azure Storage account。 你要建立一個 Python 腳本和一個輸入檔,然後上傳到 Azure 儲存。 spark 程式的輸出會儲存在這個儲存體帳戶中。 隨選 Spark 叢集使用相同的儲存體帳戶作為其主要儲存體。
- Azure PowerShell。 請依照如何安裝與設定 Azure PowerShell 中的說明操作。
將 Python 腳本上傳到你的 Blob Storage 帳號
建立一個名為 WordCount_Spark.pyPython 的檔案,內容如下:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()將 <storageAccountName> 替換成你的Azure Storage帳戶名稱。 然後儲存檔案。
在你的Azure Blob Storage裡,如果沒有的話,請建立一個名為 adftutorial 的容器。
建立名為 spark 的資料夾。
在 spark 資料夾下,建立名為 script 的子資料夾。
將 WordCount_Spark.py 檔案上傳至 script 子資料夾。
上傳輸入檔案
- 建立名為 minecraftstory.txt 的檔案並填入一些文字。 Spark 程式會計算這段文字中的字數。
- 在
inputfiles資料夾中建立名為spark的子資料夾。 - 將
minecraftstory.txt上傳至inputfiles子資料夾。
作者連結的服務
在本節中,您將建立兩個連結服務:
- 一個 Azure Storage 連結服務,將 Azure Storage 帳號連結到資料工廠。 隨選 HDInsight 叢集會使用此儲存體。 它也包含要執行的 Spark 指令碼。
- 隨選 HDInsight 連結服務。 Azure Data Factory 會自動建立一個 HDInsight 叢集,執行 Spark 程式,然後在 HDInsight 叢集閒置一段時間後刪除它。
Azure Storage 連接服務
用你偏好的編輯器建立一個 JSON 檔案,複製以下 Azure Storage 連結服務的 JSON 定義,然後儲存為 MyStorageLinkedService.json。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
更新 <storageAccountName> 和 <storageAccountKey>,使用你 Azure Storage 帳號的名稱和金鑰。
隨選 HDInsight 連結服務
用你偏好的編輯器建立一個 JSON 檔案,複製以下 Azure HDInsight 連結服務的 JSON 定義,然後儲存為 MyOnDemandSparkLinkedService.json。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
在連結服務定義中更新下列屬性的值:
- hostSubscriptionId。 將 <subscriptionID> 替換成你Azure訂閱的 ID。 隨選 HDInsight 叢集會在此訂用帳戶中建立。
- 租戶. 將 <tenantID> 替換成你Azure租戶的 ID。
- servicePrincipalId、servicePrincipalKey。 將 <servicePrincipalID> 和 <servicePrincipalKey> 替換成Microsoft Entra ID中服務主體的 ID 和金鑰。 此服務主體必須是建立叢集之訂用帳戶或資源群組中參與者角色的成員。 請參閱 create Microsoft Entra 應用程式與服務主體 以獲取詳細資訊。 服務主體識別碼相當於「應用程式識別碼」,而服務主體金鑰則相當於「用戶端密碼」的值。
- clusterResourceGroup。 以需要在其中建立 HDInsight 叢集的資源群組名稱,取代<resourceGroupOfHDICluster>。
注意
Azure HDInsight 對每個支援的 Azure 區域可使用的核心數量有限制。 對於隨需 HDInsight 連結服務,HDInsight 叢集將在用作主要儲存體的 Azure Storage 所在位置建立。 請確定您有足夠的核心配額,才能成功建立叢集。 如需詳細資訊,請參閱使用 Hadoop、Spark 及 Kafka 等在 HDInsight 中設定叢集。
建立管道
在此步驟中,您會建立具有 Spark 活動的新管道。 此活動使用字數統計範例。 從這個位置下載內容 (如果尚未這樣做)。
在您慣用的編輯器中建立 JSON 檔案、複製下列管道定義 JSON 定義,然後儲存為 MySparkOnDemandPipeline.json。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
請注意下列幾點:
- rootPath 指向 adftutorial 容器的 spark 資料夾。
- entryFilePath 指向 spark 資料夾的 script 子資料夾中的 WordCount_Spark.py 檔案。
建立資料處理站
您已在 JSON 檔案中撰寫連結服務和管道定義。 現在,讓我們使用 PowerShell Cmdlet 來建立資料處理站,並部署連結服務和管道 JSON 檔案。 逐一執行下列 PowerShell 命令:
逐一設定變數。
資源群組名稱
$resourceGroupName = "ADFTutorialResourceGroup"Data Factory 名稱。 必須是全域唯一的
$dataFactoryName = "MyDataFactory09102017"管線名稱
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline啟動 PowerShell。 請保持 Azure PowerShell 開啟直到這個快速入門結束。 如果您關閉並重新開啟,則需要再次執行這些命令。 如需查詢目前 Data Factory 可用的Azure區域清單,請在下一頁選擇您感興趣的區域,然後展開 Analytics 以找到 Data Factory:Products by region。 資料工廠使用的資料儲存(Azure Storage、Azure SQL Database 等)和運算(HDInsight 等)可能在其他區域。
執行以下指令,輸入你用來登入 Azure 入口網站的使用者名稱和密碼:
Connect-AzAccount執行下列命令以檢視此帳戶的所有訂用帳戶:
Get-AzSubscription執行下列命令以選取您要使用的訂用帳戶。 將 SubscriptionId 替換成你Azure訂閱的 ID:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"建立資源群組:ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"建立資料處理站。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName執行下列命令來查看輸出:
$df切換到你建立 JSON 檔案的資料夾,並執行以下指令部署 Azure Storage 連結服務:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"執行下列命令來部署隨選 Spark 連結服務:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"執行下列命令來部署管道:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
啟動並監視管線運行
啟動管道運行。 其也會擷取管線執行識別碼,以供日後監視。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName執行下列程式碼以持續檢查管道執行狀態,直到完成為止。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"執行範例的輸出如下:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"根據 spark 程式的輸出,確認 adftutorial 容器的
outputfiles資料夾中已建立名為spark的資料夾。
相關內容
本範例中的管線會將資料從一個位置複製到 Azure Blob 儲存體中的另一個位置。 您已了解如何︰
- 建立資料處理站。
- 撰寫和部署連結服務。
- 撰寫和部署管道。
- 啟動管道運行。
- 監視管道執行。
接下來的教學,學習如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 腳本來轉換資料。
教學:在 Azure Virtual Network 中使用 Hive 進行資料轉換。