Adatátalakítás a felhőben egy Spark-tevékenység az Azure Data Factoryban való használatával
A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
Ebben az oktatóanyagban az Azure PowerShell segítségével hozhat létre egy Data Factory-folyamatot, amely egy Spark-tevékenységgel és egy igény szerinti HDInsight társított szolgáltatással alakítja át az adatokat. Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A folyamat futásának monitorozása.
Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
Előfeltételek
Megjegyzés:
We recommend that you use the Azure Az PowerShell module to interact with Azure. See Install Azure PowerShell to get started. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.
- Egy Azure Storage-fiók. Létrehozhat egy Python-szkriptet és egy bemeneti fájlt, és feltöltheti őket az Azure Storage-ba. A Spark-program kimenetét ebben a tárfiókban tárolja a rendszer. Az igény szerinti Spark-fürt ugyanezt a tárfiókot használja elsődleges tárterületként.
- Azure PowerShell. Kövesse az Azure PowerShell telepítését és konfigurálását ismertető cikkben szereplő utasításokat.
Python-szkript feltöltése Blob Storage-fiókba
Hozzon létre egy WordCount_Spark.py nevű Python-fájlt az alábbi tartalommal:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Cserélje a <storageAccountName> kifejezést Azure Storage-fiókja nevére. Ezután mentse a fájlt.
Az Azure Blob Storage-ban hozzon létre egy adftutorial nevű tárolót, ha még nem létezik.
Hozzon létre egy spark mappát.
Hozzon létre egy szkript almappát a spark mappában.
Töltse fel a WordCount_Spark.py fájlt a szkript almappába.
A bemeneti fájl feltöltése
- Hozzon létre egy minecraftstory.txt nevű fájlt némi szöveges tartalommal. A Spark-program megszámolja a szavak számát ebben a szövegben.
- Hozzon létre egy
inputfiles
nevű almappát aspark
mappában. - Töltse fel a
minecraftstory.txt
fájlt azinputfiles
almappába.
Társított szolgáltatások létrehozása
Ebben a szakaszban két társított szolgáltatást hoz létre:
- Egy Azure Storage-beli társított szolgáltatást, amely egy Azure Storage-fiókot társít az adat-előállítóhoz. Ezt a tárterületet csak az igény szerinti HDInsight-fürt használja. Ez tartalmazza a végrehajtani kívánt Spark-szkriptet is.
- Egy igény szerinti HDInsight társított szolgáltatást. Az Azure Data Factory automatikusan létrehoz egy HDInsight-fürtöt, futtatja a Spark-programot, majd törli a HDInsight-fürtöt, ha az egy előre megadott időtartamon keresztül inaktív.
Azure Storage társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure Storage társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyStorageLinkedService.json néven.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Cserélje le a <storageAccountName> és a <storageAccountKey> értékeket a saját Azure Storage-fiókja nevére és kulcsára.
Igény szerinti HDInsight társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure HDInsight társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyOnDemandSparkLinkedService.json néven.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Frissítse a következő tulajdonságok értékeit a társított szolgáltatás definíciójában:
- hostSubscriptionId. Cserélje le a <SubscriptionId> kifejezést az Azure-előfizetés azonosítójára. Létrejön az igény szerinti HDInsight-fürt ebben az előfizetésben.
- tenant. A <tenantID> helyére írja a saját Azure bérlőjének az azonosítóját.
- servicePrincipalId, servicePrincipalKey. Cserélje le <a servicePrincipalID> és <a servicePrincipalKey> azonosítóját és kulcsát a szolgáltatásnév azonosítójára és kulcsára a Microsoft Entra-azonosítóban. A szolgáltatásnévnek az előfizetés vagy a létrejövő fürtnek helyet adó erőforráscsoport Közreműködő szerepkörének tagjának kell lennie. Részletekért tekintse meg a Microsoft Entra alkalmazás és szolgáltatásnév létrehozását. A szolgáltatásnév azonosítója megegyezik az alkalmazásazonosítóval, a szolgáltatásnévkulcs pedig egyenértékű az ügyfélkód értékével.
- clusterResourceGroup. A <resourceGroupOfHDICluster> helyére írja annak az erőforráscsoportnak a nevét, amelyben a HDInsight-fürtöt létre kell hozni.
Megjegyzés:
Az Azure HDInsightban korlátozott azon magok száma, amelyek az egyes támogatott Azure-régiókban felhasználhatók. Igény szerinti HDInsight társított szolgáltatás esetében a HDInsight-fürt ugyanazon a helyen jön létre, mint amit az Azure Storage elsődleges tárterületként használ. Ellenőrizze, hogy a magkvótája elegendő-e a fürt sikeres létrehozásához. További információk: Fürtök beállítása a HDInsightban Hadoop, Spark, Kafka stb. használatával.
Folyamat létrehozása
Ebben a lépésben létrehoz egy új folyamatot egy Spark-tevékenységgel. A tevékenység a szószámlálás példát használja. Ha még nem tette meg, töltse le a tartalmakat innen.
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy folyamatdefiníció alábbi JSON-definícióját, majd mentse a fájlt MySparkOnDemandPipeline.json néven.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Vegye figyelembe a következő szempontokat:
- A rootPath az adftutorial tároló spark mappájára mutat.
- Az entryFilePath a spark mappa szkript almappájában található WordCount_Spark.py fájlra mutat.
Adat-előállító létrehozása
Már létrehozta a társított szolgáltatást és a folyamat definícióját JSON-fájlokban. Most hozzunk létre egy adat-előállítót, és helyezzük üzembe a társított szolgáltatás- és folyamat JSON-fájljait PowerShell-parancsmagok használatával. Futtassa egyesével az alábbi PowerShell-parancsokat:
Adja meg egyesével a változókat.
Erőforráscsoport neve
$resourceGroupName = "ADFTutorialResourceGroup"
Adat-előállító neve. A névnek globálisan egyedinek kell lennie
$dataFactoryName = "MyDataFactory09102017"
Folyamat neve
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Indítsa el a PowerShellt. Az Azure PowerShellt hagyja megnyitva a rövid útmutató végéig. Ha bezárja és újra megnyitja a programot, akkor újra le kell futtatnia a parancsokat. Azon Azure-régiók megtekintéséhez, amelyekben jelenleg elérhető a Data Factory, a következő lapon válassza ki az Önt érdeklő régiókat, majd bontsa ki az Elemzés részt, és keresse meg a Data Factory: Elérhető termékek régiók szerint szakaszt. Az adat-előállítók által használt adattárak (Azure Storage, Azure SQL Database stb.) és számítási erőforrások (HDInsight stb.) más régiókban is lehetnek.
Futtassa a következő parancsot, és adja meg az Azure Portalra való bejelentkezéshez használt felhasználónevet és jelszót.
Connect-AzAccount
Futtassa a következő parancsot a fiókhoz tartozó előfizetések megtekintéséhez.
Get-AzSubscription
Futtassa a következő parancsot a használni kívánt előfizetés kiválasztásához. Cserélje le a SubscriptionId kifejezést az Azure-előfizetés azonosítójára:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Hozza létre az ADFTutorialResourceGroup erőforráscsoportot.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Hozza létre az adat-előállítót.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
A kimenet megtekintéséhez futtassa a következő parancsot:
$df
Váltson arra a mappára, ahol létrehozta a JSON-fájlokat, és futtassa az alábbi parancsot egy Azure Storage társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Futtassa az alábbi parancsot egy igény szerinti Spark társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Futtassa az alábbi parancsot egy folyamat üzembe helyezéséhez:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Folyamat futásának indítása és monitorozása
Folyamat futásának indítása Így megőrizheti a folyamat futásának azonosítóját későbbi monitorozás céljából.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
A folyamatfuttatás állapotának a befejezésig való folyamatos ellenőrzéséig futtassa az alábbi szkriptet.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Itt látható a példa futtatás kimenete:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Ellenőrizze, hogy az
outputfiles
nevű mappa létrejött-e az adftutorial tárolóspark
mappájában a Spark program kimenetével.
Kapcsolódó tartalom
A példában szereplő folyamat adatokat másol az egyik helyről egy másikra egy Azure Blob Storage-ban. Megtanulta végrehajtani az alábbi műveleteket:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A folyamat futásának monitorozása.
A következő oktatóanyagra lépve megtudhatja, hogyan alakíthat át adatokat egy Hive szkriptnek a virtuális hálózatban lévő Azure HDInsight-fürtön való futtatásával.