Compartir vía


Transformación de datos en la nube mediante la actividad de Spark en Azure Data Factory

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Sugerencia

Data Factory en Microsoft Fabric es la próxima generación de Azure Data Factory, con una arquitectura más sencilla, inteligencia artificial integrada y nuevas características. Si no está familiarizado con la integración de datos, comience con Fabric Data Factory. Las cargas de trabajo de ADF existentes pueden actualizarse a Fabric para acceder a nuevas funcionalidades en ciencia de datos, análisis en tiempo real e informes.

En este tutorial, usará Azure PowerShell para crear un flujo de trabajo de Data Factory que transforme los datos mediante una actividad de Spark y un servicio vinculado de HDInsight bajo demanda. En este tutorial, realizará los siguientes pasos:

  • Creación de una factoría de datos.
  • Creación e implementación de servicios vinculados
  • Creación e implementación de una canalización
  • Inicio de la ejecución de una canalización.
  • Supervisión de la ejecución de la canalización

Si no tiene una suscripción de Azure, cree una cuenta free antes de comenzar.

Requisitos previos

Nota:

Se recomienda usar el módulo Az de PowerShell de Azure para interactuar con Azure. Para empezar, consulte Install Azure PowerShell. Para obtener información sobre cómo migrar al módulo Az PowerShell, consulte Migrate Azure PowerShell de AzureRM a Az.

  • cuenta de almacenamiento de Azure. Cree un script de Python y un archivo de entrada y cárguelos en el almacenamiento de Azure. La salida del programa Spark se almacena en esta cuenta de almacenamiento. El clúster de Spark a petición usa la misma cuenta de almacenamiento que el almacenamiento principal.
  • Azure PowerShell. Siga las instrucciones de Cómo instalar y configurar Azure PowerShell.

Carga del script de Python en la cuenta de Blob Storage

  1. Cree un archivo Python denominado WordCount_Spark.py con el siguiente contenido:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Reemplace <storageAccountName> por el nombre de la cuenta de Azure Storage. A continuación, guarde el archivo.

  3. En el Azure Blob Storage, cree un contenedor denominado adftutorial si no existe.

  4. Cree una carpeta llamada spark.

  5. Cree una subcarpeta denominada script en la carpeta spark.

  6. Cargue el archivo WordCount_Spark.py a la subcarpeta script.

Carga del archivo de entrada

  1. Cree un archivo denominado minecraftstory.txt con algo de texto. El programa Spark contará el número de palabras de este texto.
  2. Cree una subcarpeta denominada inputfiles en la carpeta spark.
  3. Cargue minecraftstory.txt a la subcarpeta inputfiles.

Servicios vinculados al autor

En esta sección, deberá crear dos servicios vinculados:

  • Un servicio vinculado Azure Storage que vincula una cuenta de Azure Storage a la factoría de datos. Este almacenamiento lo usa el clúster HDInsight a petición. También contiene el script de Spark que se ejecutará.
  • Un servicio vinculado de HDInsight a petición. Azure Data Factory crea automáticamente un clúster de HDInsight, ejecuta el programa Spark y, a continuación, elimina el clúster de HDInsight después de que esté inactivo durante un tiempo preconfigurado.

servicio vinculado de Azure Storage

Cree un archivo JSON con el editor preferido, copie la siguiente definición JSON de un servicio vinculado de Azure Storage y guarde el archivo como MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Actualice el <storageAccountName> y <storageAccountKey> con el nombre y la clave de la cuenta de Azure Storage.

Servicio vinculado de HDInsight a petición

Cree un archivo JSON con el editor preferido, copie la siguiente definición JSON de un servicio vinculado de Azure HDInsight y guarde el archivo como MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Actualice los valores de las siguientes propiedades en la definición de servicio vinculado:

  • hostSubscriptionId. Reemplace <subscriptionID> por el identificador de la suscripción de Azure. El clúster de HDInsight a petición se crea en esta suscripción de Azure.
  • inquilino. Reemplace <tenantID> por el identificador del inquilino de Azure.
  • servicePrincipalId, servicePrincipalKey. Reemplace <servicePrincipalID> y <servicePrincipalKey> por el id. y la clave de la entidad de servicio en Microsoft Entra ID. Esta entidad de servicio debe ser miembro del rol de colaborador de la suscripción o del grupo de recursos en el que se crea el clúster. Para obtener más información, consulte Creación de la aplicación y la entidad de servicio de Microsoft Entra. El Id. de entidad de servicio es equivalente al Id. de aplicación y una Clave de entidad de servicio es equivalente al valor de un Secreto de cliente.
  • clusterResourceGroup. Reemplace <resourceGroupOfHDICluster> por el nombre del grupo de recursos en el que se debe crear el clúster de HDInsight.

