了解如何使用 Lakeflow Spark 声明性管道(SDP)为数据业务流程和自动加载程序创建新管道。 本教程通过清理数据和创建查询,以扩展示例管道,从而找出前 100 个用户。
本教程介绍如何使用 Lakeflow 管道编辑器:
- 使用默认文件夹结构创建新管道,并从一组示例文件开始。
- 使用预期定义数据质量约束。
- 使用编辑器功能通过新的转换来扩展管道,以对数据执行分析。
要求
在开始本教程之前,必须:
- 登录到 Azure Databricks 工作区。
- 为工作区启用 Unity Catalog。
- 为工作区启用 Lakeflow 管道编辑器,并且必须选择加入。 请参阅 “启用 Lakeflow 管道编辑器”和“更新后的监视”。
- 有权创建计算资源或访问计算资源。
- 有权在目录中创建新架构。 所需的权限为
ALL PRIVILEGES或USE CATALOG和CREATE SCHEMA。
步骤 1:创建管道
在此步骤中,将使用默认文件夹结构和代码示例创建管道。 代码示例引用 users 示例数据源中的 wanderbricks 表。
在 Azure Databricks 工作区中,单击
“新建”,然后选择
ETL 管道。 这会在“创建管道”页上打开管道编辑器。
单击标头,为管道命名。
在名称下方,选择输出表的默认目录和架构。 如果未在管道定义中指定目录和架构,则使用默认的目录和架构。
在管道的“下一步”下,单击任一
从 SQL 或
中的示例代码开始。从 Python 中的示例代码开始,具体取决于语言首选项。 这会更改示例代码的默认语言,但稍后可以添加其他语言的代码。 这会创建一个包含示例代码的默认文件夹结构,以帮助你入门。
可以在工作区左侧的管道资产浏览器中查看示例代码。 下面是
transformations两个文件,每个文件生成一个管道数据集。 在explorations下方有一个笔记本,里面有帮助你查看管道输出的代码。 单击文件可在编辑器中查看和编辑代码。尚未创建输出数据集,屏幕右侧的 管道图 为空。
若要运行管道代码(文件夹中的代码
transformations),请单击屏幕右上角的 “运行管道 ”。运行完成后,工作区的底部会显示出两个新表,
sample_users_<pipeline-name>和sample_aggregation_<pipeline-name>。 还可以看到工作区右侧的 管道图 现在显示了两个表,其中sample_users是sample_aggregation的源表。
步骤 2:应用数据质量检查
在此步骤中,向sample_users表添加数据质量检查。 使用 管道预期 来约束数据。 在这种情况下,将删除没有有效电子邮件地址的任何用户记录,并将清理的表输出为 users_cleaned。
在管道资产浏览器中,单击
然后选择 “转换”。
在“ 创建新转换文件 ”对话框中,进行以下选择:
- 为语言选择 Python 或 SQL。 这不必与上一个选择匹配。
- 为文件命名。 在这种情况下,请选择
users_cleaned。 - 对于 目标路径,保留默认值。
- 对于 数据集类型,请将其保留为 未选择 或选择 具体化视图。 如果选择 具体化视图,它将为你生成示例代码。
在新代码文件中,编辑代码以匹配以下内容(根据上一屏幕上的选择使用 SQL 或 Python)。 替换
<pipeline-name>为sample_users表的全名。SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Python
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )单击“ 运行管道 ”以更新管道。 它现在应该有三个表格。
步骤 3:分析排名靠前的用户
接下来,根据已创建的预订数量获取前 100 位用户。 将 wanderbricks.bookings 表联接到 users_cleaned 物化视图。
在管道资产浏览器中,单击
然后选择 “转换”。
在“ 创建新转换文件 ”对话框中,进行以下选择:
- 为语言选择 Python 或 SQL。 这不必与以前的选择匹配。
- 为文件命名。 在这种情况下,请选择
users_and_bookings。 - 对于 目标路径,保留默认值。
- 对于数据集类型,请保持未选择。
在新代码文件中,编辑代码以匹配以下内容(根据上一屏幕上的选择使用 SQL 或 Python)。
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Python
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )单击“ 运行管道 ”更新数据集。 运行完成后,可以在 Pipeline Graph 中看到有四个表,包括新
users_and_bookings表。
后续步骤
现在,你已了解如何使用 Lakeflow 管道编辑器的一些功能并创建了管道,下面提供了一些其他功能来了解有关以下内容的详细信息:
用于在创建管道时使用和调试转换的工具:
内置的 Databricks 资产捆绑包 集成,用于直接从编辑器进行高效协作、版本控制和 CI/CD 集成: