Transformace dat v cloudu pomocí aktivity Sparku v Azure Data Factory

VZTAHUJE SE NA: Azure Data Factory Azure Synapse Analytics

Tip

Data Factory v Microsoft Fabric je nová generace Azure Data Factory s jednodušší architekturou, integrovanou AI a novými funkcemi. Pokud s integrací dat začínáte, začněte Fabric Data Factory. Stávající úlohy ADF lze upgradovat na Fabric pro přístup k novým funkcím v oblastech datové vědy, analýz v reálném čase a vytváření sestav.

V tomto kurzu použijete Azure PowerShell k vytvoření kanálu služby Data Factory, který transformuje data pomocí aktivity Sparku a propojené služby HDInsight na vyžádání. V tomto kurzu provedete následující kroky:

  • Vytvoření datové továrny
  • Vytvoření a nasazení propojených služeb
  • Vytvořte a nasaďte potrubí.
  • Zahajte spuštění potrubí.
  • Sledujte spuštění kanálu.

Pokud nemáte předplatné Azure, vytvořte si účet free před tím, než začnete.

Požadavky

Poznámka:

K interakci s Azure doporučujeme použít modul Azure Az PowerShell. Pokud chcete začít, přečtěte si téma Install Azure PowerShell. Informace o migraci do modulu Az PowerShell najdete v tématu Migrace Azure PowerShell z AzureRM do Az.

  • Účet služby Azure Storage. Vytvoříte Python skript a vstupní soubor a nahrajete je do úložiště Azure. V tomto účtu úložiště se ukládá výstup z programu Sparku. Cluster Spark na vyžádání používá stejný účet úložiště jako primární úložiště.
  • Azure PowerShell. Postupujte podle pokynů v Jak nainstalovat a nakonfigurovat Azure PowerShell.

Nahrání skriptu Python do účtu Blob Storage

  1. Vytvořte soubor Python s názvem WordCount_Spark.py s následujícím obsahem:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Nahraďte <storageAccountName> názvem účtu Azure Storage. Pak soubor uložte.

  3. V Azure Blob Storage vytvořte kontejner s názvem adftutorial pokud neexistuje.

  4. Vytvořte složku s názvem spark.

  5. Ve složce spark vytvořte podsložku script.

  6. Do podsložky script uložte soubor WordCount_Spark.py.

Nahrání vstupního souboru

  1. Vytvořte soubor minecraftstory.txt s nějakým textem. Program Sparku spočítá slova v tomto textu.
  2. Ve složce inputfiles vytvořte podsložku spark.
  3. Do podsložky minecraftstory.txt uložte soubor inputfiles.

Autorem propojené služby

V této části vytvoříte dvě propojené služby:

  • Propojená služba Azure Storage, která propojila účet Azure Storage s datovou továrnou. Toto úložiště používá HDInsight cluster na vyžádání. Obsahuje také skript Sparku, který se má spustit.
  • Propojená služba HDInsight na vyžádání. Azure Data Factory automaticky vytvoří cluster HDInsight, spustí program Spark a poté odstraní cluster HDInsight poté, co je nečinný po předem nastavenou dobu.

propojená služba Azure Storage

Vytvořte soubor JSON pomocí preferovaného editoru, zkopírujte následující definici JSON propojené služby Azure Storage a pak soubor uložte jako MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Aktualizujte <storageAccountName> a <storageAccountKey> názvem a klíčem vašeho účtu Azure Storage.

Propojená služba HDInsight na vyžádání

Vytvořte soubor JSON pomocí preferovaného editoru, zkopírujte následující definici JSON propojené služby Azure HDInsight a uložte ho jako MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

V definici propojené služby aktualizujte hodnoty následujících vlastností:

  • hostSubscriptionId. Nahraďte <subscriptionID> ID vašeho předplatného Azure. V tomto předplatném se vytvoří cluster HDInsight na vyžádání.
  • tenant. Nahraďte <tenantID> ID vašeho tenanta Azure.
  • servicePrincipalId, servicePrincipalKey. Nahraďte <servicePrincipalID> a <servicePrincipalKey> příslušným ID a klíčem vašeho hlavního objektu služby v Microsoft Entra ID. Tento instanční objekt služby musí být členem role Přispěvatel pro předplatné nebo skupinu prostředků, ve které se cluster vytvoří. Podrobnosti najdete v tématu vytvoření aplikace a instančního objektu Microsoft Entra. ID instančního objektu odpovídá ID aplikace a klíč instančního objektu odpovídá hodnotě tajného klíče klienta.
  • clusterResourceGroup. Nahraďte <resourceGroupOfHDICluster> názvem skupiny prostředků, ve které se má cluster HDInsight vytvořit.

