使用增量实时表转换数据

本文介绍如何使用增量实时表声明数据集的转换,并指定如何通过查询逻辑处理记录。 它还包含用于生成 Delta Live Tables 管道的常见转换模式的示例。

可以针对返回数据帧的任何查询定义数据集。 可以使用 Apache Spark 内置操作、UDF、自定义逻辑和 MLflow 模型作为增量实时表管道中的转换。 将数据引入增量实时表管道后,可以针对上游源定义新数据集,以创建新的流式处理表、具体化视图和视图。

如要了解如何使用 Delta Live Tables 有效地执行有状态处理,请参阅使用水印优化 Delta Live Tables 中的有状态处理

何时使用视图、具体化视图和流式表

实现管道查询时,请选择最佳数据集类型,以确保它们高效且可维护。

请考虑使用视图执行以下操作:

  • 将要轻松管理的大型查询或复杂查询分解。
  • 使用预期验证中间结果。
  • 减少不需要保留的结果的存储和计算成本。 由于表已具体化,因此它们需要额外的计算和存储资源。

在以下情况下考虑使用具体化视图:

  • 多个下游查询使用表。 由于视图是按需计算的,因此每次查询视图时都会重新计算视图。
  • 其他管道、作业或查询将使用表。 由于视图未具体化,因此你只能在同一个管道中使用它们。
  • 你想要在开发过程中查看查询结果。 由于表已具体化并可以在管道外部查看和查询,因此在开发过程中使用表可帮助验证计算的正确性。 验证后,将不需要具体化的查询转换为视图。

在以下情况下考虑使用流式表:

  • 查询是针对持续或以增量方式增长的数据源定义的。
  • 应以增量方式计算查询结果。
  • 管道需要高吞吐量和低延迟。

注意

流式处理表始终是针对流式处理源定义的。 你还可以将流式处理源与 APPLY CHANGES INTO 结合使用以应用 CDC 源中的更新。 请参阅 APPLY CHANGES API:使用增量实时表简化变更数据捕获

从目标架构中排除表

如果必须计算不适合外部使用的中间表,则可以阻止它们使用 TEMPORARY 关键字发布到架构。 临时表仍根据增量实时表语义存储和处理数据,但不应在当前管道外部访问。 临时表在创建它的管道的生存期内保持不变。 使用以下语法声明临时表:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

在单个管道中结合使用流式表和具体化视图

流式表继承了 Apache Spark 结构化流式处理的处理保证,并配置为处理来自仅追加数据源的查询,其中的新行始终插入到源表中,而不会经过修改。

注意

虽然默认情况下,流式表要求使用“仅追加”数据源,但当流式处理源是另一个需要更新或删除的流式表时,可以使用 skipChangeCommits 标志改写此行为。

常见的流式处理模式涉及引入源数据以在管道中创建初始数据集。 这些初始数据集通常称为 bronze 表,通常用于执行简单的转换。

相比之下,管道中的最终表通常称为黄金表,通常需要复杂的聚合或从操作的目标 APPLY CHANGES INTO 读取。 由于这些操作本质上是创建更新而不是追加,因此不支持将它们作为流式表的输入。 这些转换更适合具体化视图。

通过将流式表和具体化视图混合到单个管道中,可以简化管道并避免对原始数据进行代价高昂的重新引入或重新处理,还能充分利用 SQL 来计算经过有效编码和筛选的数据集上的复杂聚合。 以下示例演示了这种类型的混合处理:

注意

这些示例使用自动加载程序从云存储加载文件。 若要在启用了 Unity Catalog 的管道中使用自动加载程序加载文件,必须使用外部位置。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

详细了解如何使用 自动加载程序 从 Azure 存储增量引入 JSON 文件。

流静态联接

在使用主要静态维度表对连续仅追加数据流进行非规范化时,流静态联接是一个不错的选择。

每次更新管道时,流中的新记录都会与静态表的最新快照联接在一起。 如果在处理流式表中的相应数据后在静态表中添加或更新记录,则除非执行完全刷新,否则不会重新计算最终的记录。

在配置为触发执行的管道中,静态表返回更新开始时的结果。 在配置为连续执行的管道中,每次表处理更新时都会查询最新版本的静态表。

下面是流静态联接的示例:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

高效计算聚合

可以使用流式表来以增量方式计算简单的分布聚合(例如计数、最小值、最大值或总和)和代数聚合(例如平均值或标准偏差)。 Databricks 建议对具有有限组的查询(例如具有 GROUP BY country 子句的查询)进行增量聚合。 每次更新时仅读取新的输入数据。

若要详细了解如何编写执行增量聚合的 Delta Live Tables 查询,请参阅使用水印执行窗口聚合

在增量实时表管道中使用 MLflow 模型

注意

若要在启用了 Unity Catalog 的管道中使用 MLflow 模型,必须将管道配置为使用 preview 通道。 若要使用 current 通道,必须将管道配置为发布到 Hive 元存储。

可以在增量实时表管道中使用 MLflow 训练的模型。 MLflow 模型在 Azure Databricks 中被视为转换,这意味着,它们将作用于 Spark 数据帧输入并将结果作为 Spark 数据帧返回。 由于 Delta Live Tables 根据数据帧定义数据集,因此,只需几行代码,即可将使用 MLflow 的 Apache Spark 工作负载转换为增量实时表。 有关 MLflow 的详细信息,请参阅使用 MLflow 进行 ML 生命周期管理

如果你已有一个调用 MLflow 模型的 Python 笔记本,你可以使用 @dlt.table 修饰器并确保将函数定义为返回转换结果,使此代码适应增量实时表。 增量实时表默认不安装 MLflow,因此请检查是否%pip install mlflow导入并dlt位于mlflow笔记本顶部。 有关增量实时表语法的介绍,请参阅教程:使用 Python 实现增量实时表管道

若要在增量实时表中使用 MLflow 模型,请完成以下步骤:

  1. 获取 MLflow 模型的运行 ID 和模型名称。 运行 ID 和模型名称用于构造 MLflow 模型的 URI。
  2. 使用 URI 定义 Spark UDF,以加载 MLflow 模型。
  3. 在表定义中调用 UDF 以使用 MLflow 模型。

以下示例演示了此模式的基本语法:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

下面是一个完整的代码示例,它定义一个名为 loaded_model_udf 的 Spark UDF,用于加载一个基于贷款风险数据训练的 MLflow 模型。 用于预测的数据列作为参数传递给该 UDF。 表 loan_risk_predictions 计算 loan_risk_input_data 中每一行的预测。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

保留手动删除或更新

增量实时表允许手动删除或更新表中的记录,以及执行刷新操作以重新计算下游表。

默认情况下,增量实时表会在每次更新管道时根据输入数据重新计算表结果,因此必须确保不会从源数据中重新加载已删除的记录。 pipelines.reset.allowed设置表属性以防止false刷新表,但不会阻止对表或新数据的增量写入流向表。

下图演示了使用两个流式表的示例:

  • raw_user_table 从源中引入原始用户数据。
  • bmi_table 使用 raw_user_table 中的重量和高度以增量方式计算 BMI 评分。

你想要手动删除或更新 raw_user_table 中的用户记录并重新计算 bmi_table

保留数据图

以下代码演示了将 pipelines.reset.allowed 表属性设置为 false 以禁用 raw_user_table 完全刷新,以便一直保留所需的更改,但在运行管道更新时重新计算下游表:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);