Transformación con Azure Databricks

SE APLICA A: Azure Data Factory Azure Synapse Analytics

En este tutorial, creará una canalización de un extremo a otro que contenga las actividades Validation (Validación), Copy data (Copiar datos) y Notebook (Cuaderno) en Azure Data Factory.

  • Validation: garantiza que el conjunto de datos de origen está listo para el consumo de bajada antes de desencadenar el trabajo de copia y análisis.

  • Copy Data: duplica el conjunto de datos de origen en el almacenamiento receptor, que se monta como DBFS en el cuaderno de Azure Databricks. De esta manera, Spark puede consumirlo directamente.

  • Notebook: desencadena el cuaderno de Databricks que transforma el conjunto de datos. También agrega el conjunto de datos a una carpeta procesada o a Azure Synapse Analytics.

Por motivos de simplicidad, la plantilla de este tutorial no crea un desencadenador programado. Puede agregar uno si es necesario.

Diagram of the pipeline

Prerrequisitos

  • Una cuenta de Azure Blob Storage con un contenedor llamado sinkdata para su uso como receptor.

    Anote el nombre de la cuenta de almacenamiento, el nombre del contenedor y la clave de acceso. Los necesitará más adelante en la plantilla.

  • Un área de trabajo de Azure Databricks.

Importación de un cuaderno para Transformation (Transformación)

Para importar un cuaderno de Transformation al área de trabajo de Databricks:

  1. Inicie sesión en el área de trabajo de Azure Databricks y, a continuación, seleccione Importar. Menu command for importing a workspace La ruta de acceso al área de trabajo puede ser diferente de la que se muestra, pero recuérdela para más adelante.

  2. Seleccione Importar desde: URL. En el cuadro de texto, escriba https://adflabstaging1.blob.core.windows.net/share/Transformations.html.

    Selections for importing a notebook

  3. Ahora vamos a actualizar el cuaderno de Transformation con la información de la conexión de almacenamiento.

    En el cuaderno importado, vaya a command 5 (comando 5), como se muestra en el siguiente fragmento de código.

    • Reemplace <storage name> y <access key> por su propia información de conexión de almacenamiento.
    • Use la cuenta de almacenamiento con el contenedor sinkdata.
    # Supply storageName and accessKey values  
    storageName = "<storage name>"  
    accessKey = "<access key>"  
    
    try:  
      dbutils.fs.mount(  
        source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",  
        mount_point = "/mnt/Data Factorydata",  
        extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})  
    
    except Exception as e:  
      # The error message has a long stack track. This code tries to print just the relevant line indicating what failed.
    
    import re
    result = re.findall(r"\^\s\*Caused by:\s*\S+:\s\*(.*)\$", e.message, flags=re.MULTILINE)
    if result:
      print result[-1] \# Print only the relevant error message
    else:  
      print e \# Otherwise print the whole stack trace.  
    
  4. Genere un token de acceso de Databricks para Data Factory para acceder a Databricks.

    1. En el área de trabajo de Databricks, seleccione el icono de su perfil de usuario en la esquina superior derecha.
    2. Seleccione Configuración de usuario. Menu command for user settings
    3. Seleccione Generar nuevo token en la pestaña Tokens de acceso.
    4. Seleccione Generar.

    &quot;Generate&quot; button

    Guarde el de token de acceso para usarlo más adelante en la creación de un servicio vinculado de Databricks. El token de acceso será similar a dapi32db32cbb4w6eee18b7d87e45exxxxxx.

