使用 Lakeflow 声明性管道流以增量方式加载和处理数据

在 Lakeflow 声明性管道中通过 处理数据。 每个流都包含 一个查询 ,通常为 一个目标。 流将查询作为批或以增量方式作为数据流处理到目标中。 流位于 Azure Databricks 中的 ETL 管道内。

通常,在 Lakeflow 声明性管道中创建更新目标的查询时,会自动定义流,但也可以显式定义其他流,以便进行更复杂的处理,例如追加到来自多个源的单个目标。

更新

每次更新一个流的定义管道时,该流都会运行。 该流将创建或更新具有最新可用数据的表。 更新可能会执行增量刷新(仅处理新记录或执行完全刷新),以重新处理数据源中的所有记录,具体取决于流类型和数据更改的状态。

创建默认流

在管道中创建 Lakeflow Declarative Pipelines 对象时,通常需要定义一个表或视图,以及支持该表或视图的查询。 例如,在此 SQL 查询中,通过从名为customers_silver的表中读取数据来创建一个名为customers_bronze的流式处理表。

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

也可以在 Python 中创建相同的流表。 在 Python 中,通常通过创建返回数据帧的查询函数来使用 Lakeflow 声明性管道,并使用修饰器访问 Lakeflow 声明性管道功能:

import dlt

@dlt.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

在此示例中,你创建了一个流式处理表。 还可以在 SQL 和 Python 中创建具有类似语法的具体化视图。 有关详细信息,请参阅 流式处理表具体化视图

此示例创建一个默认流以及流数据表。 流式处理表的默认流是 追加 流,它为每个触发器添加新行。 这是使用 Lakeflow 声明性管道(在单个步骤中创建流和目标)的最常用方法。 可以使用此样式引入数据或转换数据。

追加流还支持需要从多个流式处理源读取数据以更新单个目标的处理。 例如,如果拥有现有的流式处理表和流,并且想要添加写入此现有流式处理表的新流源,则可以使用追加流功能。

使用多个流写入单个目标

在前面的示例中,你在单个步骤中创建了一个流和一个流式处理表。 也可以为以前创建的表创建流。 在本示例中,您可以看到在不同的步骤中创建表及其相关联的工作流。 此代码的结果与创建默认流相同,包括对流式处理表和流使用相同的名称。

Python语言

import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

独立于目标创建流的做法可以让您同时创建多个流,将数据追加到同一目标。

在 Python 接口中使用@append_flow修饰器或在 SQL 接口中使用CREATE FLOW...INSERT INTO子句来创建新的流,例如以来自多个流源的流式处理表为目标。 使用追加流处理如下任务:

  • 添加流式处理源,这些源将数据追加到现有流式处理表中,且无需完全刷新表内容。 例如,你可能有一个表合并了业务所在的每个区域的区域数据。 新区域推出后,你无需执行完全刷新即可将新区域数据添加到该表中。 有关将流媒体来源添加到现有流表的示例,请参阅 示例:从多个 Kafka 主题写入流表
  • 通过追加漏掉的历史数据(回填)来更新流式表。 例如,你有一个由 Apache Kafka 主题写入的现有流式处理表。 此外,表中还存储了历史数据,这些数据只需要插入到流式处理表中一次,并且你无法流式传输数据,因为在插入数据之前,处理包括执行复杂的聚合。 有关回填的示例,请参阅 示例:运行一次性数据回填
  • 合并来自多个源的数据并写入单个流式处理表,而不是在查询中使用 UNION 子句。 如果使用追加流处理而非 UNION,则可以用增量方式更新目标表,而无需运行完全刷新更新。 有关以这种方式完成的联合示例,请参阅示例:使用追加流处理而非 UNION

追加流处理输出的记录的目标可以是现有表或新表。 对于 Python 查询,请使用 create_streaming_table() 函数创建目标表。

以下示例向同一目标添加两个数据流,从而创建两个源表的并集:

Python语言

import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

重要

  • 如果需要使用预期定义数据质量约束,请将目标表的预期定义为 create_streaming_table() 函数或现有表定义的一部分。 无法在 @append_flow 定义中定义期望。
  • 流由流名称标识,此名称用于标识流式处理检查点。 使用流名称来标识检查点意味着:
    • 如果管道中的现有流已重命名,则检查点不会进行传递,并且重命名的流实际上是一个全新的流。
    • 不能在管道中重用流名称,因为现有检查点与新的流定义不匹配。

流类型

流式处理表和具体化视图的默认流是追加流。 还可以创建流以从变更数据捕获数据源进行读取。 下表描述了不同类型的流。

流类型 DESCRIPTION
追加 追加流是最常见的流类型,其中源中的新记录会随着每次更新写入目标。 它们对应于结构化流式处理中的追加模式。 可以添加 ONCE 标志,指示一个批查询,除非目标已完全刷新,否则其数据只应插入目标一次。 任意数量的追加流都可以写入特定目标。
默认流(使用目标流式处理表或具体化视图创建)将具有与目标相同的名称。 其他目标没有默认流。
自动 CDC (以前 应用更改 自动 CDC 流引入包含更改数据捕获(CDC)数据的查询。 自动 CDC 流只能面向流式处理表,并且源必须是流式来源(即使在 ONCE 流的情况下也是如此)。 多个自动化 CDC 流可以针对单个流处理表。 充当自动 CDC 流的目标的流式处理表只能由其他自动 CDC 流作为目标。
有关 CDC 数据的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化更改数据捕获

其他信息

有关流及其用法的详细信息,请参阅以下主题: