教程:运行第一个 Delta Live Tables 管道

重要

无服务器 DLT 管道处于公共预览状态。 要了解如何启用无服务器 DLT 管道,请联系 Azure Databricks 帐户团队。

本教程介绍如何通过 Databricks 笔记本中的代码配置 Delta Live Tables 管道,并通过触发管道更新来运行管道。 本教程包含一个示例管道,该管道用于引入和处理示例数据集,以及使用 PythonSQL 接口的示例代码。 还可以使用本教程中的说明,通过正确定义的 Delta Live Tables 语法创建包含任何笔记本的管道。

可以使用 Azure Databricks 工作区 UI 或自动化工具选项(例如 API、CLI、Databricks 资产捆绑包或 Databricks 工作流中的任务)配置 Delta Live Tables 管道和触发更新。 为了熟悉 Delta Live Tables 的功能和特性,Databricks 建议先使用 UI 来创建和运行管道。 此外,在 UI 中配置管道时,Delta Live Tables 会为管道生成 JSON 配置,该配置可用于实现编程工作流。

为了演示 Delta Live Tables 功能,本教程中的示例下载了一个公开可用的数据集。 但是,Databricks 有多种方法可以连接到数据源并引入实现实际用例的管道将使用的数据。 请查看“使用 Delta Live Tables 引入数据”。

要求

  • 若要启动非无服务器管道,必须具有群集创建权限或对用于定义 Delta Live Tables 群集的群集策略具有访问权限。 增量实时表运行时在运行管道之前创建群集,如果没有正确的权限,则会创建失败。

  • 若要使用本教程中的示例,必须已为工作区启用 Unity 目录

  • 必须在 Unity 目录中具有以下权限:

    • READ VOLUMEWRITE VOLUMEALL PRIVILEGES(对于 my-volume 卷)。
    • USE SCHEMAALL PRIVILEGES(对于 default 架构)。
    • USE CATALOGALL PRIVILEGES(对于 main 目录)。

    若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象

  • 本教程中的示例使用 Unity Catalog 来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。

注意

如果尚未为工作区启用 Unity Catalog,本文也附有包含不需要 Unity Catalog 的示例的笔记本。 若要运行此示例,请在创建管道时选择 Hive metastore 作为存储选项。

在何处运行 Delta Live Tables 查询?

Delta Live Tables 查询主要在 Databricks 笔记本中实现,但 Delta Live Tables 不能在笔记本单元格中以交互方式运行。 在 Databricks 笔记本中执行包含增量实时表语法的单元格会导致出现错误消息。 若要运行查询,必须将笔记本配置为管道的一部分。

重要

  • 在为 Delta Live Tables 编写查询时,不能依赖笔记本的逐单元格执行顺序。 Delta Live Tables 将评估并运行笔记本中定义的所有代码,但其执行模型与笔记本的“全部运行”命令完全不同。
  • 不能在单个 Delta Live Tables 源代码文件中混合使用多种语言。 例如,笔记本只能包含 Python 查询或 SQL 查询。 如果必须在管道中使用多种语言,请在管道中使用对应语言专属的多个笔记本或文件。

还可以使用存储在文件中的 Python 代码。 例如,可以创建可导入到 Python 管道中的 Python 模块,或定义要在 SQL 查询中使用的 Python 用户定义函数 (UDF)。 若要了解如何导入 Python 模块,请参阅“从 Git 文件夹或使用工作区文件导入 Python 模块”。 若要了解如何使用 Python UDF,请参阅“用户定义的标量函数 - Python”。

示例:引入和处理纽约婴儿姓名数据

本文中的示例使用一个公开的数据集,其中包含纽约市婴儿姓名的记录。 这些示例演示如何使用 Delta Live Tables 管道执行以下操作:

  • 将公开可用的数据集中的原始 CSV 数据读取到表中。
  • 从原始数据表中读取记录,并使用增量实时表期望创建一个包含已清理数据的新表。
  • 使用清理后的记录作为创建派生数据集的 Delta Live Tables 查询的输入。

此代码演示了一个简化的奖牌体系结构示例。 请参阅什么是奖牌湖屋体系结构?

此示例的实现是针对 PythonSQL 接口提供的。 可以按照步骤创建包含示例代码的新笔记本,也可以跳至创建管道并使用本页提供的笔记本之一。

使用 Python 实现 Delta Live Tables 管道

创建 Delta Live Tables 数据集的 Python 代码必须返回数据帧,具有 PySpark 或 Pandas for Spark 经验的用户十分熟悉这种结构。 对于不熟悉数据帧的用户,Databricks 建议使用 SQL 接口。 请参阅使用 SQL 实现 Delta Live Tables 管道

所有增量实时表 Python API 都在 dlt 模块中实现。 使用 Python 实现的 Delta Live Tables 管道代码必须显式导入 Python 笔记本和文件顶部的 dlt 模块。 增量实时表与许多 Python 脚本有一个重要差别:你不是调用执行数据引入和转换的函数来创建增量实时表数据集。 取而代之的是,增量实时表会解释加载到管道的所有文件中的 dlt 模块中的修饰器函数,并生成数据流图。

若要在本教程中实现该示例,请将以下 Python 代码复制并粘贴到新的 Python 笔记本中。 应该按照所述顺序将每个示例代码片段添加到笔记本对应的单元格中。 若要查看用于创建笔记本的选项,请参阅创建笔记本

注意

使用 Python 接口创建管道时,默认情况下,表名称由函数名称定义。 例如,以下 Python 示例创建三个表,名称分别为 baby_names_rawbaby_names_preparedtop_baby_names_2021。 可以使用 name 参数替代表名称。 请参阅创建增量实时表具体化视图或流式处理表

导入增量实时表模块

所有增量实时表 Python API 都在 dlt 模块中实现。 在 Python 笔记本和文件的顶部显式导入 dlt 模块。

以下示例演示了此 import 语句以及 pyspark.sql.functions 的 import 语句。

import dlt
from pyspark.sql.functions import *

下载数据

若要获取此示例的数据,请下载 CSV 文件并将其存储在卷中,如下所示:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

<catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。

从对象存储中的文件创建表

增量实时表支持从 Azure Databricks 所支持的所有格式加载数据。 请参阅数据格式选项

@dlt.table 修饰器告知 Delta Live Tables 创建一个包含函数返回的 DataFrame 结果的表。 在返回 Spark 数据帧的任何 Python 函数定义之前添加 @dlt.table 修饰器,以便在增量实时表中注册新表。 以下示例演示如何使用函数名称作为表名称并为表添加描述性的注释:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

从管道中的上游数据集添加表

可以使用 dlt.read() 从当前增量实时表管道中声明的其他数据集读取数据。 以这种方式声明新表会创建一个依赖项,增量实时表在执行更新之前会自动解析该依赖项。 以下代码还包括根据期望监视和强制实施数据质量的示例。 请参阅使用 Delta Live Tables 管理数据质量

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

创建包含扩充数据视图的表

由于增量实时表将管道更新作为一系列依赖项关系图进行处理,因此你可以通过声明具有特定业务逻辑的表来声明为仪表板、BI 和分析提供支持的高度扩充的视图。

Delta Live Tables 中的表在概念上等同于具体化视图。 Spark 上的传统视图在每次查询视图时会运行逻辑,而 Delta Live Tables 表会将最新版本的查询结果存储在数据文件中。 由于增量实时表管理管道中所有数据集的更新,因此你可以根据具体化视图的延迟要求来计划管道更新,并可以知道针对这些表的查询包含最新版本的可用数据。

以下代码定义的表演示了与从管道中的上游数据派生的具体化视图类似的概念:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

若要配置使用笔记本的管道,请参阅“创建管道”。

使用 SQL 实现 Delta Live Tables 管道

Databricks 建议 SQL 用户优先使用 SQL 语法的增量实时表在 Azure Databricks 上生成新的 ETL、引入和转换管道。 Delta Live Tables 的 SQL 接口使用许多新的关键字、构造和表值函数扩展了标准的 Spark SQL。 通过对标准 SQL 的这些补充,用户可以声明数据集之间的依赖关系并部署生产级基础结构,而无需学习任何新工具或其他概念。

对于熟悉 Spark 数据帧并需要支持更广泛的测试和难以使用 SQL 实现的操作(例如元编程操作)的用户,Databricks 建议使用 Python 接口。 请参阅示例:引入和处理纽约婴儿姓名数据

下载数据

若要获取此示例的数据,请复制以下代码,将其粘贴到新笔记本中,然后运行笔记本。 若要查看用于创建笔记本的选项,请参阅创建笔记本

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

<catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。

根据 Unity Catalog 中的文件创建表

对于本示例的其余部分,请复制以下 SQL 片段并将其粘贴到一个新的 SQL 笔记本中,需将该笔记本与上一节中的笔记本分开。 应该按照所述顺序将每个示例 SQL 代码片段添加到笔记本对应的单元格中。

增量实时表支持从 Azure Databricks 所支持的所有格式加载数据。 请参阅数据格式选项

所有增量实时表 SQL 语句都使用 CREATE OR REFRESH 语法和语义。 当你更新管道时,增量实时表会确定是否可以通过增量处理实现表的逻辑上正确的结果,或者是否需要完全重新计算。

以下示例通过从存储在 Unity Catalog 卷中的 CSV 文件加载数据来创建表:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

<catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。

将上游数据集中的表添加到管道

