Trasformare dati nel cloud usando l'attività Spark in Azure Data Factory

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

In questa esercitazione si usa Azure PowerShell per creare una pipeline di Data Factory che trasforma i dati con un'attività Spark e un servizio collegato HDInsight su richiesta. In questa esercitazione vengono completati i passaggi seguenti:

  • Creare una data factory.
  • Creare e distribuire servizi collegati.
  • Creare e distribuire una pipeline.
  • Avviare un'esecuzione della pipeline.
  • Monitorare l'esecuzione della pipeline.

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

Nota

È consigliabile usare il modulo Azure Az PowerShell per interagire con Azure. Per iniziare, vedere Installare Azure PowerShell. Per informazioni su come eseguire la migrazione al modulo AZ PowerShell, vedere Eseguire la migrazione di Azure PowerShell da AzureRM ad Az.

  • Account di archiviazione di Azure. È possibile creare uno script Python e un file di input e caricarli nell'archiviazione di Azure. L'output del programma Spark viene archiviato in questo account di archiviazione. Il cluster Spark su richiesta usa lo stesso account di archiviazione come risorsa di archiviazione primaria.
  • Azure PowerShell. Seguire le istruzioni in Come installare e configurare Azure PowerShell.

Caricare lo script Python nell'account Archiviazione BLOB

  1. Creare un file Python denominato WordCount_Spark.py con il contenuto seguente:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Sostituire <storageAccountName> con il nome del proprio account di archiviazione di Azure. Salvare quindi il file.

  3. Nell'Archivio BLOB di Azure creare un contenitore denominato adftutorial, se non esiste.

  4. Creare una cartella denominata spark.

  5. Creare una sottocartella denominata script nella cartella spark.

  6. Caricare il file WordCount_Spark.py nella sottocartella script.

Caricare il file di input

  1. Creare un file denominato minecraftstory.txt con del testo. Il programma Spark conta il numero di parole in questo testo.
  2. Creare una sottocartella denominata inputfiles nella cartella spark.
  3. Caricare il file minecraftstory.txt nella sottocartella inputfiles.

Creare servizi collegati

In questa sezione vengono creati due servizi collegati:

  • Un servizio collegato Archiviazione di Azure che collega un account di archiviazione di Azure alla data factory. Questo archivio viene usato dal cluster HDInsight su richiesta. Contiene anche lo script Spark da eseguire.
  • Un servizio collegato HDInsight su richiesta. Azure Data Factory crea automaticamente un cluster HDInsight, esegue il programma Spark ed elimina il cluster HDInsight dopo che è rimasto inattivo per un periodo di tempo preconfigurato.

Servizio collegato Archiviazione di Azure

Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di un servizio collegato di Archiviazione di Azure e quindi salvare il file con il nome MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Sostituire <storageAccountName> e <storageAccountKey> con il nome e la chiave dell'account di archiviazione di Azure.

Servizio collegato HDInsight su richiesta

Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di un servizio collegato di Azure HDInsight e quindi salvare il file con il nome MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Aggiornare i valori per le proprietà seguenti nella definizione del servizio collegato:

  • hostSubscriptionId. Sostituire <subscriptionID> con l'ID della sottoscrizione di Azure. Il cluster HDInsight su richiesta verrà creato in questa sottoscrizione.
  • tenant. Sostituire <tenantID> con l'ID del tenant di Azure.
  • servicePrincipalId, servicePrincipalKey. Sostituire <servicePrincipalID> e <servicePrincipalKey> con ID e chiave dell'entità servizio nell'ID Microsoft Entra. Questa entità servizio deve essere un membro del ruolo Collaboratore della sottoscrizione o del gruppo di risorse in cui viene creato il cluster. Per informazioni dettagliate, vedere Creare un'applicazione Microsoft Entra e un'entità servizio. L'ID entità servizio equivale all'ID applicazione e una chiave entità servizio equivale al valore di un segreto client.
  • clusterResourceGroup. Sostituire <resourceGroupOfHDICluster> con il nome del gruppo di risorse in cui deve essere creato il cluster HDInsight.

