Tutorial: Creación de la primera canalización mediante el Editor de canalizaciones de Lakeflow

Aprenda a crear una nueva canalización utilizando Spark Declarative Pipelines (SDP) de Lakeflow para la orquestación de datos y Auto Loader. En este tutorial se amplía la tubería de ejemplo mediante la limpieza de datos y la creación de una consulta para encontrar los principales 100 usuarios.

En este tutorial, aprenderá a usar el Editor de canalizaciones de Lakeflow para:

  • Cree una nueva canalización con la estructura de carpetas predeterminada y comience con un conjunto de archivos de ejemplo.
  • Defina restricciones de calidad de datos mediante expectativas.
  • Utiliza las características del editor para ampliar el flujo de trabajo con una nueva transformación para realizar análisis de tus datos.

Requisitos

Antes de comenzar este tutorial, debe:

  • Inicie sesión en un área de trabajo de Azure Databricks.
  • Tener habilitado el catálogo de Unity para el área de trabajo.
  • Tener permiso para crear un recurso de computación o acceso a un recurso de computación.
  • Tener permisos para crear un nuevo esquema en un catálogo. Los permisos necesarios son ALL PRIVILEGES o USE CATALOG y CREATE SCHEMA.

Paso 1: crear una canalización

En este paso, creará una canalización mediante la estructura de carpetas predeterminada y los ejemplos de código. Los ejemplos de código hacen referencia a la users tabla en el wanderbricks origen de datos de ejemplo.

  1. En el área de trabajo de Azure Databricks, haga clic en Icono de sumaNuevo, a continuación, Icono de pipeline.ETL. Se abrirá el editor de canalizaciones con un nombre de canalización predeterminado, como New Pipeline <date> <time>.

  2. (Opcional) Seleccione el nombre y escriba un nombre descriptivo para la canalización.

  3. (Opcional) A la derecha del nombre, haga clic en el catálogo y el esquema para establecer valores predeterminados diferentes.

  4. (Opcional) En el archivo de origen my_transformation creado automáticamente, seleccione Python o SQL en la lista desplegable idioma para establecer el idioma del archivo.

  5. Haga clic en el icono de código.Use el código de ejemplo.

    El código de ejemplo del idioma seleccionado aparece en el my_transformation archivo de origen de la transformations carpeta . Los conjuntos de datos de salida aún no se han creado, y el gráfico de la canalización a la derecha de la pantalla está vacío.

  6. Para ejecutar el código de canalización (el código de la transformations carpeta), haga clic en Ejecutar canalización en la parte superior derecha de la pantalla.

    Una vez completada la ejecución, la parte inferior del área de trabajo muestra las dos tablas nuevas que se crearon sample_users_<date_time> y sample_aggregation_<date_time>. El gráfico de canalización en el lado derecho del área de trabajo ahora muestra las dos tablas, incluido que sample_users es la fuente de sample_aggregation. Anote el nombre completo sample_users_<date_time> de la tabla; haga referencia a él en el paso siguiente.

Paso 2: Aplicar comprobaciones de calidad de datos

En este paso, agregará una comprobación de calidad de datos a la sample_users tabla. Las expectativas de canalización se usan para restringir los datos. En este caso, eliminará los registros de usuario que no tengan una dirección de correo electrónico válida y generará la tabla limpiada como users_cleaned.

  1. En el navegador de recursos de canalización de la izquierda, haga clic en el icono Más, y seleccione Transformación.

  2. En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:

    • Elija Python o SQL para el Language. Esto no tiene que coincidir con la selección anterior.
    • Asigne un nombre al archivo. En este caso, elija users_cleaned.
    • En Ruta de acceso de destino, deje el valor predeterminado.
    • En Tipo de conjunto de datos, déjelo como Ninguno seleccionado o elija Vista materializada. Si selecciona Vista materializada, genera código de ejemplo automáticamente.
  3. Haga clic en Crear para crear el archivo de código de transformación.

  4. En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior). Reemplace sample_users_<date_time> por el nombre completo de su tabla sample_users de la sección anterior.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<date_time>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.materialized_view
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<date_time>")
        )
    
  5. Haga clic en Ejecutar canalización para actualizar la canalización. Ahora debería tener tres tablas.

Paso 3: Análisis de los usuarios principales

A continuación, obtenga los 100 primeros usuarios por el número de reservas que han creado. Una la wanderbricks.bookings tabla a la users_cleaned vista materializada.

  1. En el explorador de recursos de canalización de la izquierda, haga clic en El icono Más y seleccione Transformación.

  2. En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:

    • Elija Python o SQL para el Language. Esto no tiene que coincidir con las selecciones anteriores.
    • Asigne un nombre al archivo. En este caso, elija users_and_bookings.
    • En Ruta de acceso de destino, deje el valor predeterminado.
    • En Tipo de conjunto de datos, déjelo como Ninguno seleccionado.
  3. Haga clic en Crear para crear el archivo de código de transformación.

  4. En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.materialized_view
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  5. Haga clic en Ejecutar canalización para actualizar los conjuntos de datos. Una vez completada la ejecución, puede ver en el gráfico de canalización que hay cuatro tablas, incluida la nueva users_and_bookings .

    Gráfico de canalización que muestra cuatro tablas en la canalización

Pasos siguientes

Ahora que ha aprendido a usar algunas de las características del editor de canalizaciones de Lakeflow y ha creado una canalización, estas son otras características para obtener más información sobre: