Adatátalakítás a felhőben egy Spark-tevékenység az Azure Data Factoryban való használatával
A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
Ebben az oktatóanyagban az Azure PowerShell segítségével hozhat létre egy Data Factory-folyamatot, amely egy Spark-tevékenységgel és egy igény szerinti HDInsight társított szolgáltatással alakítja át az adatokat. Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A folyamat futásának monitorozása.
Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
Előfeltételek
Feljegyzés
Javasoljuk, hogy az Azure Az PowerShell modult használja az Azure-ral való interakcióhoz. Első lépésként tekintse meg az Azure PowerShell telepítését ismertető témakört. Az Az PowerShell-modulra történő migrálás részleteiről lásd: Az Azure PowerShell migrálása az AzureRM modulból az Az modulba.
- Egy Azure Storage-fiók. Létrehozhat egy Python-szkriptet és egy bemeneti fájlt, és feltöltheti őket az Azure Storage-ba. A Spark-program kimenetét ebben a tárfiókban tárolja a rendszer. Az igény szerinti Spark-fürt ugyanezt a tárfiókot használja elsődleges tárterületként.
- Azure PowerShell. Kövesse az Azure PowerShell telepítését és konfigurálását ismertető cikkben szereplő utasításokat.
Python-szkript feltöltése Blob Storage-fiókba
Hozzon létre egy WordCount_Spark.py nevű Python-fájlt az alábbi tartalommal:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Cserélje a <storageAccountName> kifejezést Azure Storage-fiókja nevére. Ezután mentse a fájlt.
Az Azure Blob Storage-ban hozzon létre egy adftutorial nevű tárolót, ha még nem létezik.
Hozzon létre egy spark mappát.
Hozzon létre egy szkript almappát a spark mappában.
Töltse fel a WordCount_Spark.py fájlt a szkript almappába.
A bemeneti fájl feltöltése
- Hozzon létre egy minecraftstory.txt nevű fájlt némi szöveges tartalommal. A Spark-program megszámolja a szavak számát ebben a szövegben.
- Hozzon létre egy
inputfiles
nevű almappát aspark
mappában. - Töltse fel a
minecraftstory.txt
fájlt azinputfiles
almappába.
Társított szolgáltatások létrehozása
Ebben a szakaszban két társított szolgáltatást hoz létre:
- Egy Azure Storage-beli társított szolgáltatást, amely egy Azure Storage-fiókot társít az adat-előállítóhoz. Ezt a tárterületet csak az igény szerinti HDInsight-fürt használja. Ez tartalmazza a végrehajtani kívánt Spark-szkriptet is.
- Egy igény szerinti HDInsight társított szolgáltatást. Az Azure Data Factory automatikusan létrehoz egy HDInsight-fürtöt, futtatja a Spark-programot, majd törli a HDInsight-fürtöt, ha az egy előre megadott időtartamon keresztül inaktív.
Azure Storage társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure Storage társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyStorageLinkedService.json néven.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Cserélje le a <storageAccountName> és a <storageAccountKey> értékeket a saját Azure Storage-fiókja nevére és kulcsára.
Igény szerinti HDInsight társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure HDInsight társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyOnDemandSparkLinkedService.json néven.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Frissítse a következő tulajdonságok értékeit a társított szolgáltatás definíciójában:
- hostSubscriptionId. Cserélje le a <SubscriptionId> kifejezést az Azure-előfizetés azonosítójára. Létrejön az igény szerinti HDInsight-fürt ebben az előfizetésben.
- tenant. A <tenantID> helyére írja a saját Azure bérlőjének az azonosítóját.
- servicePrincipalId, servicePrincipalKey. Cserélje le <a servicePrincipalID> és <a servicePrincipalKey> azonosítóját és kulcsát a szolgáltatásnév azonosítójára és kulcsára a Microsoft Entra-azonosítóban. A szolgáltatásnévnek az előfizetés vagy a létrejövő fürtnek helyet adó erőforráscsoport Közreműködő szerepkörének tagjának kell lennie. Részletekért tekintse meg a Microsoft Entra alkalmazás és szolgáltatásnév létrehozását. A szolgáltatásnév azonosítója megegyezik az alkalmazásazonosítóval, a szolgáltatásnévkulcs pedig egyenértékű az ügyfélkód értékével.
- clusterResourceGroup. A <resourceGroupOfHDICluster> helyére írja annak az erőforráscsoportnak a nevét, amelyben a HDInsight-fürtöt létre kell hozni.
Feljegyzés
Az Azure HDInsightban korlátozott azon magok száma, amelyek az egyes támogatott Azure-régiókban felhasználhatók. Igény szerinti HDInsight társított szolgáltatás esetében a HDInsight-fürt ugyanazon a helyen jön létre, mint amit az Azure Storage elsődleges tárterületként használ. Ellenőrizze, hogy a magkvótája elegendő-e a fürt sikeres létrehozásához. További információk: Fürtök beállítása a HDInsightban Hadoop, Spark, Kafka stb. használatával.
Folyamat létrehozása
Ebben a lépésben létrehoz egy új folyamatot egy Spark-tevékenységgel. A tevékenység a szószámlálás példát használja. Ha még nem tette meg, töltse le a tartalmakat innen.
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy folyamatdefiníció alábbi JSON-definícióját, majd mentse a fájlt MySparkOnDemandPipeline.json néven.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Vegye figyelembe az alábbiakat:
- A rootPath az adftutorial tároló spark mappájára mutat.
- Az entryFilePath a spark mappa szkript almappájában található WordCount_Spark.py fájlra mutat.
Adat-előállító létrehozása
Már létrehozta a társított szolgáltatást és a folyamat definícióját JSON-fájlokban. Most hozzunk létre egy adat-előállítót, és helyezzük üzembe a társított szolgáltatás- és folyamat JSON-fájljait PowerShell-parancsmagok használatával. Futtassa egyesével az alábbi PowerShell-parancsokat:
Adja meg egyesével a változókat.
Erőforráscsoport neve
$resourceGroupName = "ADFTutorialResourceGroup"
Adat-előállító neve. A névnek globálisan egyedinek kell lennie
$dataFactoryName = "MyDataFactory09102017"
Folyamat neve
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Indítsa el a PowerShellt. Az Azure PowerShellt hagyja megnyitva a rövid útmutató végéig. Ha bezárja és újra megnyitja a programot, akkor újra le kell futtatnia a parancsokat. Azon Azure-régiók megtekintéséhez, amelyekben jelenleg elérhető a Data Factory, a következő lapon válassza ki az Önt érdeklő régiókat, majd bontsa ki az Elemzés részt, és keresse meg a Data Factory: Elérhető termékek régiók szerint szakaszt. Az adat-előállítók által használt adattárak (Azure Storage, Azure SQL Database stb.) és számítási erőforrások (HDInsight stb.) más régiókban is lehetnek.
Futtassa a következő parancsot, és adja meg az Azure Portalra való bejelentkezéshez használt felhasználónevet és jelszót.
Connect-AzAccount
Futtassa a következő parancsot a fiókhoz tartozó előfizetések megtekintéséhez.
Get-AzSubscription
Futtassa a következő parancsot a használni kívánt előfizetés kiválasztásához. Cserélje le a SubscriptionId kifejezést az Azure-előfizetés azonosítójára:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Hozza létre az ADFTutorialResourceGroup erőforráscsoportot.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Hozza létre az adat-előállítót.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
A kimenet megtekintéséhez futtassa a következő parancsot:
$df
Váltson arra a mappára, ahol létrehozta a JSON-fájlokat, és futtassa az alábbi parancsot egy Azure Storage társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Futtassa az alábbi parancsot egy igény szerinti Spark társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Futtassa az alábbi parancsot egy folyamat üzembe helyezéséhez:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Folyamat futásának indítása és monitorozása
Folyamat futásának indítása Így megőrizheti a folyamat futásának azonosítóját későbbi monitorozás céljából.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
A folyamatfuttatás állapotának a befejezésig való folyamatos ellenőrzéséig futtassa az alábbi szkriptet.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Itt látható a példa futtatás kimenete:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Ellenőrizze, hogy az
outputfiles
nevű mappa létrejött-e az adftutorial tárolóspark
mappájában a Spark program kimenetével.
Kapcsolódó tartalom
A példában szereplő folyamat adatokat másol az egyik helyről egy másikra egy Azure Blob Storage-ban. Megtanulta végrehajtani az alábbi műveleteket:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A folyamat futásának monitorozása.
A következő oktatóanyagra lépve megtudhatja, hogyan alakíthat át adatokat egy Hive szkriptnek a virtuális hálózatban lévő Azure HDInsight-fürtön való futtatásával.