Megosztás a következőn keresztül:


Adatátalakítás a felhőben egy Spark-tevékenység az Azure Data Factoryban való használatával

A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp.

Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!

Ebben az oktatóanyagban az Azure PowerShell segítségével hozhat létre egy Data Factory-folyamatot, amely egy Spark-tevékenységgel és egy igény szerinti HDInsight társított szolgáltatással alakítja át az adatokat. Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:

  • Adat-előállító létrehozása
  • Társított szolgáltatások készítése és üzembe helyezése
  • Folyamat létrehozása és üzembe helyezése.
  • Folyamat futásának indítása
  • A folyamat futásának monitorozása.

Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

Előfeltételek

Feljegyzés

Javasoljuk, hogy az Azure Az PowerShell modult használja az Azure-ral való interakcióhoz. Első lépésként tekintse meg az Azure PowerShell telepítését ismertető témakört. Az Az PowerShell-modulra történő migrálás részleteiről lásd: Az Azure PowerShell migrálása az AzureRM modulból az Az modulba.

  • Egy Azure Storage-fiók. Létrehozhat egy Python-szkriptet és egy bemeneti fájlt, és feltöltheti őket az Azure Storage-ba. A Spark-program kimenetét ebben a tárfiókban tárolja a rendszer. Az igény szerinti Spark-fürt ugyanezt a tárfiókot használja elsődleges tárterületként.
  • Azure PowerShell. Kövesse az Azure PowerShell telepítését és konfigurálását ismertető cikkben szereplő utasításokat.

Python-szkript feltöltése Blob Storage-fiókba

  1. Hozzon létre egy WordCount_Spark.py nevű Python-fájlt az alábbi tartalommal:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Cserélje a <storageAccountName> kifejezést Azure Storage-fiókja nevére. Ezután mentse a fájlt.

  3. Az Azure Blob Storage-ban hozzon létre egy adftutorial nevű tárolót, ha még nem létezik.

  4. Hozzon létre egy spark mappát.

  5. Hozzon létre egy szkript almappát a spark mappában.

  6. Töltse fel a WordCount_Spark.py fájlt a szkript almappába.

A bemeneti fájl feltöltése

  1. Hozzon létre egy minecraftstory.txt nevű fájlt némi szöveges tartalommal. A Spark-program megszámolja a szavak számát ebben a szövegben.
  2. Hozzon létre egy inputfiles nevű almappát a spark mappában.
  3. Töltse fel a minecraftstory.txt fájlt az inputfiles almappába.

Társított szolgáltatások létrehozása

Ebben a szakaszban két társított szolgáltatást hoz létre:

  • Egy Azure Storage-beli társított szolgáltatást, amely egy Azure Storage-fiókot társít az adat-előállítóhoz. Ezt a tárterületet csak az igény szerinti HDInsight-fürt használja. Ez tartalmazza a végrehajtani kívánt Spark-szkriptet is.
  • Egy igény szerinti HDInsight társított szolgáltatást. Az Azure Data Factory automatikusan létrehoz egy HDInsight-fürtöt, futtatja a Spark-programot, majd törli a HDInsight-fürtöt, ha az egy előre megadott időtartamon keresztül inaktív.

Azure Storage társított szolgáltatás

Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure Storage társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyStorageLinkedService.json néven.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Cserélje le a <storageAccountName> és a <storageAccountKey> értékeket a saját Azure Storage-fiókja nevére és kulcsára.

Igény szerinti HDInsight társított szolgáltatás

Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy Azure HDInsight társított szolgáltatás alábbi JSON-definícióját, majd mentse a fájlt MyOnDemandSparkLinkedService.json néven.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Frissítse a következő tulajdonságok értékeit a társított szolgáltatás definíciójában:

  • hostSubscriptionId. Cserélje le a <SubscriptionId> kifejezést az Azure-előfizetés azonosítójára. Létrejön az igény szerinti HDInsight-fürt ebben az előfizetésben.
  • tenant. A <tenantID> helyére írja a saját Azure bérlőjének az azonosítóját.
  • servicePrincipalId, servicePrincipalKey. Cserélje le <a servicePrincipalID> és <a servicePrincipalKey> azonosítóját és kulcsát a szolgáltatásnév azonosítójára és kulcsára a Microsoft Entra-azonosítóban. A szolgáltatásnévnek az előfizetés vagy a létrejövő fürtnek helyet adó erőforráscsoport Közreműködő szerepkörének tagjának kell lennie. Részletekért tekintse meg a Microsoft Entra alkalmazás és szolgáltatásnév létrehozását. A szolgáltatásnév azonosítója megegyezik az alkalmazásazonosítóval, a szolgáltatásnévkulcs pedig egyenértékű az ügyfélkód értékével.
  • clusterResourceGroup. A <resourceGroupOfHDICluster> helyére írja annak az erőforráscsoportnak a nevét, amelyben a HDInsight-fürtöt létre kell hozni.

