在 Databricks SQL 中使用流式处理表

Databricks 建议使用流式处理表通过 Databricks SQL 引入数据。 流式处理表是注册到 Unity 目录的表,可额外支持流式处理或增量数据处理。 为每个流式处理表自动创建管道。 可以使用流式处理表从 Kafka 和云对象存储进行增量数据加载。

注释

若要了解如何使用 Delta Lake 表作为流式处理源和接收器,请参阅 Delta 表流式处理读取和写入

要求

若要使用流式处理表,必须满足以下要求。

工作区要求:

Databricks SQL 中创建的流式处理表由无服务器 Lakeflow 声明式管道支持。 工作区必须支持无服务器管道才能使用此功能。

计算要求:

你必须使用下列项之一:

  • 使用 Current 通道的 SQL 仓库。
  • 在 Databricks Runtime 13.3 LTS 或更高版本上,以标准访问模式(之前称为共享访问模式)进行计算。
  • 在 Databricks Runtime 15.4 LTS 或更高版本上使用专用访问模式(以前是单用户访问模式)进行计算。

    在 Databricks Runtime 15.3 及更低版本上,不能使用专用计算来查询 其他用户拥有的流式处理表。 只有当你拥有流式处理表时,才能在 Databricks Runtime 15.3 及更低版本上使用专用计算。 表的创建者是所有者。

    Databricks Runtime 15.4 LTS 及更高版本支持在专用计算上查询 Lakeflow 声明性管道生成的表,即使你不是表所有者也是如此。 在使用专用计算运行数据筛选操作时,您可能会为无服务器计算资源付费。 请参阅 专用计算上的精细访问控制

权限要求:

  • 对你在其中创建流式处理表的目录和架构拥有 USE CATALOGUSE SCHEMA 权限。
  • 对在其中创建流式处理表的架构的 CREATE TABLE 特权。
  • 有权访问为流式处理表提供源数据的表或位置。

创建流式处理表

流式处理表由 Databricks SQL 中的 SQL 查询定义。 创建流式处理表时,源表中当前的数据用于生成流式处理表。 之后,通常按照预定的时间表刷新表,以从源表中获取任何新增数据,并将其附加到流式表中。

当你创建流式处理表时,你将被视为该表的所有者。

若要从现有表创建流式处理表,请使用该语句 CREATE STREAMING TABLE,如以下示例所示:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

在这种情况下,流式表 sales 是从 raw_data 表的特定列创建的,并计划每小时刷新一次。 使用的查询必须是 流式处理 查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。

使用 CREATE OR REFRESH STREAMING TABLE 语句创建流表时,初始数据刷新和填充会立即开始。 这些操作不消耗 DBSQL 仓库计算资源。 相反,流式处理表依赖于无服务器 Lakeflow 声明性管道进行创建和刷新。 系统会自动为每个流式处理表创建和管理专用无服务器管道。

使用自动加载程序加载文件

若要从卷中的文件创建流数据表,请使用 Auto Loader。 将自动加载程序与 Lakeflow 声明性管道配合使用,以便从云对象存储执行大多数数据引入任务。 自动加载器和 Lakeflow 声明性管道旨在随着数据到达云存储而以增量方式和幂等方式加载不断增长的数据。

若要在 Databricks SQL 中使用自动加载程序,请使用函数 read_files 。 以下示例演示如何使用自动加载程序将 JSON 文件卷读取到流式处理表中:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

若要从云存储中读取数据,还可以使用自动加载程序:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

若要了解自动加载程序,请参阅什么是自动加载程序? 若要了解有关在 SQL 中使用自动加载程序的详细信息,请参阅 从对象存储加载数据

从其他源流式处理引入

有关从其他源(包括 Kafka)引入的示例,请参阅 使用 Lakeflow 声明性管道加载数据

只引入新数据

默认情况下,该 read_files 函数在创建表期间读取源目录中的所有现有数据,然后在每次刷新时处理新到达的记录。

为了避免在创建表时引入源目录中已存在的数据,请将 includeExistingFiles 选项设置为 false。 这意味着,将仅处理表创建后进入目录的数据。 例如:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

设置运行时通道

使用 SQL 仓库创建的流式处理表会通过管道流程自动刷新。 Lakeflow 声明式管道默认使用 current 频道中的运行时。 请参阅 Lakeflow 声明性管道发行说明和发布升级过程 ,了解发布过程。

