在 Databricks SQL 中使用流式处理表加载数据

Databricks 建议使用流式处理表通过 Databricks SQL 引入数据。 流式处理表是一种注册到 Unity Catalog 的表,额外支持流式处理或增量数据处理。 系统会自动为每个流式处理表创建一个增量事实表管道。 可以使用流式处理表从 Kafka 和云对象存储进行增量数据加载。

本文演示如何使用流式处理表从配置为 Unity Catalog 卷(建议)或外部位置的云对象存储加载数据。

注意

若要了解如何使用 Delta Lake 表作为流式处理源和接收器,请参阅 Delta 表流式处理读取和写入

开始之前的准备工作

在开始之前,必须满足以下要求。

工作区要求:

计算要求:

你必须使用下列项之一:

  • 使用 Current 通道的 SQL 仓库。

  • Databricks Runtime 13.3 LTS 或更高版本上共享访问模式的计算。

  • Databricks Runtime 15.4 LTS 或更高版本上具有单用户访问模式的计算。

    在 Databricks Runtime 15.3 及更低版本上,不能使用单个用户计算来查询其他用户拥有的流式处理表。 仅当拥有流式处理表时,才能在 Databricks Runtime 15.3 及更低版本上使用单用户计算。 表的创建者是所有者。

    Databricks Runtime 15.4 LTS 及更高版本支持在单个用户计算上对增量实时表生成的表进行查询,而不考虑表所有权。 若要利用 Databricks Runtime 15.4 LTS 及更高版本中提供的数据筛选,你必须确认你的工作区是否已启用无服务器计算,因为支持增量实时表生成的表的数据筛选功能在无服务器计算上运行。 使用单个用户计算运行数据筛选操作时,可能会为无服务器计算资源付费。 请参阅单用户计算上的精细访问控制

权限要求:

  • 对 Unity Catalog 外部位置的 READ FILES 特权。 有关详细信息,请参阅创建外部位置以将云存储连接到 Azure Databricks
  • 对在其中创建流式处理表的目录的 USE CATALOG 特权。
  • 对在其中创建流式处理表的架构的 USE SCHEMA 特权。
  • 对在其中创建流式处理表的架构的 CREATE TABLE 特权。

其他要求:

  • 源数据的路径。

    卷路径示例:/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部位置路径示例:abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    注意

    本文假设要加载的数据位于云存储位置,该位置对应于你有权访问的 Unity Catalog 卷或外部位置。

发现和预览源数据

  1. 在工作区的边栏中,单击“查询”,然后单击“创建查询”。

  2. 在查询编辑器中,从下拉列表中选择使用该 Current 通道的 SQL 仓库。

  3. 将以下内容粘贴到编辑器中,将尖括号 (<>) 中的值替换为标识源数据的信息,然后单击“运行”。

    注意

    如果函数的默认值无法分析数据,则在运行 read_files 表值函数时可能会遇到架构推理错误。 例如,可能需要为多行 CSV 或 JSON 文件配置多行模式。 有关分析程序选项的列表,请参阅 read_files 表值函数

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

将数据加载到流式处理表中

若要用云对象存储中的数据创建流式处理表,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

使用 DLT 管道刷新流式处理表

本部分介绍使用查询中定义的源中提供的最新数据刷新流式处理表的模式。

当你 CREATEREFRESH 流式处理表时,更新将使用无服务器增量实时表管道进行处理。 定义的每个流式处理表都有关联的 Delta Live Tables 管道。

运行 REFRESH 命令后,将返回 DLT 管道链接。 可以使用 DLT 管道链接来检查刷新状态。

注意

只有表的所有者可以刷新流式处理表以获取最新数据。 创建表的用户就是所有者,所有者不能更改。 在使用时间旅行查询之前,可能需要刷新流式处理表。

请参阅什么是增量实时表?

仅引入新数据

默认情况下,read_files 函数在表创建期间读取源目录中的所有现有数据,然后在每次刷新时处理新进入的记录。

若要避免在创建表时引入源目录中已存在的数据,请将 includeExistingFiles 选项设置为 false。 这意味着,将仅处理表创建后进入目录的数据。 例如:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

完全刷新流式处理表

完全刷新使用最新定义重新处理源中的所有可用数据。 不建议对不保留整个数据历史记录或保留期较短的源(如 Kafka)调用完全刷新,因为完全刷新会截断现有数据。 如果数据在源中不再可用,则可能无法恢复旧数据。

例如:

REFRESH STREAMING TABLE my_bronze_table FULL

为流式处理表计划自动刷新

若要将流式处理表配置为根据定义的计划自动刷新,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

有关刷新计划查询的示例,请参阅 ALTER STREAMING TABLE

跟踪刷新状态

可以在 Delta Live Tables UI 中查看管理流式处理表的管道或查看 DESCRIBE EXTENDED 命令为流式处理表返回的刷新信息,以此查看流式处理表的刷新状态。

DESCRIBE EXTENDED <table-name>

从 Kafka 流式引入

有关从 Kafka 流式引入的示例,请参阅 read_kafka

授予用户对流式处理表的访问权限

若要授予用户对流式处理表的 SELECT 特权,以便他们可以对其进行查询,请将以下内容粘贴到查询编辑器中,然后单击“运行”:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

有关授予 Unity Catalog 安全对象特权的详细信息,请参阅 Unity Catalog 特权和安全对象

其他资源