Nota

Azure HDInsight applica un limite al numero totale di core che è possibile usare in ogni area di Azure supportata. Per il servizio collegato HDInsight su richiesta, il cluster HDInsight verrà creato nello stesso percorso dell'archivio di Azure usato come risorsa di archiviazione primaria. Assicurarsi che siano disponibili sufficienti quote di core per la creazione del cluster. Per altre informazioni, vedere Configurare i cluster di HDInsight con Hadoop, Spark, Kafka e altro ancora.

Creare una pipeline

In questo passaggio si crea una nuova pipeline con un'attività Spark. L'attività usa l'esempio del conteggio parole. Se non è già stato fatto, scaricare il contenuto da questo percorso.

Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di una pipeline e quindi salvare il file con il nome MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Notare i punti seguenti:

  • rootPath punta alla cartella spark del contenitore adftutorial.
  • entryFilePath punta al file WordCount_Spark.py nella sottocartella script della cartella spark.

Creare una data factory

Sono state create definizioni di servizio collegato e pipeline nei file JSON. A questo punto si creerà una data factory e si distribuirà i file JSON del servizio e della pipeline collegati usando i cmdlet di PowerShell. Eseguire questi comandi di PowerShell uno alla volta:

  1. Impostare le variabili una alla volta.

    Nome gruppo di risorse

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nome della data factory. Deve essere univoco a livello globale

    $dataFactoryName = "MyDataFactory09102017"
    

    Nome pipeline

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Avviare PowerShell. Tenere aperto Azure PowerShell fino al termine di questa guida introduttiva. Se si chiude e si riapre, sarà necessario eseguire di nuovo questi comandi. Per un elenco di aree di Azure in cui Data Factory è attualmente disponibile, selezionare le aree di interesse nella pagina seguente, quindi espandere Analytics per individuare Data Factory: Prodotti disponibili in base all'area. Gli archivi dati (Archiviazione di Azure, database SQL di Azure e così via) e le risorse di calcolo (HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.

    Eseguire questo comando e immettere il nome utente e la password usati per accedere al portale di Azure:

    Connect-AzAccount
    

    Eseguire questo comando per visualizzare tutte le sottoscrizioni per l'account:

    Get-AzSubscription
    

    Eseguire il comando seguente per selezionare la sottoscrizione da usare. Sostituire SubscriptionId con l'ID della sottoscrizione di Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Creare il gruppo di risorse ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Creare la data factory.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Eseguire questo comando per visualizzare l'output:

    $df
    
  5. Passare alla cartella in cui sono stati creati i file JSON ed eseguire questo comando per distribuire un servizio collegato di Archiviazione di Azure:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Eseguire questo comando per distribuire un servizio collegato Spark su richiesta:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Eseguire questo comando per distribuire una pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Avviare e monitorare un'esecuzione della pipeline

  1. Avviare un'esecuzione della pipeline. Viene anche acquisito l'ID di esecuzione della pipeline per il monitoraggio futuro.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Eseguire questo script per verificare costantemente lo stato di esecuzione della pipeline fino al termine.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Ecco l'output dell'esecuzione di esempio:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Verificare che sia stata creata una cartella denominata outputfiles nella cartella spark del contenitore adftutorial con l'output del programma Spark.

La pipeline in questo esempio copia i dati da una posizione a un'altra in un archivio BLOB di Azure. Contenuto del modulo:

  • Creare una data factory.
  • Creare e distribuire servizi collegati.
  • Creare e distribuire una pipeline.
  • Avviare un'esecuzione della pipeline.
  • Monitorare l'esecuzione della pipeline.

Passare alla prossima esercitazione per informazioni su come trasformare i dati tramite l'esecuzione di uno script Hive in un cluster Azure HDInsight che si trova in una rete virtuale.