Poznámka:

Azure HDInsight má omezení celkového počtu jader, která můžete použít v každé Azure oblasti, kterou podporuje. Pro propojenou službu HDInsight na vyžádání bude cluster HDInsight vytvořen ve stejném umístění jako Azure Storage, které je používáno jako primární úložiště. Ujistěte se, že máte dostatečné kvóty pro jádra, aby bylo možné cluster úspěšně vytvořit. Další informace najdete v tématu Nastavení clusterů v HDInsight se systémem Hadoop, Spark, Kafka a dalšími.

Navrhnout kanál

V tomto kroku vytvoříte nový datový tok se Spark aktivitou. Aktivita používá ukázku word count (počet slov). Pokud jste to ještě neudělali, stáhněte obsah z tohoto umístění.

Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON kanálu a potom tento soubor uložte jako MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Mějte na paměti následující body:

  • rootPath odkazuje na složku spark kontejneru adftutorial.
  • entryFilePath odkazuje na soubor WordCount_Spark.py v podsložce script složky spark.

Vytvoření datové továrny

Vytvořili jste definice propojené služby a kanálu v souborech JSON. Teď vytvoříme datovou továrnu a nasadíme propojené soubory JSON služby a kanálu pomocí rutin PowerShellu. Postupně spusťte následující příkazy PowerShellu:

  1. Nastavte proměnné jednu po druhé.

    Název skupiny prostředků

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Název datové továrny. Musí být globálně jedinečný.

    $dataFactoryName = "MyDataFactory09102017"
    

    Název kanálu

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Spusťte PowerShell. Nechte Azure PowerShell otevřené až do konce tohoto rychlého startu. Pokud ho zavřete a znovu otevřete, bude potřeba tyto příkazy spustit znovu. Seznam Azure oblastí, ve kterých je služba Data Factory aktuálně dostupná, vyberte oblasti, které vás zajímají, na následující stránce a rozbalte Analytics a vyhledejte Data Factory: Products available by region. Úložiště dat (Azure Storage, Azure SQL Database atd.) a výpočty (HDInsight atd.) používané datovými továrnami můžou být v jiných oblastech.

    Spusťte následující příkaz a zadejte uživatelské jméno a heslo, které používáte pro přihlášení k portálu Azure:

    Connect-AzAccount
    

    Spuštěním následujícího příkazu zobrazíte všechna předplatná pro tento účet:

    Get-AzSubscription
    

    Spuštěním následujícího příkazu vyberte předplatné, se kterým chcete pracovat. Nahraďte SubscriptionId ID vašeho předplatného Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Vytvořte skupinu prostředků ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Vytvořte datovou továrnu.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Spusťte následující příkaz pro zobrazení výstupu:

    $df
    
  5. Přepněte do složky, ve které jste vytvořili soubory JSON, a spuštěním následujícího příkazu nasaďte propojenou službu Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Spuštěním následujícího příkazu nasaďte propojenou službu Sparku na vyžádání:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Spuštěním následujícího příkazu nasaďte kanál:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Spuštění kanálu a jeho monitorování

  1. Zahajte spuštění potrubí. Zaznamená se také ID spuštění kanálu pro budoucí monitorování.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Spusťte následující skript, který bude nepřetržitě kontrolovat stav běhu kanálu, dokud neskončí.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Zde je výstup tohoto ukázkového spuštění:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Pomocí výstupu z programu Sparku potvrďte vytvoření složky outputfiles ve složce spark kontejneru adftutorial.

Datový kanál v tomto příkladu kopíruje data z jednoho umístění do jiného umístění v úložišti Azure Blob. Naučili jste se:

  • Vytvoření datové továrny
  • Vytvoření a nasazení propojených služeb
  • Vytvořte a nasaďte potrubí.
  • Zahajte spuštění potrubí.
  • Sledujte spuštění kanálu.

V dalším kurzu se dozvíte, jak transformovat data spuštěním skriptu Hive v clusteru Azure HDInsight, který je ve virtuální síti.