Creación de una canalización de datos de un extremo a otro en Databricks

En este artículo se muestra cómo crear e implementar una canalización de procesamiento de datos de un extremo a otro, incluido cómo ingerir datos sin procesar, transformar los datos y ejecutar análisis en los datos procesados.

Nota:

Aunque en este artículo se muestra cómo crear una canalización de datos completa mediante cuadernos de Databricks y un trabajo de Azure Databricks para orquestar un flujo de trabajo, Databricks recomienda usar Delta Live Tables, una interfaz declarativa para crear canalizaciones de procesamiento de datos confiables, fáciles de mantener y probar.

¿Qué es una canalización de datos?

Una canalización de datos implementa los pasos necesarios para mover datos de sistemas de origen, transformar esos datos en función de los requisitos y almacenar los datos en un sistema de destino. Una canalización de datos incluye todos los procesos necesarios para convertir los datos sin procesar en datos preparados que los usuarios pueden consumir. Por ejemplo, una canalización de datos podría preparar los datos para que los analistas de datos y los científicos de datos puedan extraer valor de los datos mediante análisis e informes.

Un flujo de trabajo de extracción, transformación y carga (ETL) es un ejemplo común de una canalización de datos. En el procesamiento de ETL, los datos se ingieren desde sistemas de origen y se escriben en un área de almacenamiento provisional, se transforman en función de los requisitos (garantizando la calidad de los datos, desduplicando registros, etc.) y, a continuación, se escriben en un sistema de destino, como un almacenamiento de datos o un lago de datos.

Pasos de la canalización de datos

Para ayudarle a empezar a crear canalizaciones de datos en Azure Databricks, el ejemplo incluido en este artículo le guiará por la creación de un flujo de trabajo de procesamiento de datos:

  • Use las características de Azure Databricks para explorar un conjunto de datos sin procesar.
  • Cree un cuaderno de Databricks para ingerir datos de origen sin procesar y escribir los datos sin procesar en una tabla de destino.
  • Cree un cuaderno de Databricks para transformar los datos de origen sin procesar y escribir los datos transformados en una tabla de destino.
  • Cree un cuaderno de Databricks para consultar los datos transformados.
  • Automatice la canalización de datos con un trabajo de Azure Databricks.

Requisitos

Ejemplo: Conjunto de datos Million Song

El conjunto de datos usado en este ejemplo es un subconjunto del conjunto de datos Million Song, una colección de características y metadatos para pistas de música contemporáneas. Este conjunto de datos está disponible en los conjuntos de datos de ejemplo incluidos en el área de trabajo de Azure Databricks.

Paso 1: creación de un clúster

Para realizar el procesamiento y el análisis de datos en este ejemplo, cree un clúster para proporcionar los recursos de proceso necesarios para ejecutar comandos.

Nota:

Dado que en este ejemplo se usa un conjunto de datos de ejemplo almacenado en DBFS y se recomienda la conservación de tablas en el catálogo de Unity, creará un clúster configurado con el modo de acceso de usuario único. El modo de acceso de usuario único proporciona acceso completo a DBFS, al tiempo que también permite el acceso al catálogo de Unity. Consulte Procedimientos recomendados para DBFS y el catálogo de Unity.

  1. Haga clic en Proceso en la barra lateral.
  2. En la página de proceso, haga clic en Crear clúster.
  3. En la página Nuevo clúster, escriba un nombre único para el clúster.
  4. En Modo de acceso, seleccione Usuario único.
  5. En Acceso de usuario único o entidad de servicio, seleccione el nombre de usuario.
  6. Deje los valores restantes en su estado predeterminado y haga clic en Crear clúster.

Para obtener más información sobre los clústeres de Databricks, consulte Compute.

Paso 2: Explorar los datos de origen

Para aprender a usar la interfaz de Azure Databricks para explorar los datos de origen sin procesar, consulte Exploración de los datos de origen de una canalización de datos. Si desea ir directamente a la ingesta y preparación de los datos, continúe con el Paso 3: Ingesta de los datos sin procesar.

Paso 3: Ingesta de los datos sin procesar

En este paso, cargará los datos sin procesar en una tabla para que estén disponibles para su posterior procesamiento. Para administrar recursos de datos en la plataforma de Databricks, como las tablas, Databricks recomienda el catálogo de Unity. Sin embargo, si no tiene permisos para crear el catálogo y el esquema necesarios para publicar tablas en el catálogo de Unity, todavía puede completar los pasos siguientes publicando tablas en el metastore de Hive.

Para ingerir datos, Databricks recomienda usar el Cargador automático. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube.

Puede configurar Auto Loader para detectar automáticamente el esquema de los datos cargados, lo que le permite inicializar tablas sin declarar explícitamente el esquema de datos y evolucionar el esquema de tabla a medida que se introducen nuevas columnas. Esto elimina la necesidad de realizar un seguimiento manual y aplicar los cambios de esquema a lo largo del tiempo. Databricks recomienda la inferencia de esquema al usar el autocargador. Sin embargo, como se ve en el paso de exploración de datos, los datos de canciones no contienen información de encabezado. Dado que el encabezado no se almacena con los datos, deberá definir explícitamente el esquema, como se muestra en el ejemplo siguiente.

  1. En la barra lateral, haga clic en New IconNuevo y seleccione Cuaderno en el menú. Aparece el cuadro de diálogo Create Notebook (Crear cuaderno).

  2. Escriba un nombre para el cuaderno, por ejemplo Ingest songs data. De forma predeterminada:

    • Python es el lenguaje seleccionado.
    • El cuaderno está asociado al último clúster que usó. En este caso, el clúster que creó en el Paso 1: Crear un clúster.
  3. Escriba lo siguiente en la primera celda del cuaderno:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Si usa el catálogo de Unity, reemplace <table-name> por un catálogo, un esquema y un nombre de tabla para contener los registros ingeridos (por ejemplo, data_pipelines.songs_data.raw_song_data). En caso contrario, reemplace <table-name> por el nombre de una tabla que contenga los registros ingeridos, por ejemplo, raw_song_data.

    Reemplace <checkpoint-path> por una ruta de acceso a un directorio de DBFS para mantener los archivos de punto de comprobación, por ejemplo, /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Haga clic en Run Menu y seleccione Ejecutar celda. En este ejemplo se define el esquema de datos mediante la información de README, se ingieren los datos de canciones de todos los archivos contenidos en file_path y se escriben los datos en la tabla especificada por table_name.

Paso 4: Preparar los datos sin procesar

Para preparar los datos sin procesar para el análisis, los siguientes pasos transformarán los datos de canciones sin procesar filtrando las columnas innecesarias y agregando un nuevo campo que contenga una marca de tiempo para la creación del nuevo registro.

  1. En la barra lateral, haga clic en New IconNuevo y seleccione Cuaderno en el menú. Aparece el cuadro de diálogo Create Notebook (Crear cuaderno).

  2. Proporcionar un nombre para el cuaderno. Por ejemplo, Prepare songs data. Cambie el idioma predeterminado a SQL.

  3. Escriba lo siguiente en la primera celda del cuaderno:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Si usa el catálogo de Unity, reemplace <table-name> por un catálogo, un esquema y un nombre de tabla que contengan los registros filtrados y transformados (por ejemplo, data_pipelines.songs_data.prepared_song_data). En caso contrario, reemplace <table-name> por el nombre de una tabla que contenga los registros filtrados y transformados, (por ejemplo, prepared_song_data).

    Reemplace <raw-songs-table-name> por el nombre de la tabla que contenga los registros de canciones sin procesar ingeridos en el paso anterior.

  4. Haga clic en Run Menu y seleccione Ejecutar celda.

Paso 5: Consultar los datos transformados

En este paso, ampliará la canalización de procesamiento mediante la adición de consultas para analizar los datos de canciones. Estas consultas usan los registros preparados creados en el paso anterior.

  1. En la barra lateral, haga clic en New IconNuevo y seleccione Cuaderno en el menú. Aparece el cuadro de diálogo Create Notebook (Crear cuaderno).

  2. Proporcionar un nombre para el cuaderno. Por ejemplo, Analyze songs data. Cambie el idioma predeterminado a SQL.

  3. Escriba lo siguiente en la primera celda del cuaderno:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Reemplace <prepared-songs-table-name> por el nombre de la tabla que contiene los datos preparados. Por ejemplo, data_pipelines.songs_data.prepared_song_data.

  4. Haga clic en el Down Caret en el menú de acciones de celda, seleccione Agregar celda a continuación y escriba lo siguiente en la nueva celda:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Reemplace <prepared-songs-table-name> por el nombre de la tabla preparada creada en el paso anterior. Por ejemplo, data_pipelines.songs_data.prepared_song_data.

  5. Para ejecutar las consultas y ver la salida, haga clic en Ejecutar todo.

