Transformera data i molnet genom att använda Spark-aktivitet i Azure Data Factory
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Dricks
Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!
I den här självstudien använder du Azure PowerShell för att skapa en Data Factory-pipeline som transformerar data med Spark-aktivitet och en länkad HDInsight-tjänst på begäran. I de här självstudierna går du igenom följande steg:
- Skapa en datafabrik.
- Skapa och distribuera länkade tjänster.
- Skapa och distribuera en pipeline.
- Starta en pipelinekörning.
- Övervaka pipelinekörningen.
Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
Förutsättningar
Kommentar
Vi rekommenderar att du använder Azure Az PowerShell-modulen för att interagera med Azure. Information om hur du kommer igång finns i Installera Azure PowerShell. Information om hur du migrerar till Az PowerShell-modulen finns i artikeln om att migrera Azure PowerShell från AzureRM till Az.
- Azure Storage-konto. Du skapar ett Python-skript och en indatafil och laddar upp dem till Azure Storage. Spark-programmets utdata lagras på det här lagringskontot. Spark-klustret på begäran använder samma lagringskonto som den primära lagringen.
- Azure PowerShell. Följ instruktionerna i Så här installerar och konfigurerar du Azure PowerShell.
Ladda upp Python-skript till ditt Blob Storage-konto
Skapa en Python-fil med namnet WordCount_Spark.py med följande innehåll:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Ersätt <storageAccountName> med namnet på ditt Azure-konto. Spara sedan filen.
Skapa en container med namnet adftutorial i Azure Blob Storage om den inte finns.
Skapa en mapp med namnet spark.
Skapa en undermapp med namnet script under mappen spark.
Överför filen WordCount_Spark.py till undermappen script.
Överföra indatafilen
- Skapa en fil med namnet minecraftstory.txt med lite text. Spark-programmet räknar antalet ord i texten.
- Skapa en undermapp med namnet
inputfiles
i mappenspark
. - Överför
minecraftstory.txt
till mappeninputfiles
.
Skapa länkade tjänster
Du har skapat två länkade tjänster i det här avsnittet:
- En länkad Azure Storage-tjänst som länkar ett Azure Storage-konto till datafabriken. Den här lagringen används av HDInsight-kluster på begäran. Den innehåller också Spark-skriptet som ska köras.
- En på begäran länkad HDInsight-tjänst. Azure Data Factory skapar automatiskt ett HDInsight-kluster, kör Spark-programmet och tar sedan bort HDInsight-klustret när det har varit inaktivt under en förkonfigurerad tid.
Länkad Azure-lagringstjänst
Skapa en JSON-fil med önskat redigeringsprogram, kopiera följande JSON-definition för en länkad Azure Storage-tjänst och spara filen som MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Uppdatera <storageAccountName> och <storageAccountKey> med namnet på och nyckeln för ditt Azure Storage-konto.
På begäran länkad HDInsight-tjänst
Skapa en JSON-fil med önskat redigeringsprogram, kopiera följande JSON-definition för en länkad Azure HDInsight-tjänst och spara filen som MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Uppdatera värden för följande egenskaper i definitionen för den länkade tjänsten:
- hostSubscriptionId. Ersätt <subscriptionId> med ID:t för din Azure-prenumeration. Klustret HDInsight på begäran skapas i den här prenumerationen.
- klient. Ersätt <tenantID> med Azure-klientens ID.
- servicePrincipalId, servicePrincipalKey. Ersätt <servicePrincipalID> och <servicePrincipalKey> med ID och nyckeln för tjänstens huvudnamn i Microsoft Entra-ID:t. Tjänstens huvudnamn måste vara medlem av prenumerationens deltagarrollen eller resursgruppen som klustret har skapats i. Mer information finns i Skapa Microsoft Entra-program och tjänstens huvudnamn . Tjänstens huvudnamns-ID motsvarar program-ID :t och en nyckel för tjänstens huvudnamn motsvarar värdet för en klienthemlighet.
- clusterResourceGroup. Ersätt <resourceGroupOfHDICluster> med namnet på resursgruppen som HDInsight-klustret ska skapas i.
Kommentar
Azure HDInsight har en begränsning för hur många kärnor du kan använda i varje Azure-region som stöds. För den HDInsight-länkade tjänsten på begäran skapas HDInsight-klustret på samma plats som Azure Storage använde som primär lagring. Se till att du har tillräckligt med kärnkvoter så att klustret kan skapats på rätt sätt. Mer information finns i Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more (Konfigurera kluster i HDInsight med Hadoop, Spark, Kafka med mera).
Skapa en pipeline
I det här steget kan du skapa en ny pipeline med en Spark-aktivitet. Aktiviteten använder exemplet ordräkning. Hämta innehållet från den här platsen om du inte redan gjort det.
Skapa en JSON-fil med önskat redigeringsprogram, kopiera följande JSON-definition för en pipelinedefinition och spara filen som MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Observera följande:
- rootPath pekar på Spark-mappen i containern adftutorial.
- entryFilePath pekar på filen WordCount_Spark.py i skriptets undermapp i Spark-mappen.
Skapa en datafabrik
Du har skapat definitioner för länkad tjänst och pipeline i JSON-filer. Nu ska vi skapa en datafabrik och distribuera de länkade JSON-filerna för tjänsten och pipelinen med hjälp av PowerShell-cmdletar. Kör följande PowerShell-kommandon ett i taget:
Ange variabler en i taget.
Namn på resursgrupp
$resourceGroupName = "ADFTutorialResourceGroup"
Namn på datafabrik. Måste vara globalt unikt
$dataFactoryName = "MyDataFactory09102017"
Namn på pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Starta PowerShell. Låt Azure PowerShell vara öppet tills du är klar med snabbstarten. Om du stänger och öppnar det igen måste du köra kommandona en gång till. Om du vill se en lista med Azure-regioner där Data Factory är tillgängligt för närvarande markerar du de regioner du är intresserad av på följande sida. Expandera sedan Analytics och leta rätt på Data Factory: Tillgängliga produkter per region. Datalagren (Azure Storage, Azure SQL Database osv.) och beräkningarna (HDInsight osv.) som används i Data Factory kan finnas i andra regioner.
Kör följande kommando och ange det användarnamn och lösenord som du använder för att logga in i Azure Portal:
Connect-AzAccount
Kör följande kommando för att visa alla prenumerationer för det här kontot:
Get-AzSubscription
Kör följande kommando för att välja den prenumeration som du vill arbeta med. Ersätt SubscriptionId med ID:t för din Azure-prenumeration:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Skapa resursgruppen: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Skapa datafabriken.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Kör följande kommando för att se utdata:
$df
Växla till den mapp där du skapade JSON-filerna och kör följande kommando för att distribuera en länkad Azure Storage-tjänst:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Kör följande kommando för att distribuera en länkad Spark-tjänst på begäran:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Kör följande kommando för att distribuera en pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Starta och övervaka en pipelinekörning
Starta en pipelinekörning. Den samlar även in pipelinekörningens ID för kommande övervakning.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Kör följande skript för att kontinuerligt kontrollera pipelinekörningens status tills den är klar.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Här är utdata från exempelkörningen:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Bekräfta att en mapp med namnet
outputfiles
har skapats i mappenspark
i adftutorial-containern med utdata från Spark-programmet.
Relaterat innehåll
Pipeline i det här exemplet kopierar data från en plats till en annan i Azure Blob Storage. Du har lärt dig att:
- Skapa en datafabrik.
- Skapa och distribuera länkade tjänster.
- Skapa och distribuera en pipeline.
- Starta en pipelinekörning.
- Övervaka pipelinekörningen.
Gå vidare till nästa självstudiekurs för att lära dig hur du transformerar data genom att köra Hive-skript på ett Azure HDInsight-kluster som är i ett virtuellt nätverk.