Databricks 建议使用 current 通道来处理生产工作负荷。 新功能首先发布到 preview 频道。 可以将管道设置为预览 Lakeflow 声明性管道通道,以通过指定 preview 为表属性来测试新功能。 可以在创建表时或使用 ALTER 语句创建表后指定此属性。

下面的代码示例演示如何在 CREATE 语句中将通道设置为预览:

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

隐藏敏感数据

重要

此功能目前以公共预览版提供。

可以使用流式处理表来对访问表的用户隐藏敏感数据。 一种方法是定义查询,以便完全排除敏感列或行。 或者,可以根据查询用户的权限应用列掩码或行筛选器。 例如,可以为不在组中tax_id的用户隐藏HumanResourcesDept该列。 为此,在创建流式处理表期间使用 ROW FILTERMASK 语法。 有关详细信息,请参阅使用行筛选器和列掩码筛选敏感表数据

刷新流式处理表

创建流式处理表时可以自动计划刷新。 你也可以手动刷新流式处理表。 即使已计划刷新,也可以随时调用手动刷新。 刷新由与流式处理表一起自动创建的同一管道处理。

刷新流式处理表:

REFRESH STREAMING TABLE sales;

可以使用DESCRIBE TABLE EXTENDED检查最新刷新状态。

注释

只有表的所有者才能刷新流表以获取最新数据。 创建表的用户是所有者,并且无法更改所有者。 在使用按时间顺序查看查询之前,可能需要刷新流式处理表。

刷新的工作原理

流式处理表刷新仅评估自上次更新以来延迟到达的新行,并且仅追加新数据。

每次刷新都使用流式处理表的当前定义来处理此新数据。 修改流式处理表定义不会自动重新计算现有数据。 如果修改与现有数据不兼容(例如更改数据类型),则下一次刷新将失败并显示错误。

以下示例说明流式处理表定义的更改如何影响刷新行为:

  • 删除筛选器不会重新处理以前筛选的行。
  • 更改列投影不会影响现有数据的处理方式。
  • 具有静态快照的联接使用初始处理时的快照状态。 将忽略与更新的快照匹配的迟到数据。 如果维度延迟到达,可能会导致事实丢弃。
  • 修改现有列的 CAST 会引发错误。

如果现有的流式处理表无法支持数据更改的方式,你可以执行完整刷新。

完全刷新流式处理表

全量刷新会根据最新定义重新处理源中所有可用的数据。 对于不保留数据全部历史记录或保留期较短的源(如 Kafka),不建议调用完全刷新,因为完全刷新会截断现有的数据。 如果数据在源中不再可用,则可能无法恢复旧数据。

例如:

REFRESH STREAMING TABLE sales FULL;

更改流式处理表的计划

可以修改(或设置)流式处理表的自动刷新计划。 以下示例展示如何使用 ALTER STREAMING TABLE 设置计划:

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

有关刷新计划查询的示例,请参阅 ALTER STREAMING TABLE

跟踪刷新状态

可以通过在 Lakeflow 声明性管道 UI 中查看管理流式处理表的管道,或者通过查看用于流式处理表的命令所返回的刷新信息,来查看流式处理表刷新的状态。

DESCRIBE TABLE EXTENDED <table-name>;

或者,可以在目录资源管理器中查看流式处理表并查看刷新状态:

  1. 点击边栏中的“数据”图标目录
  2. 在左侧的目录资源管理器树中,打开目录并选择流式处理表所在的架构。
  3. 打开所选架构下的“表”项,然后单击流式处理表

在此处,可以使用流式处理表名称下的选项卡查看和编辑有关流式处理表的信息,包括:

  • 刷新状态和历史记录
  • 表架构
  • 示例数据(需要活动计算)
  • 权限
  • 世系,包括此流式处理表所依赖的表和管道
  • 深入了解使用情况
  • 为此流式处理表创建的监视器

控制对流式处理表的访问

流式表支持多样化的访问控制,以支持数据共享,同时避免泄露潜在的敏感数据。 具有 MANAGE 权限的流式处理表所有者或用户可以向其他用户授予 SELECT 权限。 对流式处理表拥有 SELECT 访问权限的用户不需要对该流式处理表所引用的表拥有 SELECT 访问权限。 此访问控制支持数据共享,同时控制对基础数据的访问。

授予对流式处理表的权限

