pyspark.pipelines 模块(此处别名为dp)模块使用修饰器实现其大部分核心功能。 这些修饰器接受一个函数,该函数定义流查询或批查询并返回一个 Apache Spark 数据帧。 以下语法显示了定义管道数据集的简单示例:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
本页概述了在管道中定义数据集的函数和查询。 有关可用修饰器的完整列表,请参阅 管道开发人员参考。
用于定义数据集的函数不应包含与数据集无关的任意 Python 逻辑,包括对第三方 API 的调用。 管道在规划、验证和更新期间多次运行这些函数。 包括任意逻辑可能会导致意外结果。
读取数据以开始数据集定义
用于定义管道数据集的函数通常以 spark.read 或 spark.readStream 作开头。 这些读取操作返回静态或流数据DataFrame对象,用于在获得DataFrame之前应用额外的转换。 返回 DataFrame 的其他 spark操作示例包括 spark.table 或 spark.range。
函数不应引用函数外部定义的数据帧。 尝试引用在不同范围定义的数据帧可能会导致意外行为。 有关用于创建多个表的元编程模式的示例,请参阅 循环中创建 for 表。
以下示例显示了使用批处理或流式处理逻辑读取数据的基本语法:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
如果需要从外部 REST API 读取数据,请使用 Python 自定义数据源实现此连接。 请参阅 PySpark 自定义数据源。
注释
可以从 Python 数据集合(包括 pandas DataFrame、字典和列表)创建任意 Apache Spark 数据帧。 在开发和测试期间,这些模式可能很有用,但大多数生产管道数据集定义应首先从文件、外部系统或现有表或视图加载数据。
链接转换
管道几乎支持所有 Apache Spark 数据帧转换。 可以在数据集定义函数中包含任意数量的转换,但应确保使用的方法始终返回 DataFrame 对象。
如果你有一个中间转换来驱动多个下游工作负载,但不需要将其具体化为表,可以使用@dp.temporary_view()向管道添加临时视图。 然后,可以在多个下游数据集定义中使用spark.read.table("temp_view_name")来引用此视图。 以下语法演示了此模式:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
这可确保在流水线规划过程中,流水线能完全认识到您视图中的转换,并防止与在数据集定义之外运行任意的 Python 代码相关的潜在问题。
在你的函数中,可以将数据帧联结在一起,以创建新的数据帧,而无需将增量结果写入视图、具体化视图或流式处理表,如下所示的示例:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
如果所有数据帧都使用批处理逻辑执行其初始读取,则返回结果为静态数据帧。 如果您有任何正在流式传输的查询,那么返回结果将是流式数据帧。
返回数据帧
使用 @dp.table 从流读取的结果创建流表。 使用 @dp.materialized_view 从批量读取结果创建具体化视图。 大多数其他修饰器都适用于流式处理和静态数据帧,而一些修饰器则需要流式处理数据帧。
用于定义数据集的函数必须返回 Spark 数据帧。 切勿使用将文件或表保存或写入到管道数据集代码中的方法。
绝不能在管道代码中使用的 Apache Spark 操作示例:
collect()count()toPandas()save()saveAsTable()start()toTable()
注释
管道还支持使用 Pandas on Spark 来定义数据集函数。 请参阅 Spark 上的 Pandas API。
在 Python 管道中使用 SQL
PySpark 支持 spark.sql 操作员使用 SQL 编写 DataFrame 代码。 在管道源代码中使用此模式时,它会编译为具体化视图或流式处理表。
下面的代码示例等效于对数据集查询逻辑使用 spark.read.table("catalog_name.schema_name.table_name") :
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read 和 dlt.read_stream (旧版)
旧的 dlt 模块包括由 dlt.read() 和 dlt.read_stream() 引入的函数,是为了支持传统管道发布模式下的功能。 支持这些方法,但 Databricks 建议始终使用 spark.read.table() 和 spark.readStream.table() 函数,原因如下:
- 函数
dlt对读取当前管道外部定义的数据集的支持有限。 - 这些
spark函数支持指定用于读取操作的选项,例如skipChangeCommits。 函数不支持指定dlt选项。 - 模块
dlt已被模块pyspark.pipelines替换。 Databricks 建议在 Python 中编写管道代码时用于from pyspark import pipelines as dp导入pyspark.pipelines。