通过


Databricks SQL 中的 ETL

处理大量数据时,需要一个管道,该管道只能处理新的和已更改的记录,而不是重新处理整个数据集。 这称为增量 ETL。 在 Databricks SQL 中,可以使用流式处理表和具体化视图生成增量 ETL 管道,而无需编写过程代码或计划手动刷新。

本教程将引导你完成一种常见模式:跟踪一段时间内的产品更改。 创建源表、捕获更改事件、生成一个维度表来保留每个产品的完整历史记录,并在顶部添加聚合报告层。

本教程的主要功能是 AUTO CDC。 在传统的仓库中,你将编写复杂的 MERGE INTO 语句来协调目标表中的插入、更新和删除事件。 此方法容易出错,尤其是在事件无序到达时。 AUTO CDC 为你处理此问题。 声明业务密钥、排序列以及是否希望 SCD 类型 1(仅最新值)或 SCD 类型 2(完整历史记录),Azure Databricks自动应用正确的合并逻辑。 有关 CDC 的概述,请参阅 AUTO CDC API:使用管道简化更改数据捕获

在本教程结束时,你将拥有:

  1. 创建了一个源表,用于跟踪更改数据馈送的更改。
  2. 检查了原始更改数据以了解 CDC 事件流。
  3. 用于 AUTO CDC 从这些事件生成 SCD 类型 2 维度表。
  4. 通过管道以增量方式处理删除事件。
  5. 创建了以增量方式维护聚合报表的具体化视图。
  6. 配置 SCHEDULE REFRESH EVERY 1 DAY 后,更改会自动通过管道传播。

要求

若要完成此教程,必须满足以下要求:

步骤 1:设置目录和架构

打开 Databricks SQL 编辑器 并设置工作目录和架构。 必须具有选择 USE 的目录和架构的权限:

USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

步骤 2:创建源表并加载数据

创建启用了更改数据馈送(CDF)的 products 表,使用 Delta Lake 在 Azure Databricks 上。 CDF 是一项 Delta Lake 功能,以可查询的更改日志的形式记录每个插入、更新和删除。 这类似于事务源系统中的 CDC 流,只是更改直接在 Delta 表中捕获,而不是从外部日志捕获。 在此处使用 CDF 生成下游管道将使用的更改事件。

  1. 创建表并加载初始记录:

    CREATE OR REPLACE TABLE products (
      product_id INT,
      product_name STRING,
      category STRING,
      warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);
    
    INSERT INTO products VALUES
      (1, 'Spoon', 'Cutlery', 'Seattle'),
      (2, 'Fork', 'Cutlery', 'Portland'),
      (3, 'Knife', 'Cutlery', 'Denver'),
      (4, 'Chair', 'Furniture', 'Austin'),
      (5, 'Table', 'Furniture', 'Chicago'),
      (6, 'Lamp', 'Lighting', 'Boston'),
      (7, 'Mug', 'Kitchenware', 'Seattle'),
      (8, 'Plate', 'Kitchenware', 'Atlanta'),
      (9, 'Bowl', 'Kitchenware', 'Dallas'),
      (10, 'Glass', 'Kitchenware', 'Phoenix');
    
  2. 模拟上游更改,包括新产品、仓库移动和类别重新分配:

    INSERT INTO products VALUES
      (11, 'Napkin', 'Dining', 'San Francisco'),
      (12, 'Coaster', 'Dining', 'New York');
    
    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;
    

步骤 3:查询变更数据流

在生成下游管道之前,有助于查看原始更改事件,以便您了解AUTO CDC将处理的内容。 该 table_changes() 函数读取 CDF 日志,并返回已捕获的每个操作及其对应的元数据列:

SELECT
  product_id, product_name, warehouse,
  _change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

例如,勺子有三个事件:一个 insert (西雅图)、一个 update_preimage (西雅图)和一个 update_postimage (洛杉矶)。

