Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Aprenda a crear una nueva canalización utilizando Spark Declarative Pipelines (SDP) de Lakeflow para la orquestación de datos y Auto Loader. En este tutorial se amplía la tubería de ejemplo mediante la limpieza de datos y la creación de una consulta para encontrar los principales 100 usuarios.
En este tutorial, aprenderá a usar el Editor de canalizaciones de Lakeflow para:
- Cree una nueva canalización con la estructura de carpetas predeterminada y comience con un conjunto de archivos de ejemplo.
- Defina restricciones de calidad de datos mediante expectativas.
- Utiliza las características del editor para ampliar el flujo de trabajo con una nueva transformación para realizar análisis de tus datos.
Requisitos
Antes de comenzar este tutorial, debe:
- Inicie sesión en un área de trabajo de Azure Databricks.
- Tener habilitado el catálogo de Unity para el área de trabajo.
- Asegúrese de que el editor de flujos de trabajo de Lakeflow esté habilitado para su área de trabajo y de haber optado por su activación. Consulte Habilitación del Editor de canalizaciones de Lakeflow y supervisión actualizada.
- Tener permiso para crear un recurso de computación o acceso a un recurso de computación.
- Tener permisos para crear un nuevo esquema en un catálogo. Los permisos necesarios son
ALL PRIVILEGESoUSE CATALOGyCREATE SCHEMA.
Paso 1: crear una canalización
En este paso, creará una canalización mediante la estructura de carpetas predeterminada y los ejemplos de código. Los ejemplos de código hacen referencia a la users tabla en el wanderbricks origen de datos de ejemplo.
En el área de trabajo de Azure Databricks, haga clic en
Nuevo, luego
Canalización de ETL. Se abre el editor de canalización, en la página Crear una canalización.
Haga clic en el encabezado para asignar un nombre a la canalización.
Justo debajo del nombre, elija el catálogo y el esquema predeterminados para las tablas de salida. Se usan cuando no se especifica un catálogo y un esquema en las definiciones de canalización.
En Paso siguiente para su canalización, haga clic en uno de los siguientes:
Comience con el código de ejemplo en SQL o
Comience con código de ejemplo en Python, según su preferencia de lenguaje. Esto cambia el idioma predeterminado del código de ejemplo, pero puede agregar código en el otro lenguaje más adelante. Esto crea una estructura de carpetas predeterminada con código de ejemplo para empezar.
Puede ver el código de ejemplo en el explorador de recursos de canalización en el lado izquierdo del área de trabajo. Debajo de
transformationshay dos archivos que generan un conjunto de datos de pipeline cada uno. Debajo deexplorationshay un cuaderno que tiene código para ayudarle a ver la salida de la tubería. Hacer clic en un archivo le permite ver y editar el código en el editor.Los conjuntos de datos de salida aún no se han creado, y el gráfico de la canalización a la derecha de la pantalla está vacío.
Para ejecutar el código de canalización (el código de la
transformationscarpeta), haga clic en Ejecutar canalización en la parte superior derecha de la pantalla.Una vez completada la ejecución, la parte inferior del área de trabajo muestra las dos tablas nuevas que se crearon
sample_users_<pipeline-name>ysample_aggregation_<pipeline-name>. También puede ver que el gráfico de canalización en el lado derecho del área de trabajo ahora muestra las dos tablas, incluyendo quesample_userses la fuente desample_aggregation.
Paso 2: Aplicar comprobaciones de calidad de datos
En este paso, agregará una comprobación de calidad de datos a la sample_users tabla. Las expectativas de canalización se usan para restringir los datos. En este caso, eliminará los registros de usuario que no tengan una dirección de correo electrónico válida y generará la tabla limpiada como users_cleaned.
En el explorador de activos de canalización, haga clic en
y seleccione Transformación.
En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:
- Elija Python o SQL para el lenguaje. Esto no tiene que coincidir con la selección anterior.
- Asigne un nombre al archivo. En este caso, elija
users_cleaned. - En Ruta de acceso de destino, deje el valor predeterminado.
- En Tipo de conjunto de datos, déjelo como Ninguno seleccionado o elija Vista materializada. Si selecciona Vista materializada, genera código de ejemplo automáticamente.
En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior). Reemplace por
<pipeline-name>el nombre completo de lasample_userstabla.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Pitón
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Haga clic en Ejecutar canalización para actualizar la canalización. Ahora debería tener tres tablas.
Paso 3: Análisis de los usuarios principales
A continuación, obtenga los 100 primeros usuarios por el número de reservas que ha creado. Una la wanderbricks.bookings tabla a la users_cleaned vista materializada.
En el explorador de activos de canalización, haga clic en
y seleccione Transformación.
En el cuadro de diálogo Crear nuevo archivo de transformación , realice las siguientes selecciones:
- Elija Python o SQL para el lenguaje. Esto no tiene que coincidir con las selecciones anteriores.
- Asigne un nombre al archivo. En este caso, elija
users_and_bookings. - En Ruta de acceso de destino, deje el valor predeterminado.
- En Tipo de conjunto de datos, déjelo como Ninguno seleccionado.
En el nuevo archivo de código, edite el código para que coincida con lo siguiente (use SQL o Python, en función de la selección en la pantalla anterior).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Pitón
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Haga clic en Ejecutar canalización para actualizar los conjuntos de datos. Una vez completada la ejecución, puede ver en el gráfico de canalización que hay cuatro tablas, incluida la nueva
users_and_bookings.
Pasos siguientes
Ahora que ha aprendido a usar algunas de las características del editor de canalizaciones de Lakeflow y ha creado una canalización, estas son otras características para obtener más información sobre:
Herramientas para trabajar con transformaciones y depuración al crear canalizaciones:
- Ejecución selectiva
- Vistas previas de datos
- DAG interactivo (grafo de los conjuntos de datos en el pipeline)
Integración integrada de Databricks Asset Bundles para una colaboración eficaz, control de versiones e integración de CI/CD directamente desde el editor: