增量实时表 Python 语言参考

本文提供了有关 Delta Live Tables Python 编程接口的详细信息。

有关 SQL API 的信息,请参阅增量实时表 SQL 语言参考

有关配置自动加载程序的详细信息,请参阅什么是自动加载程序?

开始之前的准备工作

使用 Delta Live Tables Python 接口实现管道时,需要考虑以下重要事项:

  • 由于在规划和运行管道更新期间会多次调用 Python table()view() 函数,因此不要在这些函数中包含可能产生副作用的代码(例如,修改数据或发送电子邮件的代码)。 为了避免出现意外行为,定义数据集的 Python 函数应该仅包含定义表或视图所需的代码。
  • 若要执行发送电子邮件或与外部监控服务集成等操作,特别是在定义数据集的函数中,请使用事件挂钩。 在定义数据集的函数中实现这些操作将导致出现意外行为。
  • Python tableview 函数必须返回数据帧。 某些对数据帧进行操作的函数不返回数据帧,因此不应使用。 这些操作包括 collect()count()toPandas()save()saveAsTable() 等函数。 由于数据帧转换是在解析完整数据流图后执行的,因此使用此类操作可能会产生意想不到的副作用。 但是,你可以在 tableview 函数定义之外包括这些函数,因为此代码在图形初始化阶段运行一次。

导入 dlt Python 模块

增量实时表 Python 函数在 dlt 模块中定义。 利用 Python API 实现的管道必须导入此模块:

import dlt

创建 Delta Live Tables 具体化视图或流式处理表

在 Python 中,Delta Live Tables 根据定义查询来确定是将数据集更新为具体化视图还是流式处理表。 @table修饰器可用于定义具体化视图和流式处理表。

若要在 Python 中定义具体化视图,请将 @table 应用于对数据源执行静态读取的查询。 要定义流式处理表,请将@table应用于对数据源执行流式读取的查询,或使用create_streaming_table() 函数。 这两种数据集类型具有相同的语法规范,如下所示:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

创建 Delta Live Tables 视图

要在 Python 中定义视图,请应用 @view 装饰器。 与 @table 修饰器一样,可以将 Delta Live Tables 中的视图用于静态或流式处理数据集。 下面是使用 Python 来定义视图的语法:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

示例:定义表和视图

若要在 Python 中定义表或视图,请将 @dlt.view@dlt.table 修饰器应用于函数。 你可以使用函数名称或 name 参数来分配表或视图名称。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw 视图,一个将 taxi_raw 视图作为输入的 filtered_data 表:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

示例:访问在同一管道中定义的数据集

除了从外部数据源读取数据外,还可以使用 Delta Live Tables read() 函数访问同一管道中定义的数据集。 以下示例演示如何使用 read() 函数创建 customers_filtered 数据集:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

还可以使用 spark.table() 函数访问同一管道中定义的数据集。 使用 spark.table() 函数访问管道中定义的数据集时,在函数参数中的数据集名称前加上 LIVE 关键字:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

示例:从元存储中注册的表读取

如果要从 Hive 元存储中注册的表读取数据,请在函数参数中忽略LIVE关键字,并选择性地使用数据库名称来限定表名称:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

若要通过示例来了解如何从 Unity Catalog 表读取数据,请参阅将数据引入 Unity Catalog 管道

示例:使用 spark.sql 访问数据集

你还可以在查询函数中使用 spark.sql 表达式返回数据集。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

创建一个表,用作流式处理操作的目标

使用create_streaming_table()函数为流式处理操作输出的记录(包括apply_changes()apply_changes_from_snapshot()@append_flow输出记录)创建目标表。

注意

create_target_table()create_streaming_live_table() 函数已弃用。 Databricks 建议更新现有代码以使用 create_streaming_table() 函数。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
参数
name

类型:str

表名称。

此参数是必需的。
comment

类型:str

表的可选说明。
spark_conf

类型:dict

用于执行此查询的 Spark 配置的可选列表。
table_properties

类型:dict

表的表属性可选列表。
partition_cols

类型:array

包含一列或多列的可选列表,用于对表进行分区。
path

类型:str

表数据的可选存储位置。 如果未设置,系统默认为管道存储位置。
schema

类型:strStructType

表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义
StructType
expect_all
expect_all_or_drop
expect_all_or_fail

类型:dict

表的可选数据质量约束。 请参阅多个期望

控制表的具体化方式

表还提供对其具体化的额外控制:

  • 指定如何使用 partition_cols 对表进行分区。 可以使用分区来加快查询速度。
  • 可以在定义视图或表时设置表属性。 请参阅 Delta Live Tables 表属性
  • 使用 path 设置为表数据设置存储位置。 默认情况下,如果未设置 path,表数据将存储在管道存储位置中。
  • 可在架构定义中使用生成的列。 请参阅示例:指定架构和分区列

注意

对于小于 1 TB 的表,Databricks 建议让增量实时表控制数据组织方式。 除非预期表会增长到超过 1 TB,否则不应指定分区列。

示例:指定架构和分区列

可以选择性地使用 Python StructType 或 SQL DDL 字符串指定表架构。 如果使用 DDL 字符串指定了表架构,则定义可以包括生成的列

以下示例使用一个通过 Python StructType 指定的架构创建名为 sales 的表:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

以下示例使用 DDL 字符串指定表的架构,定义生成的列,并定义分区列:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

默认情况下,如果未指定架构,则增量实时表将从 table 定义推断架构。

将流式处理表配置为忽略源流式处理表中的更改

注意

  • skipChangeCommits 标志仅适用于使用 option() 函数的 spark.readStream。 不能在 dlt.read_stream() 函数中使用此标志。
  • 当源流式处理表定义为 apply_changes() 函数的目标时,无法使用 skipChangeCommits 标志。

默认情况下,流式处理表需要“仅追加”源。 如果一个流式处理表使用另一个流式处理表作为源,而源流式处理表需要执行更新或删除操作(例如 GDPR 的“被遗忘权”处理),可以在读取源流式处理表时设置 skipChangeCommits 标志来忽略那些更改。 有关此标志的详细信息,请参阅忽略更新和删除

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tables 属性

下表描述了在使用 Delta Live Tables 定义表和视图时可以指定的选项和属性:

@table 或 @view
name

类型:str

表或视图的可选名称。 如果未定义,将使用函数名称作为表名或视图名称。
comment

类型:str

表的可选说明。
spark_conf

类型:dict

用于执行此查询的 Spark 配置的可选列表。
table_properties

类型:dict

表的表属性可选列表。
path

类型:str

表数据的可选存储位置。 如果未设置,系统默认为管道存储位置。
partition_cols

类型:a collection of str

包含一列或多列的可选集合(例如,list),用于对表进行分区。
schema

类型:strStructType

表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义
StructType
temporary

类型:bool

创建表,但不发布表的元数据。 temporary 关键字指示 Delta Live Tables 创建可用于管道但不应在管道外部访问的表。 为了缩短处理时间,临时表会在创建它的管道的生存期内持久保留,而不仅仅是一次更新。

默认值为“False”。
表或视图定义
def <function-name>()

用于定义数据集的 Python 函数。 如果未设置 name 参数,则使用 <function-name> 作为目标数据集名称。
query

一个 Spark SQL 语句,它返回 Spark Dataset 或 Koalas DataFrame。

使用 dlt.read()spark.table() 从同一管道中定义的数据集执行完整读取操作。 使用 spark.table() 函数从同一管道中定义的数据集读取数据时,在函数参数中的数据集名称前加上 LIVE 关键字。 例如,从名为 customers 的数据集读取数据:

spark.table("LIVE.customers")

还可以使用 spark.table() 函数从元存储中注册的表中读取数据,方法是省略 LIVE 关键字,并选择性地使用数据库名称限定表名称:

spark.table("sales.customers")

使用 dlt.read_stream() 从同一管道中定义的数据集执行流式读取操作。

使用 spark.sql 函数定义 SQL 查询,以创建返回数据集。

使用 PySpark 语法通过 Python 定义 Delta Live Tables 查询。
预期
@expect("description", "constraint")

声明由以下参数确定的数据质量约束:
description。 如果某行违反了预期,则在目标数据集中包含该行。
@expect_or_drop("description", "constraint")

声明由以下参数确定的数据质量约束:
description。 如果某行违反了预期,则从目标数据集中删除该行。
@expect_or_fail("description", "constraint")

声明由以下参数确定的数据质量约束:
description。 如果某行违反了预期,则立即停止执行。
@expect_all(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了其中一个预期,则在目标数据集中包含该行。
@expect_all_or_drop(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则从目标数据集中删除该行。
@expect_all_or_fail(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则立即停止执行。

在增量实时表中使用 Python 从更改源进行变更数据捕获

使用 Python API 中的apply_changes()函数,以使用增量实时变更改数据捕获 (CDC) 功能处理更改数据源 (CDF) 中的源数据。

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 指定apply_changes()目标表的架构时,必须包含具有与sequence_by字段相同数据类型的__START_AT__END_AT列。

要创建所需的目标表,可以在增量实时表 Python 接口中使用create_streaming_table()函数。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

注意

对于APPLY CHANGES处理,INSERTUPDATE事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定的键匹配的所有行,或者当目标表中不存在某个匹配的记录时插入新行。 可以使用 APPLY AS DELETE WHEN 条件指定对 DELETE 事件的处理。

要了解有关使用更改源进行 CDC 处理的详细信息,请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获。 有关使用apply_changes()函数的示例,请参阅示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 指定apply_changes目标表的架构时,必须包含具有与sequence_by字段相同数据类型的__START_AT__END_AT列。

请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获

参数
target

类型:str

要更新的表的名称。 可以在执行 apply_changes() 函数之前使用 create_streaming_table() 函数创建目标表。

此参数是必需的。
source

类型:str

包含 CDC 记录的数据源。

此参数是必需的。
keys

类型:list

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

可以指定以下任一项:

* 字符串列表:["userId", "orderId"]
* Spark SQL col() 函数列表:[col("userId"), col("orderId"]

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是必需的。
sequence_by

类型:strcol()

指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。

可以指定以下任一项:

* 字符串:"sequenceNum"
* Spark SQL col() 函数:col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是必需的。
ignore_null_updates

类型:bool

允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并且ignore_null_updatesTrue时,具有null的列会在目标中保留其现有值。 这也适用于值为 null 的嵌套列。 当ignore_null_updatesFalse时,会使用null值覆盖现有值。

此参数是可选的。

默认值为 False
apply_as_deletes

类型:strexpr()

指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为
pipelines.cdc.tombstoneGCThresholdInSeconds 表属性

可以指定以下任一项:

* 字符串:"Operation = 'DELETE'"
* Spark SQL expr() 函数:expr("Operation = 'DELETE'")

此参数是可选的。
apply_as_truncates

类型:strexpr()

指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

仅 SCD 类型 1 支持 apply_as_truncates 参数。 SCD 类型 2 不支持截断操作。

可以指定以下任一项:

* 字符串:"Operation = 'TRUNCATE'"
* Spark SQL expr() 函数:expr("Operation = 'TRUNCATE'")

此参数是可选的。
column_list

except_column_list

类型:list

要包含在目标表中的列的子集。 使用 column_list 指定要包含的列的完整列表。 使用 except_column_list 指定要排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:

* column_list = ["userId", "name", "city"]
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是可选的。

当没有 column_listexcept_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。
stored_as_scd_type

类型:strint

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

对于 SCD 类型 1,将其设置为 1;对于 SCD 类型 2,将其设置为 2

此子句是可选的。

默认值为 SCD 类型 1。
track_history_column_list

track_history_except_column_list

类型:list

要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 使用
使用 track_history_except_column_list 指定要从跟踪中排除的列。 可将任一值声明为字符串列表或 Spark SQL col() 函数:- track_history_column_list = ["userId", "name", "city"]。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是可选的。

默认设置是未将 track_history_column_list
track_history_except_column_list 参数传递给函数时包含目标表中的所有列。

在增量实时表中使用 Python 从数据库快照进行变更数据捕获

重要

APPLY CHANGES FROM SNAPSHOT API 为公共预览版

使用 Python API 中的apply_changes_from_snapshot()函数,以使用增量实时表变更数据捕获 (CDC) 功能处理数据库快照中的源数据。

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 在指定 apply_changes_from_snapshot() 目标表的架构时,还必须包含具有与 sequence_by 字段相同数据类型的 __START_AT__END_AT 列。

要创建所需的目标表,可以在增量实时表 Python 接口中使用create_streaming_table()函数。

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any, # Not sure this is valid
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

注意

对于APPLY CHANGES FROM SNAPSHOT处理,默认行为是在目标中不存在具有相同键的匹配记录时插入新行。 如果匹配记录确实存在,则只有当行中的任何值都已更改时,才会更新该记录。 删除目标中存在键但源中不再存在键的行。

要了解有关使用快照进行 CDC 处理的详细信息,请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获。 有关使用apply_changes_from_snapshot()函数的示例,请参阅定期快照引入历史快照引入示例。

参数
target

类型:str

要更新的表的名称。 可以在运行apply_changes()函数之前使用create_streaming_table()函数创建目标表。

此参数是必需的。
source

类型:strlambda function

要定期拍摄快照的表或视图的名称或返回要处理的快照 DataFrame 的 Python lambda 函数和快照版本。 请参阅实现源参数

此参数是必需的。
keys

类型:list

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

可以指定以下任一项:

* 字符串列表:["userId", "orderId"]
* Spark SQL col() 函数列表:[col("userId"), col("orderId"]

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是必需的。
stored_as_scd_type

类型:strint

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

对于 SCD 类型 1,将其设置为 1;对于 SCD 类型 2,将其设置为 2

此子句是可选的。

默认值为 SCD 类型 1。
track_history_column_list

track_history_except_column_list

类型:list

要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 使用
使用 track_history_except_column_list 指定要从跟踪中排除的列。 可将任一值声明为字符串列表或 Spark SQL col() 函数:- track_history_column_list = ["userId", "name", "city"]。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是可选的。

默认设置是未将 track_history_column_list
track_history_except_column_list 参数传递给函数时包含目标表中的所有列。

实现source参数

apply_changes_from_snapshot()函数包括source参数。 对于处理历史快照,source参数应为 Python lambda 函数,该函数将两个值返回到apply_changes_from_snapshot()函数:包含要处理的快照数据的 Python DataFrame 和快照版本。

以下是 lambda 函数的签名:

lambda Any => Optional[(DataFrame, Any)]
  • lambda 函数的参数是最近处理的快照版本。
  • lambda 函数的返回值是None或两个值的元组:元组的第一个值是包含要处理的快照的 DataFrame。 元组的第二个值是表示快照逻辑顺序的快照版本。

实现和调用 lambda 函数的示例:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

每次触发包含apply_changes_from_snapshot()函数的管道时,增量实时表运行时都会执行以下步骤:

  1. 运行next_snapshot_and_version函数以加载下一个快照 DataFrame 和相应的快照版本。
  2. 如果未返回 DataFrame,则运行会终止,管道更新会标记为已完成。
  3. 检测新快照中的更改,并以增量方式将其应用于目标表。
  4. 返回到步骤 #1 以加载下一个快照及其版本。

限制

Delta Live Tables Python 接口具有以下限制:

不支持 pivot() 函数。 Spark 中的 pivot 操作需要预先加载输入数据以计算输出架构。 Delta Live Tables 不支持此功能。