Paso 6: Crear un trabajo de Azure Databricks para ejecutar la canalización

Puede crear un flujo de trabajo para automatizar la ejecución de los pasos de ingesta de datos, procesamiento y análisis mediante un trabajo de Azure Databricks.

  1. En el área de trabajo Ciencia de datos e Ingeniería, realice una de las siguientes acciones:
    • Haga clic en Jobs IconJobs en la barra lateral y haga clic en Create Job Button.
    • En la barra lateral, haga clic en New IconNuevo y seleccione Trabajo.
  2. En el cuadro de diálogo de tarea en la pestaña Tareas, reemplace Agregar un nombre para el trabajo... por el nombre del trabajo que quiera usar. Por ejemplo, "Flujo de trabajo de canciones".
  3. En el campo Nombre de tarea, escriba el nombre de la primera tarea, por ejemplo, Ingest_songs_data.
  4. En el campo Tipo, seleccione el tipo de tarea Cuaderno.
  5. En Origen, seleccione Área de trabajo.
  6. Use el explorador de archivos para buscar el cuaderno de ingesta de datos, haga clic en el nombre del cuaderno y luego en Confirmar.
  7. En Clúster, seleccione Shared_job_cluster o el clúster que creó en el paso Create a cluster.
  8. Haga clic en Crear.
  9. Haga clic en el Add Task Button debajo de la tarea que acaba de crear y seleccione Notebook.
  10. En el campo Nombre de tarea, escriba el nombre que quiera darle a la tarea, por ejemplo, Prepare_songs_data.
  11. En el campo Tipo, seleccione el tipo de tarea Cuaderno.
  12. En Origen, seleccione Área de trabajo.
  13. Use el explorador de archivos para buscar el cuaderno de preparación de datos, haga clic en el nombre del cuaderno y luego en Confirmar.
  14. En Clúster, seleccione Shared_job_cluster o el clúster que creó en el paso Create a cluster.
  15. Haga clic en Crear.
  16. Haga clic en el Add Task Button debajo de la tarea que acaba de crear y seleccione Notebook.
  17. En el campo Nombre de tarea, escriba el nombre que quiera darle a la tarea, por ejemplo, Analyze_songs_data.
  18. En el campo Tipo, seleccione el tipo de tarea Cuaderno.
  19. En Origen, seleccione Área de trabajo.
  20. Use el explorador de archivos para buscar el cuaderno de análisis de datos, haga clic en el nombre del cuaderno y luego en Confirmar.
  21. En Clúster, seleccione Shared_job_cluster o el clúster que creó en el paso Create a cluster.
  22. Haga clic en Crear.
  23. Para ejecutar el flujo de trabajo, haga clic en Run Now Button. Para ver detalles de la ejecución, haga clic en el vínculo de la columna Hora de inicio de la ejecución en la vista Ejecuciones de trabajo. Haga clic en cada tarea para ver los detalles de la ejecución de la tarea.
  24. Para ver los resultados cuando finalice el flujo de trabajo, haga clic en la tarea de análisis de datos final. Aparece la página Salida y muestra los resultados de la consulta.

Paso 7: Programar el trabajo de canalización de datos

Nota:

Para demostrar el uso de un trabajo de Azure Databricks para orquestar un flujo de trabajo programado, en este ejemplo de introducción se separan los pasos de ingesta, preparación y análisis en cuadernos independientes y, a continuación, se usa cada cuaderno para crear una tarea en el trabajo. Si todo el procesamiento está contenido en un solo cuaderno, puede programar fácilmente el cuaderno directamente desde la interfaz de usuario del cuaderno de Azure Databricks. Consulte Creación y administración de trabajos de cuaderno programados.

Un requisito común es ejecutar una canalización de datos de forma programada. Para definir una programación para el trabajo que ejecuta la canalización:

  1. Haga clic en Jobs IconTrabajos en la barra lateral.
  2. En la columna Name (Nombre), haga clic en el nombre de trabajo. En el panel lateral se muestran los detalles del trabajo.
  3. Haga clic en Agregar desencadenador en el panel Detalles del trabajo y seleccione Programado en Tipo de desencadenador.
  4. Especifique el período, la hora de inicio y la zona horaria. Opcionalmente, active la casilla Show Cron Syntax (Mostrar sintaxis de cron) para mostrar y editar la programación en Quartz Cron Syntax (Sintaxis Quartz de cron).
  5. Haga clic en Save(Guardar).

Más información