Compartir a través de


Tutorial: Construcción de una canalización ETL con Lakeflow Declarative Pipelines

Aprenda a crear e implementar una canalización de ETL (extracción, transformación y carga) para la orquestación de datos mediante Canalizaciones Declarativas de Lakeflow y Auto Loader. Una canalización de ETL implementa los pasos para leer datos de sistemas de origen, transformar esos datos en función de los requisitos, como comprobaciones de calidad de datos y desduplicación de registros, y escribir los datos en un sistema de destino, como un almacenamiento de datos o un lago de datos.

En este tutorial, utilizarás los pipelines declarativos de Lakeflow y Auto Loader para:

  • Importar datos de origen brutos en una tabla de destino.
  • Transforme los datos de origen sin procesar y escriba los datos transformados en dos vistas materializadas de destino.
  • Consulte los datos transformados.
  • Automatice la canalización de ETL con un trabajo de Databricks.

Para obtener más información sobre las canalizaciones declarativas de Lakeflow y el cargador automático, consulte Canalizaciones declarativas de Lakeflow y ¿Qué es Auto Loader?

Requisitos

Para completar este tutorial, debe cumplir los siguientes requisitos:

Acerca del conjunto de datos

El conjunto de datos usado en este ejemplo es un subconjunto del conjunto de datos Million Song, una colección de características y metadatos para pistas de música contemporáneas. Este conjunto de datos está disponible en los conjuntos de datos de ejemplo incluidos en el área de trabajo de Azure Databricks.

Paso 1: Creación de una canalización

Primero, creará una canalización ETL en Lakeflow Declarative Pipelines. Las canalizaciones declarativas de Lakeflow crean canalizaciones mediante la resolución de dependencias definidas en cuadernos o archivos ( denominados código fuente) mediante la sintaxis de canalizaciones declarativas de Lakeflow. Cada archivo de código fuente solo puede contener un idioma, pero se pueden agregar varios cuadernos o archivos específicos de idioma al pipeline. Para obtener más información, consulte Canalizaciones declarativas de Lakeflow.

Importante

Deje el campo Código fuente en blanco para crear y configurar un cuaderno para la creación automática de código fuente.

En este tutorial se utiliza la computación sin servidor y el catálogo de Unity. Para todas las opciones de configuración que no se especifican, use la configuración predeterminada. Si el proceso sin servidor no está habilitado o se admite en el área de trabajo, puede completar el tutorial como escrito mediante la configuración de proceso predeterminada. Si usa la configuración de proceso predeterminada, debe seleccionar manualmente Unity Catalog en Opciones de almacenamiento en la sección Destino de la interfaz de usuario de Crear Canalización.

Para crear una nueva canalización de ETL en Lakeflow declarative pipelines, siga estos pasos:

  1. En el área de trabajo, haga clic en el icono Flujos de trabajo.Trabajos y canalizaciones en la barra lateral.
  2. En Nuevo, haga clic en Canalización ETL.
  3. En Nombre de canalización, escriba un nombre de canalización único.
  4. Active la casilla Sin servidor .
  5. En Destino, para configurar una ubicación del catálogo de Unity donde se publican las tablas, seleccione un catálogo existente y escriba un nombre nuevo en Esquema para crear un nuevo esquema en el catálogo.
  6. Haga clic en Crear.

Aparece la interfaz de usuario de canalización para la nueva canalización.

Paso 2: Desarrollo de una canalización

Importante

Los cuadernos solo pueden contener un solo lenguaje de programación. No mezcle código de Python y SQL en cuadernos de código fuente de canalización.

En este paso, usará Cuadernos de Databricks para desarrollar y validar el código fuente de las canalizaciones declarativas de Lakeflow de forma interactiva.

El código usa Auto Loader para la ingesta incremental de datos. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube. Para más información, consulte ¿Qué es autocargador?

Se crea y configura automáticamente un cuaderno de código fuente en blanco para la canalización. El cuaderno se crea en un nuevo directorio del directorio de usuario. El nombre del nuevo directorio y archivo coinciden con el nombre de la canalización. Por ejemplo, /Users/someone@example.com/my_pipeline/my_pipeline.

