使用增量实时表转换数据

本文介绍如何使用增量实时表声明数据集的转换,并指定如何通过查询逻辑处理记录。 其中还包含一些常用转换模式的示例,这些模式在生成增量实时表管道时很有用。

可以针对返回数据帧的任何查询定义数据集。 可以使用 Apache Spark 内置操作、UDF、自定义逻辑和 MLflow 模型作为增量实时表管道中的转换。 将数据引入增量实时表管道后,可以针对上游源定义新的数据集,以创建新的流式处理表、具体化视图和视图。

如要了解如何使用 Delta Live Tables 有效地执行有状态处理,请参阅使用水印优化 Delta Live Tables 中的有状态处理

何时使用视图、具体化视图和流式处理表

为确保管道高效且可维护,请在实现管道查询时选择最佳数据集类型。

对于以下情况,请考虑使用视图:

  • 你有一个大型或复杂的查询,希望将其分解为更易于管理的查询。
  • 你想要使用期望来验证中间结果。
  • 你希望降低存储和计算成本,并且不需要具体化查询结果。 由于表已具体化,因此它们需要额外的计算和存储资源。

在以下情况下考虑使用具体化视图:

  • 多个下游查询使用表。 由于视图是按需计算的,因此每次查询视图时都会重新计算视图。
  • 其他管道、作业或查询将使用表。 由于视图未具体化,因此你只能在同一个管道中使用它们。
  • 你想要在开发过程中查看查询结果。 由于表已具体化并可以在管道外部查看和查询,因此在开发过程中使用表可帮助验证计算的正确性。 验证后,将不需要具体化的查询转换为视图。

在以下情况下考虑使用流式处理表:

  • 查询是针对持续或以增量方式增长的数据源定义的。
  • 应以增量方式计算查询结果。
  • 管道需要高吞吐量和低延迟。

注意

流式处理表始终是针对流式处理源定义的。 你还可以将流式处理源与 APPLY CHANGES INTO 结合使用以应用 CDC 源中的更新。 请参阅在 Delta Live Tables 中使用 APPLY CHANGES API 简化变更数据捕获

在单个管道中结合使用流式处理表和具体化视图

流式处理表继承了 Apache Spark 结构化流式处理的处理保证,并配置为处理来自仅追加数据源的查询,其中的新行始终插入到源表中,而不会经过修改。

注意

虽然默认情况下,流式处理表要求使用“仅追加”数据源,但当流式处理源是另一个需要更新或删除的流式处理表时,可以使用 skipChangeCommits 标志改写此行为。

常见的流式处理模式包括引入源数据以在管道中创建初始数据集。 这些初始数据集通常称为 bronze 表,通常用于执行简单的转换。

相比之下,管道中的最终表(通常称为 gold 表)通常需要复杂的聚合,或从作为 APPLY CHANGES INTO 操作目标的源中读取。 由于这些操作本质上是创建更新而不是追加,因此不支持将它们作为流式处理表的输入。 这些转换更适合具体化视图。

通过将流式处理表和具体化视图混合到单个管道中,可以简化管道并避免对原始数据进行代价高昂的重新引入或重新处理,还能充分利用 SQL 来计算经过有效编码和筛选的数据集上的复杂聚合。 以下示例演示了这种类型的混合处理:

注意

这些示例使用自动加载程序从云存储加载文件。 若要在启用了 Unity Catalog 的管道中使用自动加载程序加载文件,必须使用外部位置。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

详细了解如何使用自动加载程序从 Azure 存储高效读取 JSON 文件,以进行增量处理。

流静态联接

在使用主要静态维度表对连续仅追加数据流进行非规范化时,流静态联接是一个不错的选择。

每次更新管道时,流中的新记录都会与静态表的最新快照联接在一起。 如果在处理流式处理表中的相应数据后在静态表中添加或更新记录,则除非执行完全刷新,否则不会重新计算最终的记录。

在配置为触发执行的管道中,静态表返回更新开始时的结果。 在配置为连续执行的管道中,每次表处理更新时,就会查询静态表的最新版本。

下面是流静态联接的示例:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

高效计算聚合

可以使用流式处理表来以增量方式计算简单的分布聚合(例如计数、最小值、最大值或总和)和代数聚合(例如平均值或标准偏差)。 Databricks 建议对组数量有限的查询进行增量聚合,例如,包含 GROUP BY country 子句的查询。 每次更新时仅读取新的输入数据。

在增量实时表管道中使用 MLflow 模型

注意

若要在启用了 Unity Catalog 的管道中使用 MLflow 模型,必须将管道配置为使用 preview 通道。 若要使用 current 通道,必须将管道配置为发布到 Hive 元存储。

可以在增量实时表管道中使用 MLflow 训练的模型。 MLflow 模型在 Azure Databricks 中被视为转换,这意味着,它们将作用于 Spark 数据帧输入并将结果作为 Spark 数据帧返回。 由于增量实时表针对数据帧定义数据集,因此你只需编写几行代码即可将利用 MLflow 的 Apache Spark 工作负载转换为增量实时表。 有关 MLflow 的详细信息,请参阅使用 MLflow 进行 ML 生命周期管理

如果你已有一个调用 MLflow 模型的 Python 笔记本,你可以使用 @dlt.table 修饰器并确保将函数定义为返回转换结果,使此代码适应增量实时表。 增量实时表默认不会安装 MLflow,因此请确保在笔记本顶部添加 %pip install mlflow 并导入 mlflowdlt。 有关增量实时表语法的介绍,请参阅教程:在增量实时表中使用 Python 声明数据管道

若要在增量实时表中使用 MLflow 模型,请完成以下步骤:

  1. 获取 MLflow 模型的运行 ID 和模型名称。 运行 ID 和模型名称用于构造 MLflow 模型的 URI。
  2. 使用 URI 定义 Spark UDF,以加载 MLflow 模型。
  3. 在表定义中调用 UDF 以使用 MLflow 模型。

以下示例演示了此模式的基本语法:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

下面是一个完整的代码示例,它定义一个名为 loaded_model_udf 的 Spark UDF,用于加载一个基于贷款风险数据训练的 MLflow 模型。 用于预测的数据列作为参数传递给该 UDF。 表 loan_risk_predictions 计算 loan_risk_input_data 中每一行的预测。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

保留手动删除或更新

增量实时表允许手动删除或更新表中的记录,以及执行刷新操作以重新计算下游表。

默认情况下,增量实时表会在每次更新管道时根据输入数据重新计算表结果,因此必须确保不会从源数据中重新加载已删除的记录。 将 pipelines.reset.allowed 表属性设置为 false 可防止对表进行刷新,但不能防止对表进行增量写入,或防止新数据流入表中。

下图演示了使用两个流式处理表的示例:

  • raw_user_table 从源中引入原始用户数据。
  • bmi_table 使用 raw_user_table 中的重量和高度以增量方式计算 BMI 评分。

你想要手动删除或更新 raw_user_table 中的用户记录并重新计算 bmi_table

Retain data diagram

以下代码演示了将 pipelines.reset.allowed 表属性设置为 false 以禁用 raw_user_table 完全刷新,以便一直保留所需的更改,但在运行管道更新时重新计算下游表:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);