Partager via


Transformer des données dans le cloud à l’aide d’une activité Spark dans Azure Data Factory

S’APPLIQUE À : Azure Data Factory Azure Synapse Analytics

Conseil

Essayez Data Factory dans Microsoft Fabric, une solution d’analyse tout-en-un pour les entreprises. Microsoft Fabric couvre tous les aspects, du déplacement des données à la science des données, en passant par l’analyse en temps réel, l’aide à la décision et la création de rapports. Découvrez comment démarrer un nouvel essai gratuitement !

Dans ce tutoriel, vous utilisez Azure PowerShell pour créer un pipeline Azure Data Factory qui transforme des données à l’aide d’une activité Spark et d’un service lié HDInsight à la demande. Dans ce tutoriel, vous allez effectuer les étapes suivantes :

  • Créer une fabrique de données.
  • Créer et déployer des services liés.
  • Créer et déployer un pipeline.
  • Démarrer une exécution de pipeline.
  • Surveiller l’exécution du pipeline.

Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.

Prérequis

Notes

Nous vous recommandons d’utiliser le module Azure Az PowerShell pour interagir avec Azure. Pour commencer, consultez Installer Azure PowerShell. Pour savoir comment migrer vers le module Az PowerShell, consultez Migrer Azure PowerShell depuis AzureRM vers Az.

  • Compte Stockage Azure. Vous créez un script Python et un fichier d’entrée, puis vous les chargez vers le stockage Azure. La sortie du programme Spark est stockée dans ce compte de stockage. Le cluster Spark sur demande utilise le même compte de stockage comme stockage principal.
  • Azure PowerShell. Suivez les instructions de la page Installation et configuration d’Azure PowerShell.

Charger un script Python dans votre compte de stockage d’objets blob

  1. Créez un fichier Python nommé WordCount_Spark.py avec le contenu suivant :

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Remplacez <storageAccountName> par le nom de votre compte de stockage Azure. Puis enregistrez le fichier.

  3. Dans votre stockage Blob Azure, créez un conteneur nommé adftutorial s’il n’existe pas.

  4. Créez un dossier nommé spark.

  5. Créer un sous-dossier nommé script sous le dossier spark.

  6. Téléchargez le fichier WordCount_Spark.py dans le sous-dossier script.

Télécharger le fichier d’entrée

  1. Créez un fichier nommé minecraftstory.txt avec du texte. Le programme Spark compte le nombre de mots dans ce texte.
  2. Créez un sous-dossier nommé inputfiles dans le dossier spark.
  3. Téléchargez le fichier minecraftstory.txt dans le sous-dossier inputfiles.

Créer des services liés

Vous créez deux services liés dans cette section :

  • Un service lié au stockage Azure relie un compte de stockage Azure à la fabrique de données. Ce stockage est utilisé par le cluster HDInsight à la demande. Il contient également le script Spark à exécuter.
  • Un service lié HDInsight à la demande. Azure Data Factory crée automatiquement un cluster HDInsight, exécute le programme Spark, puis supprime le cluster HDInsight à la fin de la période d’inactivité préconfigurée.

Service lié Stockage Azure

Créez un fichier JSON à l’aide de votre éditeur favori, copiez la définition JSON suivante d’un service lié au stockage Azure, puis enregistrez le fichier sous MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Mettez à jour les éléments <storageAccountName> et <storageAccountKey> avec le nom et la clé de votre compte de stockage Azure.

Service lié HDInsight à la demande

Créez un fichier JSON à l’aide de votre éditeur favori, copiez la définition JSON suivante d’un service lié HDInsight Azure et enregistrez le fichier sous MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Mettez à jour les valeurs des propriétés suivantes dans la définition de service lié :

  • hostSubscriptionId. Remplacez <subscriptionID> par l’ID de votre abonnement Azure. Le cluster HDInsight à la demande est créé dans cet abonnement.
  • tenant Remplacez <tenantID> par l’ID de votre client Azure.
  • servicePrincipalId, servicePrincipalKey. Remplacez <servicePrincipalID> et <servicePrincipalKey> par l’ID et la clé de votre principal de service dans Microsoft Entra ID. Ce principal de service doit être membre du rôle Contributeur de l’abonnement ou du groupe de ressources dans lequel le cluster est créé. Pour plus d’informations, consultez créer une application Microsoft Entra et un principal de service. L’ID de principal de service est équivalent à l’ID d’application et une clé de principal de service est équivalente à la valeur d’un secret client.
  • clusterResourceGroup. Remplacez <resourceGroupOfHDICluster> par le nom du groupe de ressources dans lequel le cluster HDInsight doit être créé.