Uso de esta plantilla

  1. Vaya a la plantilla Transformation with Azure Databricks (Transformación con Azure Databricks) y cree nuevos servicios vinculados para las siguientes conexiones.

    Connections setting

    • Conexión al blob de origen: para acceder a los datos de origen.

      Puede usar el almacenamiento de blobs público, que contiene los archivos de origen para este ejercicio. Tome como referencia la siguiente captura de pantalla para la configuración. Use la siguiente dirección URL de SAS para conectarse al almacenamiento de origen (acceso de solo lectura):

      https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

      Selections for authentication method and SAS URL

    • Conexión al blob de destino: para almacenar los datos copiados.

      En la ventana Nuevo servicio vinculado, seleccione su blob de almacenamiento receptor.

      Sink storage blob as a new linked service

    • Azure Databricks: para conectarse al clúster de Databricks.

      Cree un servicio vinculado de Databricks con la clave de acceso que ha generado anteriormente. Puede seleccionar un clúster interactivo si tiene uno. En este ejemplo se usa la opción New job cluster (Nuevo clúster de trabajo).

      Selections for connecting to the cluster

  2. Seleccione Usar esta plantilla. Verá una canalización creada.

    Create a pipeline

Introducción y configuración de canalizaciones

En la nueva canalización, la mayoría de las opciones se han configurado automáticamente con los valores predeterminados. Revise las configuraciones de la canalización y realice los cambios necesarios.

  1. En la actividad Validation llamada Availability flag (Marca de disponibilidad), compruebe que el valor del conjunto de datos de origen sea el conjunto SourceAvailabilityDataset que ha creado anteriormente.

    Source dataset value

  2. En la actividad Copy data llamada file-to-blob (de archivo a blob), compruebe las pestañas Source (Origen) y Sink (Receptor). Cambie la configuración si es necesario.

    • Pestaña origen Source tab

    • Pestaña receptor Sink tab

  3. En la actividad Notebook llamada Transformation, revise y actualice las rutas de acceso y la configuración según sea necesario.

    El campo Databricks linked service (Servicio vinculado de Databricks) debería estar rellenado previamente con el valor de un paso anterior, como se muestra a continuación: Populated value for the Databricks linked service

    Para comprobar la configuración de Notebook:

    1. Seleccione la pestaña Configuración. En Notebook path (Ruta de acceso del cuaderno), compruebe que la ruta de acceso predeterminada es correcta. Es posible que necesite examinar y elegir la ruta de acceso correcta del cuaderno.

      Notebook path

    2. Expanda el selector Base Parameters (Parámetros base) y compruebe que los parámetros coinciden con lo que se muestra en la siguiente captura de pantalla. Estos parámetros se pasan al cuaderno de Databricks desde Data Factory.

      Base parameters

  4. Compruebe que los parámetros de canalización coinciden con los que aparecen en la siguiente captura de pantalla:Pipeline parameters

  5. Conéctese a sus conjuntos de datos.

    Nota

    En los conjuntos de archivos de siguientes, la ruta de acceso del archivo se ha especificado automáticamente en la plantilla. Si se requiere algún cambio, asegúrese de especificar la ruta de acceso para el contenedor y el directorio en caso de error de conexión.

    • SourceAvailabilityDataset: para comprobar si los datos de origen están disponibles.

      Selections for linked service and file path for SourceAvailabilityDataset

    • SourceFilesDataset: para acceder a los datos de origen.

      Selections for linked service and file path for SourceFilesDataset

    • DestinationFilesDataset: para copiar los datos en la ubicación de destinos del receptor. Use los valores siguientes:

      • Linked service - (Servicio vinculado): sinkBlob_LS, creado en un paso anterior.

      • File path - (Ruta de acceso del archivo): sinkdata/staged_sink.

        Selections for linked service and file path for DestinationFilesDataset

  6. Seleccione Debug (Depurar) para ejecutar la canalización. Puede consultar el vínculo a los registros de Databricks si quiere obtener registros de Spark más detallados.

    Link to Databricks logs from output

    También puede comprobar el archivo de datos mediante el Explorador de Azure Storage.

    Nota

    Para establecer una correlación con ejecuciones de canalización de Data Factory, en este ejemplo se agrega el identificador de ejecución de la canalización de la factoría de datos a la carpeta de salida. Esto ayuda a hacer un seguimiento de los archivos generados por cada ejecución. Appended pipeline run ID

Pasos siguientes