Feljegyzés

Az Azure HDInsightban korlátozott azon magok száma, amelyek az egyes támogatott Azure-régiókban felhasználhatók. Igény szerinti HDInsight társított szolgáltatás esetében a HDInsight-fürt ugyanazon a helyen jön létre, mint amit az Azure Storage elsődleges tárterületként használ. Ellenőrizze, hogy a magkvótája elegendő-e a fürt sikeres létrehozásához. További információk: Fürtök beállítása a HDInsightban Hadoop, Spark, Kafka stb. használatával.

Folyamat létrehozása

Ebben a lépésben létrehoz egy új folyamatot egy Spark-tevékenységgel. A tevékenység a szószámlálás példát használja. Ha még nem tette meg, töltse le a tartalmakat innen.

Hozzon létre egy JSON-fájlt az előnyben részesített szerkesztővel, másolja a fájlba egy folyamatdefiníció alábbi JSON-definícióját, majd mentse a fájlt MySparkOnDemandPipeline.json néven.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Vegye figyelembe az alábbiakat:

  • A rootPath az adftutorial tároló spark mappájára mutat.
  • Az entryFilePath a spark mappa szkript almappájában található WordCount_Spark.py fájlra mutat.

Adat-előállító létrehozása

Már létrehozta a társított szolgáltatást és a folyamat definícióját JSON-fájlokban. Most hozzunk létre egy adat-előállítót, és helyezzük üzembe a társított szolgáltatás- és folyamat JSON-fájljait PowerShell-parancsmagok használatával. Futtassa egyesével az alábbi PowerShell-parancsokat:

  1. Adja meg egyesével a változókat.

    Erőforráscsoport neve

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Adat-előállító neve. A névnek globálisan egyedinek kell lennie

    $dataFactoryName = "MyDataFactory09102017"
    

    Folyamat neve

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Indítsa el a PowerShellt. Az Azure PowerShellt hagyja megnyitva a rövid útmutató végéig. Ha bezárja és újra megnyitja a programot, akkor újra le kell futtatnia a parancsokat. Azon Azure-régiók megtekintéséhez, amelyekben jelenleg elérhető a Data Factory, a következő lapon válassza ki az Önt érdeklő régiókat, majd bontsa ki az Elemzés részt, és keresse meg a Data Factory: Elérhető termékek régiók szerint szakaszt. Az adat-előállítók által használt adattárak (Azure Storage, Azure SQL Database stb.) és számítási erőforrások (HDInsight stb.) más régiókban is lehetnek.

    Futtassa a következő parancsot, és adja meg az Azure Portalra való bejelentkezéshez használt felhasználónevet és jelszót.

    Connect-AzAccount
    

    Futtassa a következő parancsot a fiókhoz tartozó előfizetések megtekintéséhez.

    Get-AzSubscription
    

    Futtassa a következő parancsot a használni kívánt előfizetés kiválasztásához. Cserélje le a SubscriptionId kifejezést az Azure-előfizetés azonosítójára:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Hozza létre az ADFTutorialResourceGroup erőforráscsoportot.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Hozza létre az adat-előállítót.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    A kimenet megtekintéséhez futtassa a következő parancsot:

    $df
    
  5. Váltson arra a mappára, ahol létrehozta a JSON-fájlokat, és futtassa az alábbi parancsot egy Azure Storage társított szolgáltatás üzembe helyezéséhez:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Futtassa az alábbi parancsot egy igény szerinti Spark társított szolgáltatás üzembe helyezéséhez:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Futtassa az alábbi parancsot egy folyamat üzembe helyezéséhez:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Folyamat futásának indítása és monitorozása

  1. Folyamat futásának indítása Így megőrizheti a folyamat futásának azonosítóját későbbi monitorozás céljából.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. A folyamatfuttatás állapotának a befejezésig való folyamatos ellenőrzéséig futtassa az alábbi szkriptet.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Itt látható a példa futtatás kimenete:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Ellenőrizze, hogy az outputfiles nevű mappa létrejött-e az adftutorial tároló spark mappájában a Spark program kimenetével.

A példában szereplő folyamat adatokat másol az egyik helyről egy másikra egy Azure Blob Storage-ban. Megtanulta végrehajtani az alábbi műveleteket:

  • Adat-előállító létrehozása
  • Társított szolgáltatások készítése és üzembe helyezése
  • Folyamat létrehozása és üzembe helyezése.
  • Folyamat futásának indítása
  • A folyamat futásának monitorozása.

A következő oktatóanyagra lépve megtudhatja, hogyan alakíthat át adatokat egy Hive szkriptnek a virtuális hálózatban lévő Azure HDInsight-fürtön való futtatásával.