在 Databricks SQL 中使用流式处理表加载数据
Databricks 建议使用流式处理表通过 Databricks SQL 引入数据。 流式处理表是一种注册到 Unity Catalog 的表,额外支持流式处理或增量数据处理。 系统会自动为每个流式处理表创建一个增量事实表管道。 可以使用流式处理表从 Kafka 和云对象存储进行增量数据加载。
本文演示如何使用流式处理表从配置为 Unity Catalog 卷(建议)或外部位置的云对象存储加载数据。
注意
若要了解如何使用 Delta Lake 表作为流式处理源和接收器,请参阅 Delta 表流式处理读取和写入。
开始之前的准备工作
在开始之前,必须满足以下要求。
工作区要求:
- 启用了无服务器的 Azure Databricks 帐户。 有关详细信息,请参阅启用无服务器 SQL 仓库。
- 一个启用了 Unity Catalog 的工作区。 有关详细信息,请参阅设置和管理 Unity Catalog。
计算要求:
你必须使用下列项之一:
使用
Current
通道的 SQL 仓库。Databricks Runtime 13.3 LTS 或更高版本上共享访问模式的计算。
Databricks Runtime 15.4 LTS 或更高版本上具有单用户访问模式的计算。
在 Databricks Runtime 15.3 及更低版本上,不能使用单个用户计算来查询其他用户拥有的流式处理表。 仅当拥有流式处理表时,才能在 Databricks Runtime 15.3 及更低版本上使用单用户计算。 表的创建者是所有者。
Databricks Runtime 15.4 LTS 及更高版本支持在单个用户计算上对增量实时表生成的表进行查询,而不考虑表所有权。 若要利用 Databricks Runtime 15.4 LTS 及更高版本中提供的数据筛选,你必须确认你的工作区是否已启用无服务器计算,因为支持增量实时表生成的表的数据筛选功能在无服务器计算上运行。 使用单个用户计算运行数据筛选操作时,可能会为无服务器计算资源付费。 请参阅单用户计算上的精细访问控制。
权限要求:
- 对 Unity Catalog 外部位置的
READ FILES
特权。 有关详细信息,请参阅创建外部位置以将云存储连接到 Azure Databricks。 - 对在其中创建流式处理表的目录的
USE CATALOG
特权。 - 对在其中创建流式处理表的架构的
USE SCHEMA
特权。 - 对在其中创建流式处理表的架构的
CREATE TABLE
特权。
其他要求:
源数据的路径。
卷路径示例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
外部位置路径示例:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
注意
本文假设要加载的数据位于云存储位置,该位置对应于你有权访问的 Unity Catalog 卷或外部位置。
发现和预览源数据
在工作区的边栏中,单击“查询”,然后单击“创建查询”。
在查询编辑器中,从下拉列表中选择使用该
Current
通道的 SQL 仓库。将以下内容粘贴到编辑器中,将尖括号 (
<>
) 中的值替换为标识源数据的信息,然后单击“运行”。注意
如果函数的默认值无法分析数据,则在运行
read_files
表值函数时可能会遇到架构推理错误。 例如,可能需要为多行 CSV 或 JSON 文件配置多行模式。 有关分析程序选项的列表,请参阅 read_files 表值函数。/* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
将数据加载到流式处理表中
若要用云对象存储中的数据创建流式处理表,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
使用 DLT 管道刷新流式处理表
本部分介绍使用查询中定义的源中提供的最新数据刷新流式处理表的模式。
当你 CREATE
或 REFRESH
流式处理表时,更新将使用无服务器增量实时表管道进行处理。 定义的每个流式处理表都有关联的 Delta Live Tables 管道。
运行 REFRESH
命令后,将返回 DLT 管道链接。 可以使用 DLT 管道链接来检查刷新状态。
注意
只有表的所有者可以刷新流式处理表以获取最新数据。 创建表的用户就是所有者,所有者不能更改。 在使用时间旅行查询之前,可能需要刷新流式处理表。
请参阅什么是增量实时表?。
仅引入新数据
默认情况下,read_files
函数在表创建期间读取源目录中的所有现有数据,然后在每次刷新时处理新进入的记录。
若要避免在创建表时引入源目录中已存在的数据,请将 includeExistingFiles
选项设置为 false
。 这意味着,将仅处理表创建后进入目录的数据。 例如:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
完全刷新流式处理表
完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。
例如:
REFRESH STREAMING TABLE my_bronze_table FULL
为流式处理表计划自动刷新
若要将流式处理表配置为根据定义的计划自动刷新,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
有关刷新计划查询的示例,请参阅 ALTER STREAMING TABLE。
跟踪刷新状态
可以在 Delta Live Tables UI 中查看管理流式处理表的管道或查看 DESCRIBE EXTENDED
命令为流式处理表返回的刷新信息,以此查看流式处理表的刷新状态。
DESCRIBE EXTENDED <table-name>
从 Kafka 流式引入
有关从 Kafka 流式引入的示例,请参阅 read_kafka。
授予用户对流式处理表的访问权限
若要授予用户对流式处理表的 SELECT
特权,以便他们可以对其进行查询,请将以下内容粘贴到查询编辑器中,然后单击“运行”:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
有关授予 Unity Catalog 安全对象特权的详细信息,请参阅 Unity Catalog 特权和安全对象。