请注意,单个逻辑更改(例如,将 Spoon 移动到其他仓库)会产生多个事件:预映像和后映像。 在传统的仓库中,你将编写一个 MERGE 语句,将所有这些事件协调到目标表中,使用单独的逻辑处理插入、更新和删除,并确保按正确的顺序应用事件。 这正是 AUTO CDC 在下一步中消除的复杂性。

步骤 4:使用 生成 SCD 类型 2 维度

重要

AUTO CDC 处于 Beta 版。 需要 Databricks Runtime 17.3 或更高版本。

流式处理表以增量方式处理数据。 每次刷新时,它只读取自上次运行以来的新行,因此不需要重新处理完整的数据集。 这使得它特别适合大容量或频繁更改的数据源。

AUTO CDC 在流式表上添加变更数据捕获处理。 声明业务密钥和排序列,让Azure Databricks应用正确的逻辑,而不是编写手动处理插入、更新和删除的 MERGE INTO 语句。 AUTO CDC 还自动处理无序事件,这是处理 MERGE INTO 从分布式系统到达的事件或具有重叠时间戳的批处理加载时常见的问题。

以下语句创建一个 SCD 类型 2 表,该表保留每个产品的完整版本历史记录。 每个版本获取 __START_AT__END_AT 时间戳。 当前版本用一个 NULL__END_AT 标记。

CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY:按每日计划刷新表。
  • FLOW AUTO CDC:将此声明为 CDC 流。 Azure Databricks自动应用插入、更新和删除语义。
  • KEYS (product_id):业务密钥。 具有相同键的事件合并到版本化行中。
  • APPLY AS DELETE WHEN _change_type = 'delete':在删除事件到达时关闭当前版本。 这样就可以定义标识删除事件的条件。
  • SEQUENCE BY _commit_timestamp:建立事件排序。 正确处理无序到达。
  • STORED AS SCD TYPE 2:保留完整历史记录。 AUTO CDC 同时支持 SCD 类型 1 和 SCD 类型 2。

查询维度表:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • 勺子:两个版本。 西雅图(关闭, __END_AT 设置)和洛杉矶(当前, __END_AT = NULL)。
  • 分叉:两个版本。 卡特利类别(已关闭)和餐饮类别(当前)。
  • 餐巾纸和杯垫:各一个版本(新插入,__END_AT = NULL)。
  • 其他所有产品:各一个版本(__END_AT = NULL)。

步骤 5:处理通过管道删除

现在,通过从源表中删除两个已停用的产品来模拟它们:

DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

这些删除事件记录在 CDF 日志中,但流式处理表尚未看到它们。 刷新流表以处理新事件。

REFRESH STREAMING TABLE products_history;

查询维度表以验证是否已应用删除:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

碗和玻璃现在通过__END_AT设置已停产,被标记为停止。 所有其他当前产品保持不变。 流式处理表仅处理新的删除事件,而无需重新处理上一次刷新中的插入和更新。

步骤 6:创建聚合具体化视图

现在,你有了一个能够随源变化及时更新的维度表,可以在其上添加一个报告层。

具体化视图将预先计算的查询结果存储为物理表。 与常规视图每次读取时重新执行查询不同,物化视图会持久化结果,只在每次刷新时重新计算受上游更改影响的行。 这使得它特别适合用于对查询性能要求较高的仪表板和报表。

CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
  category,
  COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY 表示此视图按每日计划刷新。 与流式处理表上的相同计划相结合,现在有一个三阶段管道,其中源表的更改会通过维度级联,并在每个刷新周期中聚合。 无需手动刷新才能运行。

SELECT * FROM products_by_category ORDER BY active_products DESC;

步骤 7:验证端到端级联

若要验证完整的管道级联,请对源表进行更改:

UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

刀从丹佛移动到西雅图。 此单一 DML 更改触发整个管道级联,演示三个阶段如何协同工作:

  1. products 通过 CDF 记录更改事件。
  2. products_history 处理事件并为 Knife 添加新版本。
  3. products_by_category 仅重新计算受影响的 Cutlery 行。

验证:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

清理

若要清理本教程创建的资源,请使用以下 SQL:

DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

其他资源