Al desarrollar un pipeline, puede elegir entre Python o SQL. Se incluyen ejemplos para ambos idiomas. En función de su elección de idioma, compruebe que selecciona el idioma predeterminado del cuaderno. Para obtener más información sobre el soporte de notebooks para el desarrollo de código de canalizaciones declarativas de Lakeflow, consulte Desarrolle y depure canalizaciones ETL con un notebook en Canalizaciones declarativas de Lakeflow.

  1. Un vínculo para acceder a este cuaderno se encuentra en el campo Código fuente del panel Detalles de canalización . Haga clic en el vínculo para abrir el cuaderno antes de continuar con el paso siguiente.

  2. Haga clic en Conectar en la esquina superior derecha para abrir el menú de configuración de proceso.

  3. Mantenga el puntero sobre el nombre de la canalización que creó en el paso 1.

  4. Haga clic en Conectar.

  5. Junto al título del cuaderno en la parte superior, seleccione el lenguaje predeterminado del cuaderno (Python o SQL).

  6. Copie y pegue el código siguiente en una celda del cuaderno.

    Pitón

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Haga clic en Iniciar para iniciar una actualización de la canalización conectada.

Paso 3: Consultar los datos transformados

En este paso, consultará los datos procesados en la canalización de ETL para analizar los datos de la canción. Estas consultas usan los registros preparados creados en el paso anterior.

En primer lugar, ejecute una consulta que encuentre a los artistas que han lanzado la mayoría de las canciones cada año desde 1990.

  1. En la barra lateral, haga clic en SQL Editor IconEditor de SQL.

  2. Haga clic en el icono de añadir o de suma en la pestaña nueva y seleccione Crear nueva consulta en el menú.

  3. Escribe lo siguiente:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Reemplace <catalog> y <schema> por el nombre del catálogo y el esquema en el que se encuentra la tabla. Por ejemplo, data_pipelines.songs_data.top_artists_by_year.

  4. Haga clic en Ejecutar seleccionado.

Ahora, ejecute otra consulta que encuentre canciones con un ritmo de 4/4 y un tempo bailable.

  1. Haga clic en el nuevo icono de Icono Agregar o más pulsación y seleccione Crear nueva consulta en el menú.

  2. Escriba el código siguiente:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Reemplace <catalog> y <schema> por el nombre del catálogo y el esquema en el que se encuentra la tabla. Por ejemplo, data_pipelines.songs_data.songs_prepared.

  3. Haga clic en Ejecutar seleccionado.

Paso 4: Creación de un trabajo para ejecutar la canalización

A continuación, cree un flujo de trabajo para automatizar los pasos de ingesta, procesamiento y análisis de datos mediante un trabajo de Databricks.

  1. En el área de trabajo, haga clic en el icono Flujos de trabajo.Trabajos y canalizaciones en la barra lateral.
  2. En Nuevo, haga clic en Trabajo.
  3. En el cuadro de título de la tarea, reemplace Nueva fecha y hora< del trabajo > por el nombre del trabajo. Por ejemplo, Songs workflow.
  4. En Nombre de tarea, escriba un nombre para la primera tarea, por ejemplo, ETL_songs_data.
  5. En Tipo, seleccione Canalización.
  6. En Canalización, seleccione la canalización que creó en el paso 1.
  7. Haga clic en Crear.
  8. Para ejecutar el flujo de trabajo, haga clic en Ejecutar ahora. Para ver los detalles de la ejecución, haga clic en la pestaña Ejecuciones . Haga clic en la tarea para ver los detalles de la ejecución de la tarea.
  9. Para ver los resultados cuando se completa el flujo de trabajo, haga clic en Ir a la última ejecución correcta o la hora de inicio de la ejecución del trabajo. Aparece la página Salida y muestra los resultados de la consulta.

Consulte Supervisión y observabilidad de trabajos de Lakeflow para más información sobre las ejecuciones de tareas.

Paso 5: Programar el trabajo de canalización

Para ejecutar la canalización de ETL según una programación, siga estos pasos:

  1. Vaya a la interfaz de usuario Trabajos y Pipelines en la misma área de trabajo de Azure Databricks donde está el trabajo.
  2. Opcionalmente, seleccione los filtros Trabajos y Propiedad de mí .
  3. En la columna Nombre , haga clic en el nombre del trabajo. En el panel lateral se muestran los detalles del trabajo.
  4. Haga clic en Agregar desencadenador en el panel Programaciones y desencadenadores y seleccione Programado en Tipo de desencadenador.
  5. Especifique el período, la hora de inicio y la zona horaria.
  6. Haga clic en Guardar.

Más información