Compartir a través de


Tutorial: Creación de la primera canalización mediante el Editor de canalizaciones de Lakeflow

Aprenda a crear una nueva canalización utilizando Spark Declarative Pipelines (SDP) de Lakeflow para la orquestación de datos y Auto Loader. En este tutorial se amplía la tubería de ejemplo mediante la limpieza de datos y la creación de una consulta para encontrar los principales 100 usuarios.

En este tutorial, aprenderá a usar el Editor de canalizaciones de Lakeflow para:

  • Cree una nueva canalización con la estructura de carpetas predeterminada y comience con un conjunto de archivos de ejemplo.
  • Defina restricciones de calidad de datos mediante expectativas.
  • Utiliza las características del editor para ampliar el flujo de trabajo con una nueva transformación para realizar análisis de tus datos.

Requisitos

Antes de comenzar este tutorial, debe:

  • Inicie sesión en un área de trabajo de Azure Databricks.
  • Tener habilitado el catálogo de Unity para el área de trabajo.
  • Asegúrese de que el editor de flujos de trabajo de Lakeflow esté habilitado para su área de trabajo y de haber optado por su activación. Consulte Habilitación del Editor de canalizaciones de Lakeflow y supervisión actualizada.
  • Tener permiso para crear un recurso de computación o acceso a un recurso de computación.
  • Tener permisos para crear un nuevo esquema en un catálogo. Los permisos necesarios son ALL PRIVILEGES o USE CATALOG y CREATE SCHEMA.

Paso 1: crear una canalización

En este paso, creará una canalización mediante la estructura de carpetas predeterminada y los ejemplos de código. Los ejemplos de código hacen referencia a la users tabla en el wanderbricks origen de datos de ejemplo.

  1. En el área de trabajo de Azure Databricks, haga clic en el icono Más.Nuevo, luego icono de canalización.Canalización de ETL. Se abre el editor de canalización, en la página Crear una canalización.

  2. Haga clic en el encabezado para asignar un nombre a la canalización.

  3. Justo debajo del nombre, elija el catálogo y el esquema predeterminados para las tablas de salida. Se usan cuando no se especifica un catálogo y un esquema en las definiciones de canalización.

  4. En Paso siguiente para su canalización, haga clic en uno de los siguientes: icono de esquema.Comience con el código de ejemplo en SQL o icono de esquema.Comience con código de ejemplo en Python, según su preferencia de lenguaje. Esto cambia el idioma predeterminado del código de ejemplo, pero puede agregar código en el otro lenguaje más adelante. Esto crea una estructura de carpetas predeterminada con código de ejemplo para empezar.

  5. Puede ver el código de ejemplo en el explorador de recursos de canalización en el lado izquierdo del área de trabajo. Debajo de transformations hay dos archivos que generan un conjunto de datos de pipeline cada uno. Debajo de explorations hay un cuaderno que tiene código para ayudarle a ver la salida de la tubería. Hacer clic en un archivo le permite ver y editar el código en el editor.

    Los conjuntos de datos de salida aún no se han creado, y el gráfico de la canalización a la derecha de la pantalla está vacío.

  6. Para ejecutar el código de canalización (el código de la transformations carpeta), haga clic en Ejecutar canalización en la parte superior derecha de la pantalla.

    Una vez completada la ejecución, la parte inferior del área de trabajo muestra las dos tablas nuevas que se crearon sample_users_<pipeline-name> y sample_aggregation_<pipeline-name>. También puede ver que el gráfico de canalización en el lado derecho del área de trabajo ahora muestra las dos tablas, incluyendo que sample_users es la fuente de sample_aggregation.

Paso 2: Aplicar comprobaciones de calidad de datos

En este paso, agregará una comprobación de calidad de datos a la sample_users tabla. Las expectativas de canalización se usan para restringir los datos. En este caso, eliminará los registros de usuario que no tengan una dirección de correo electrónico válida y generará la tabla limpiada como users_cleaned.

  1. En el explorador de activos de canalización, haga clic en icono Más y seleccione Transformación.

  2. En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:

    • Elija Python o SQL para el lenguaje. Esto no tiene que coincidir con la selección anterior.
    • Asigne un nombre al archivo. En este caso, elija users_cleaned.
    • En Ruta de acceso de destino, deje el valor predeterminado.
    • En Tipo de conjunto de datos, déjelo como Ninguno seleccionado o elija Vista materializada. Si selecciona Vista materializada, genera código de ejemplo automáticamente.
  3. En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior). Reemplace por <pipeline-name> el nombre completo de la sample_users tabla.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Pitón

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Haga clic en Ejecutar canalización para actualizar la canalización. Ahora debería tener tres tablas.

Paso 3: Análisis de los usuarios principales

A continuación, obtenga los 100 primeros usuarios por el número de reservas que ha creado. Una la wanderbricks.bookings tabla a la users_cleaned vista materializada.

  1. En el explorador de activos de canalización, haga clic en icono Más y seleccione Transformación.

  2. En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:

    • Elija Python o SQL para el lenguaje. Esto no tiene que coincidir con las selecciones anteriores.
    • Asigne un nombre al archivo. En este caso, elija users_and_bookings.
    • En Ruta de acceso de destino, deje el valor predeterminado.
    • En Tipo de conjunto de datos, déjelo como Ninguno seleccionado.
  3. En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Pitón

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Haga clic en Ejecutar canalización para actualizar los conjuntos de datos. Una vez completada la ejecución, puede ver en el gráfico de canalización que hay cuatro tablas, incluida la nueva users_and_bookings .

    Gráfico de canalización que muestra cuatro tablas en la canalización

Pasos siguientes

Ahora que ha aprendido a usar algunas de las características del editor de canalizaciones de Lakeflow y ha creado una canalización, estas son otras características para obtener más información sobre: