Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Vonatkozik:
Azure Data Factory
Azure Synapse Analytics
Tipp
Ebben az oktatóanyagban a Azure PowerShell használatával hoz létre egy Data Factory-folyamatot, amely a Spark-tevékenység és egy igény szerinti HDInsight társított szolgáltatás használatával alakítja át az adatokat. Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A csővezeték futásának monitorozása.
Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy free fiókot.
Előfeltételek
Megjegyzés
Javasoljuk, hogy az Azure Az PowerShell-modult használja a Azure használatához. Első lépésként lásd: Install Azure PowerShell. Az Az PowerShell-modulra való migrálásról az Migrate Azure PowerShell az AzureRM-ből az Az című témakörben olvashat.
- Azure Storage fiók. Létrehozhat egy Python szkriptet és egy bemeneti fájlt, és feltöltheti őket a Azure tárolóba. A Spark-program kimenetét ebben a tárfiókban tárolja a rendszer. Az igény szerinti Spark-fürt ugyanazt a tárfiókot használja elsődleges tárolóként.
- Azure PowerShell. Kövesse a Hogyan telepítse és konfigurálja Azure PowerShell utasításait.
Python szkript feltöltése a Blob Storage-fiókba
Hozzon létre egy Python WordCount_Spark.py nevű fájlt a következő tartalommal:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()Cserélje le <storageAccountName> az Azure Storage-fiók nevére. Ezután mentse a fájlt.
A Azure Blob Storage hozzon létre egy adftutorial nevű tárolót, ha nem létezik.
Hozzon létre egy spark mappát.
Hozzon létre egy szkript almappát a spark mappában.
Töltse fel a WordCount_Spark.py fájlt a szkript almappába.
A bemeneti fájl feltöltése
- Hozzon létre egy minecraftstory.txt nevű fájlt némi szöveges tartalommal. A Spark-program megszámolja a szavak számát ebben a szövegben.
- Hozzon létre egy
inputfilesnevű almappát asparkmappában. - Töltse fel a
minecraftstory.txtfájlt azinputfilesalmappába.
Szerzőhöz társított szolgáltatások
Ebben a szakaszban két társított szolgáltatást hoz létre:
- Egy Azure Storage társított szolgáltatás, amely egy Azure Storage-fiókot csatol az adat-előállítóhoz. Ezt a tárterületet az igény szerinti HDInsight-fürt használja. Ez tartalmazza a végrehajtani kívánt Spark-szkriptet is.
- Egy igény szerinti HDInsight kapcsolt szolgáltatás. Azure Data Factory automatikusan létrehoz egy HDInsight-fürtöt, futtatja a Spark programot, majd törli a HDInsight-fürtöt az előre konfigurált idő elteltével.
Azure Storage társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja ki egy Azure Storage társított szolgáltatás következő JSON-definícióját, majd mentse a fájlt MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Frissítse a <storageAccountName> és <storageAccountKey> a Azure Storage-fiók nevével és kulcsával.
Igényalapú HDInsight társított szolgáltatás
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja ki egy Azure HDInsight társított szolgáltatás következő JSON-definícióját, és mentse a fájlt MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Frissítse a következő tulajdonságok értékeit a társított szolgáltatás definíciójában:
- hostSubscriptionId. Cserélje le <subscriptionID> a Azure-előfizetés azonosítójára. Ebben az előfizetésben kérésre létrehozásra kerül a HDInsight-fürt.
- bérlő. Cserélje le <tenantID> a Azure-bérlő azonosítójára.
- servicePrincipalId, servicePrincipalKey. Cserélje le <servicePrincipalID> és <servicePrincipalKey> a szolgáltatásnév azonosítójára és kulcsára a Microsoft Entra ID-ben. A szolgáltatásnévnek az előfizetés vagy a létrejövő fürtnek helyet adó erőforráscsoport Közreműködő szerepkörének tagjának kell lennie. A részletekért lásd: Microsoft Entra alkalmazás és szolgáltatásnév létrehozása. A szolgáltatásnév azonosítója megegyezik az alkalmazásazonosítóval, a szolgáltatásnévkulcs pedig egyenértékű az ügyfélkód értékével.
- clusterResourceGroup. A <resourceGroupOfHDICluster> helyére írja annak az erőforráscsoportnak a nevét, amelyben a HDInsight-fürtöt létre kell hozni.
Megjegyzés
Azure HDInsight az egyes támogatott Azure régiókban használható magok teljes számát korlátozza. Igény szerinti HDInsight társított szolgáltatás esetén a HDInsight-fürt az elsődleges tárolóként használt Azure Storage ugyanazon a helyszínen lesz létrehozva. Ellenőrizze, hogy a magkvótája elegendő-e a fürt sikeres létrehozásához. További információk: Fürtök beállítása a HDInsightban Hadoop, Spark, Kafka stb. használatával.
Folyamatlánc megírása
Ebben a lépésben létrehoz egy új folyamatot egy Spark-tevékenységgel. A tevékenység a szószámlálás példát használja. Ha még nem tette meg, töltse le a tartalmakat innen.
Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy folyamatdefiníció alábbi JSON-definícióját, majd mentse a fájlt MySparkOnDemandPipeline.json néven.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Vegye figyelembe az alábbiakat:
- A rootPath az adftutorial tároló spark mappájára mutat.
- Az entryFilePath a spark mappa szkript almappájában található WordCount_Spark.py fájlra mutat.
Adat-előállító létrehozása
Már létrehozta a társított szolgáltatást és a folyamat definícióját JSON-fájlokban. Most hozzunk létre egy adat-előállítót, és helyezzük üzembe a társított szolgáltatás- és folyamat JSON-fájljait PowerShell-parancsmagok használatával. Futtassa egyesével az alábbi PowerShell-parancsokat:
Adja meg egyesével a változókat.
Erőforráscsoport neve
$resourceGroupName = "ADFTutorialResourceGroup"Adat-előállító neve. A névnek globálisan egyedinek kell lennie
$dataFactoryName = "MyDataFactory09102017"Csővezeték neve
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineIndítsa el a PowerShellt. A rövid útmutató végéig tartsa nyitva Azure PowerShell. Ha bezárja és újra megnyitja a programot, akkor újra le kell futtatnia a parancsokat. Azoknak a Azure régióknak a listájához, amelyekben a Data Factory jelenleg elérhető, jelölje ki az Önt érdeklő régiókat az alábbi lapon, majd bontsa ki a Analytics elemet a Data Factory: Régiók szerint elérhető termékek. A data factory által használt adattárak (Azure Storage, Azure SQL Database stb.) és a data factory által használt számítások (HDInsight stb.) más régiókban is lehetnek.
Futtassa a következő parancsot, és adja meg a Azure portálra való bejelentkezéshez használt felhasználónevet és jelszót:
Connect-AzAccountFuttassa a következő parancsot a fiókhoz tartozó előfizetések megtekintéséhez.
Get-AzSubscriptionFuttassa a következő parancsot a használni kívánt előfizetés kiválasztásához. Cserélje le a SubscriptionId elemet a Azure-előfizetés azonosítójára:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"Hozza létre az ADFTutorialResourceGroup erőforráscsoportot.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Hozza létre az adatgyárat.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameA kimenet megtekintéséhez futtassa a következő parancsot:
$dfVáltson arra a mappára, ahol JSON-fájlokat hozott létre, és futtassa a következő parancsot egy Azure Storage társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Futtassa az alábbi parancsot egy igény szerinti Spark társított szolgáltatás üzembe helyezéséhez:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Futtassa az alábbi parancsot egy folyamat üzembe helyezéséhez:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Csővezeték futtatásának indítása és monitorozása
Folyamat futásának indítása Így megőrizheti a folyamat futásának azonosítóját későbbi monitorozás céljából.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameA folyamatfuttatás állapotának a befejezésig való folyamatos ellenőrzéséig futtassa az alábbi szkriptet.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Itt látható a példa futtatás kimenete:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Ellenőrizze, hogy az
outputfilesnevű mappa létrejött-e az adftutorial tárolósparkmappájában a Spark program kimenetével.
Kapcsolódó tartalom
A mintafolyamat adatokat másol egy Azure blobtároló egyik helyére. Megtanulta végrehajtani az alábbi műveleteket:
- Adat-előállító létrehozása
- Társított szolgáltatások készítése és üzembe helyezése
- Folyamat létrehozása és üzembe helyezése.
- Folyamat futásának indítása
- A csővezeték futásának monitorozása.
Folytassa a következő oktatóanyagtal, amelyből megtudhatja, hogyan alakíthat át adatokat Hive-szkript futtatásával egy virtuális hálózaton található Azure HDInsight fürtön.