Nota:

Azure HDInsight tiene limitación en el número total de núcleos que puede usar en cada región Azure que admite. Para el servicio vinculado de HDInsight a petición, el clúster de HDInsight se creará en la misma ubicación del Azure Storage usado como almacenamiento principal. Asegúrese de que dispone de suficientes cuotas de núcleos para que el clúster pueda crearse correctamente. Para obtener más información, consulte Configuración de clústeres en HDInsight con Hadoop, Spark, Kafka, etc.

Definición de una canalización

En este paso, tú creas una nueva canalización con una actividad de Spark. La actividad usa el ejemplo de recuento de palabras. Descargue el contenido desde esta ubicación si aún no lo ha hecho.

Cree un archivo JSON en el editor que prefiera, copie la siguiente definición de JSON de una definición de canalización y guárdela como MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Tenga en cuenta los siguientes puntos:

  • rootPathz apunta a la carpeta de Spark del contenedor adftutorial.
  • entryFilePath apunta al archivo WordCount_Spark.py de la subcarpeta de script de la carpeta de Spark.

Crear una factoría de datos

Ha creado definiciones de servicios vinculados y de canalización en archivos JSON. Ahora, vamos a crear una factoría de datos y a implementar el servicio vinculado y los archivos JSON de canalización mediante cmdlets de PowerShell. Ejecute los comandos de PowerShell siguientes uno por uno:

  1. Establezca las variables una a una.

    Nombre del grupo de recursos

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nombre de factoría de datos. Debe ser único globalmente.

    $dataFactoryName = "MyDataFactory09102017"
    

    Nombre de la canalización

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Inicie PowerShell. Mantenga Azure PowerShell abierto hasta el final de este inicio rápido. Si lo cierra y vuelve a abrirlo, deberá ejecutar los comandos de nuevo. Para obtener una lista de las regiones de Azure en las que Data Factory está disponible actualmente, seleccione las regiones que le interesan en la página siguiente y, a continuación, expanda Analytics para buscar Data Factory: Productos disponibles por región. Los almacenes de datos (Azure Storage, Azure SQL Database, etc.) y los procesos (HDInsight, etc.) usados por la factoría de datos pueden estar en otras regiones.

    Ejecute el comando siguiente y escriba el nombre de usuario y la contraseña que use para iniciar sesión en el portal de Azure:

    Connect-AzAccount
    

    Ejecute el siguiente comando para ver todas las suscripciones de esta cuenta:

    Get-AzSubscription
    

    Ejecute el comando siguiente para seleccionar la suscripción con la que desea trabajar. Reemplace SubscriptionId por el identificador de la suscripción de Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Creación del grupo de recursos: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Cree la factoría de datos.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Ejecute el comando siguiente para ver la salida:

    $df
    
  5. Cambie a la carpeta donde creó archivos JSON y ejecute el siguiente comando para implementar un servicio vinculado de Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Ejecute el siguiente comando para implementar un servicio vinculado de Spark a petición:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Ejecute el siguiente comando para implementar una canalización:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Inicio y supervisión de una ejecución de la canalización

  1. Inicio de la ejecución de una canalización. También se captura el id. de ejecución de la canalización para poder realizar una supervisión en un futuro.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Ejecute el script siguiente para comprobar continuamente el estado de ejecución de la canalización hasta que termine.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Este es el resultado de la ejecución de ejemplo:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Confirme que se haya creado una carpeta denominada outputfiles en la carpeta spark del contenedor adftutorial con la salida del programa de Spark.

La tubería de este ejemplo copia datos de una ubicación a otra en Azure Blob Storage. Ha aprendido a:

  • Creación de una factoría de datos.
  • Creación e implementación de servicios vinculados
  • Creación e implementación de una canalización
  • Inicio de la ejecución de una canalización.
  • Supervisión de la ejecución de la canalización

Pase al siguiente tutorial para aprender a transformar datos mediante la ejecución del script de Hive en un clúster de Azure HDInsight que se encuentra en una red virtual.