Notes

Azure HDInsight présente une limite relative au nombre total de cœurs que vous pouvez utiliser dans chaque région Azure prise en charge. Pour le service lié HDInsight à la demande, le cluster HDInsight sera créé au même emplacement de stockage Azure que celui utilisé comme stockage principal. Vérifiez que vous disposez des quotas de cœurs suffisants pour pouvoir créer le cluster avec succès. Pour plus d’informations, reportez-vous à l’article Configurer des clusters dans HDInsight avec Hadoop, Spark, Kafka et bien plus encore.

Créer un pipeline

Dans cette étape, vous allez créer un pipeline avec une activité Spark. L’activité utilise l’exemple word count. Téléchargez le contenu à partir de cet emplacement si vous ne l’avez pas encore fait.

Créez un fichier JSON dans votre éditeur favori, copiez la définition JSON suivante d’une définition de pipeline et enregistrez-la sous MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Notez les points suivants :

  • rootPath pointe vers le dossier Spark du conteneur adftutorial.
  • entryFilePath pointe vers le fichier WordCount_Spark.py dans le sous-dossier de script du dossier Spark.

Créer une fabrique de données

Vous avez créé des définitions de service lié et de pipeline dans des fichiers JSON. Maintenant, nous allons créer une fabrique de données et déployer le service lié et les fichiers JSON de pipeline à l’aide des applets de commande PowerShell. Exécutez une par une les commandes PowerShell suivantes :

  1. Définissez les variables une par une.

    Nom du groupe de ressources

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    dataFactoryName. elle doit être globalement unique

    $dataFactoryName = "MyDataFactory09102017"
    

    Nom du pipeline

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Lancez PowerShell. Gardez Azure PowerShell ouvert jusqu’à la fin de ce guide de démarrage rapide. Si vous fermez puis rouvrez Azure PowerShell, vous devez réexécuter ces commandes. Pour obtenir la liste des régions Azure dans lesquelles Data Factory est actuellement disponible, sélectionnez les régions qui vous intéressent dans la page suivante, puis développez Analytique pour localiser Data Factory : Disponibilité des produits par région. Les magasins de données (Stockage Azure, Azure SQL Database, etc.) et les services de calcul (HDInsight, etc.) utilisés par la fabrique de données peuvent être proposés dans d’autres régions.

    Exécutez la commande suivante, puis saisissez le nom d’utilisateur et le mot de passe que vous avez utilisés pour la connexion au portail Azure :

    Connect-AzAccount
    

    Exécutez la commande suivante pour afficher tous les abonnements de ce compte :

    Get-AzSubscription
    

    Exécutez la commande suivante pour sélectionner l’abonnement que vous souhaitez utiliser. Remplacez SubscriptionId par l’ID de votre abonnement Azure :

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Créez le groupe de ressources : ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Créez la fabrique de données.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Exécutez la commande suivante pour afficher la sortie :

    $df
    
  5. Basculez vers le dossier où vous avez créé des fichiers JSON et exécutez la commande suivante pour déployer un service lié au stockage Azure :

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Exécutez la commande suivante pour déployer un service lié Spark à la demande :

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Exécutez la commande suivante pour déployer un pipeline :

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Démarrer et surveiller l’exécution d’un pipeline

  1. Démarrer une exécution de pipeline. Cette opération capture également l’ID d’exécution du pipeline pour une surveillance ultérieure.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Exécutez le script suivant pour vérifier en permanence l’état de l’exécution du pipeline jusqu’à la fin.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Voici la sortie de l’exemple d’exécution :

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Confirmez qu’un dossier nommé outputfiles est créé dans le dossier spark du conteneur adftutorial avec la sortie du programme Spark.

Dans cet exemple, le pipeline copie les données d’un emplacement vers un autre dans un stockage Blob Azure. Vous avez appris à :

  • Créer une fabrique de données.
  • Créer et déployer des services liés.
  • Créer et déployer un pipeline.
  • Démarrer une exécution de pipeline.
  • Surveiller l’exécution du pipeline.

Passez au didacticiel suivant pour découvrir comment transformer des données en exécutant un script Hive sur un cluster Azure HDInsight qui se trouve dans un réseau virtuel.