若要授予对流式处理表的访问权限,请使用 GRANT 语句

GRANT <privilege_type> ON <st_name> TO <principal>;

privilege_type可以是:

  • SELECT - 用户可以选择 (SELECT) 流式处理表。
  • REFRESH - 用户可以选择 (REFRESH) 流式处理表。 刷新是使用所有者的权限运行的。

以下示例创建一个流式表单,并向用户授予 SELECT 和刷新权限:

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

有关授予 Unity Catalog 安全对象特权的详细信息,请参阅 Unity Catalog 特权和安全对象

撤销对流式处理表的权限

若要撤销对流式处理表的访问权限,请使用 REVOKE 语句

REVOKE privilege_type ON <st_name> FROM principal;

当撤销了流式处理表所有者或已被授予流式处理表 SELECTMANAGE 权限的任何其他用户对源表的 SELECT 权限,或者删除了源表时,流式处理表所有者或被授予访问权限的用户仍然可以查询该流式处理表。 但是,会发生以下行为:

  • 流式处理表所有者或失去流式处理表访问权限的其他人无法再 REFRESH 使用该流式处理表,流式处理表将变得过时。
  • 如果计划自动执行,则下一个计划 REFRESH 失败或未运行。

以下示例从 SELECT 撤销了 read_only_user 权限:

REVOKE SELECT ON st_name FROM read_only_user;

从流式处理表永久删除记录

重要

对流式处理表的 REORG 语句的支持现推出公共预览版

注释

  • 对流式处理表使用 REORG 语句需要 Databricks Runtime 15.4 及更高版本。
  • 尽管可以将 REORG 语句与任何流式处理表一起使用,但只有在从启用了 删除矢量 的流式处理表中删除记录时,才需要使用该语句。 在未启用删除向量的情况下与流式处理表一起使用时,该命令不起作用。

若要为符合 GDPR 合规性要求,从启用了删除矢量的流式处理表的基础存储中物理删除记录,必须采取额外步骤,以确保 VACUUM 操作在流式处理表的数据上运行。

若要从基础存储中物理删除记录,请执行以下作:

  1. 更新记录或删除流数据表中的记录。
  2. 在流式处理表上运行REORG语句,并指定APPLY (PURGE)参数。 例如,REORG TABLE <streaming-table-name> APPLY (PURGE);
  3. 等待流式处理表的数据保留期结束。 默认数据保留期为 7 天,但可以使用表属性进行配置 delta.deletedFileRetentionDuration 。 请参阅为“按时间顺序查看”查询配置数据保留
  4. REFRESH 流式处理表。 请参阅刷新流式处理表。 在 REFRESH 操作后的24小时内,Lakeflow 声明式管道的维护任务,包括确保记录被永久删除的VACUUM 操作,都会自动运行。

使用查询历史记录监视运行

可以使用查询历史记录页访问查询详情和查询概况,这些查询概况有助于识别用于运行流式处理表更新的 Lakeflow 声明式管道中性能欠佳的查询和瓶颈。 有关查询历史记录和查询配置文件中可用的信息的概述,请参阅 查询历史记录查询配置文件

重要

此功能目前以公共预览版提供。 工作区管理员可以从“预览版”页启用此功能。 请参阅 “管理 Azure Databricks 预览版”。

与流式处理表相关的所有语句都显示在查询历史记录中。 可以使用“语句”下拉列表筛选器来选择任何命令并检查相关查询。 所有 CREATE 语句后都跟随一个在管道上异步执行的 REFRESH 语句。 这些 REFRESH 语句通常包括详细的查询计划,用于提供优化性能的见解。

若要访问 REFRESH 查询历史记录 UI 中的语句,请使用以下步骤:

  1. 单击 “历史记录”图标。 在左侧栏中打开 “查询历史记录 ”UI。
  2. 从“语句”下拉列表筛选器中选择 REFRESH REFRESH 复选框。
  3. 单击查询语句的名称可查看摘要详细信息,例如查询持续时间和聚合指标。
  4. 单击“ 查看查询配置文件 ”打开查询配置文件。 有关浏览查询配置文件的详细信息,请参阅查询配置文件
  5. (可选)可以使用“查询源”部分中的链接打开相关的查询或管道。

还可以使用 SQL 编辑器中的链接或附加到 SQL 仓库的笔记本访问查询详细信息。

其他资源