可以使用 live 虚拟架构从当前增量实时表管道中声明的其他数据集查询数据。 以这种方式声明新表会创建一个依赖项,增量实时表在执行更新之前会自动解析该依赖项。 live 架构是在增量实时表中实现的自定义关键字,如果你希望发布自己的数据集,该架构可以替代目标架构。 请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用将数据从 Delta Live Tables 管道发布到 Hive 元存储

以下代码还包括根据期望监视和强制实施数据质量的示例。 请参阅使用 Delta Live Tables 管理数据质量

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

创建扩充的数据视图

由于增量实时表将管道更新作为一系列依赖项关系图进行处理,因此你可以通过声明具有特定业务逻辑的表来声明为仪表板、BI 和分析提供支持的高度扩充的视图。

实时表在概念上等同于具体化视图。 Spark 上的传统视图在每次查询视图时会运行逻辑,而实时表将最新版本的查询结果存储在数据文件中。 由于增量实时表管理管道中所有数据集的更新,因此你可以根据具体化视图的延迟要求来计划管道更新,并可以知道针对这些表的查询包含最新版本的可用数据。

以下代码创建上游数据的扩充具体化视图:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

若要配置使用笔记本的管道,请继续创建管道

创建管道

Delta Live Tables 通过使用 Delta Live Tables 语法解析笔记本或文件(称为“源代码”或“库”)中定义的依赖项来创建管道。 每个源代码文件只能包含一种语言,但可以在管道中混合使用不同语言的库。

  1. 单击边栏中的“增量实时表”,然后单击“创建管道”。
  2. 为管道命名。
  3. 可以选中“无服务器”复选框以对此管道使用完全托管的计算。 选择“无服务器”时,将从 UI 中移除“计算”设置。
  4. (可选)选择产品版本
  5. 对于“管道模式”,选择“已触发” 。
  6. 配置包含管道源代码的一个或多个笔记本。 在“路径”文本框中,输入笔记本的路径,或单击“文件选取器图标”以选择笔记本。
  7. 选择管道发布的数据集的目标,即 Hive 元存储或 Unity Catalog。 请参阅发布数据集
    • Hive(元存储)
      • (可选)为管道输出数据输入一个“存储位置”。 如果将“存储位置”留空,系统将使用默认位置。
      • (可选)指定一个“目标架构”,以将数据集发布到 Hive 元存储。
    • Unity Catalog:指定目录目标架构以将数据集发布到 Unity Catalog。
  8. (可选)如果未选择“无服务器”,则可以为管道配置计算设置。 要了解计算设置的选项,请参阅配置增量实时表的管道设置
  9. (可选)单击“添加通知”以配置一个或多个电子邮件地址来接收管道事件的通知。 请参阅为管道事件添加电子邮件通知
  10. (可选)配置管道的高级设置。 要了解高级设置的选项,请参阅配置增量实时表的管道设置
  11. 单击 “创建”

单击“创建”后,系统将显示“管道详细信息”页面 。 也可以通过在“增量实时表”选项卡中单击管道名称来访问管道。

启动管道更新

若要启动管道的更新,请单击顶部面板中的 Delta Live Tables 启动图标 按钮。 系统会返回一条消息,确认将启动管道。

成功启动更新后,增量实时表系统:

  1. 使用由增量实时表系统创建的群集配置来启动群集。 也可指定一个自定义群集配置
  2. 创建任何不存在的表,并确保架构对于任何现有表都是正确的。
  3. 使用最新的可用数据来更新表。
  4. 在完成更新时关闭群集。

注意

默认情况下,执行模式设置为“生产”,为每次更新部署临时计算资源。 可以使用“开发”模式更改此行为,以便在开发和测试期间将相同的计算资源用于多个管道更新。 请参阅开发和生产模式

发布数据集

可以通过将表发布到 Hive 元存储或 Unity Catalog,使 Delta Live Tables 数据集可供查询。 如果不指定用于发布数据的目标,则在增量实时表管道中创建的表只能由该管道中的其他操作访问。 请参阅将数据从 Delta Live Tables 管道发布到 Hive 元存储将 Unity Catalog 与 Delta Live Tables 管道配合使用

示例源代码笔记本

可以将这些笔记本导入 Azure Databricks 工作区,并使用这些笔记本部署 Delta Live Tables 管道。 请参阅创建管道

增量实时表 Python 笔记本入门

获取笔记本

增量实时表 SQL 笔记本入门

获取笔记本

没有 Unity Catalog 的工作区的示例源代码笔记本

可以在未启用 Unity Catalog 的情况下将这些笔记本导入 Azure Databricks 工作区,并使用这些笔记本部署 Delta Live Tables 管道。 请参阅创建管道

增量实时表 Python 笔记本入门

获取笔记本

增量实时表 SQL 笔记本入门

获取笔记本