用于使用 v2 API 将数据帧写入外部存储的接口。
对于 Databricks 表和 Delta Lake 的大多数用例,DataFrameWriterV2 提供比原始 DataFrameWriter 更强大、更灵活的选项:
- 更好的表属性支持
- 对分区进行更精细的控制
- 条件覆盖功能
- 支持聚类分析
- 用于创建或替换作的更清晰的语义
支持 Spark Connect
Syntax
用于 DataFrame.writeTo(table) 访问此接口。
方法
| 方法 | 说明 |
|---|---|
using(provider) |
指定基础输出数据源的提供程序。 |
option(key, value) |
添加写入选项。 例如,若要创建托管表: df.writeTo("test").using("delta").option("path", "s3://test").createOrReplace(). |
options(**options) |
添加写入选项。 |
tableProperty(property, value) |
添加表属性。 例如,用于 tableProperty("location", "s3://test") 创建 EXTERNAL(非托管)表。 |
partitionedBy(col, *cols) |
使用给定的列或转换对创建、createOrReplace 创建的输出表进行分区或替换。 |
clusterBy(col, *cols) |
按给定列对数据进行聚类分析,以优化查询性能。 |
create() |
从数据帧的内容创建新表。 |
replace() |
将现有表替换为数据帧的内容。 |
createOrReplace() |
创建新表或将现有表替换为数据帧的内容。 |
append() |
将数据帧的内容追加到输出表中。 |
overwrite(condition) |
覆盖与输出表中数据帧内容匹配的给定筛选条件的行。 |
overwritePartitions() |
覆盖数据帧包含至少一行且包含输出表中数据帧内容的所有分区。 |
示例
创建新表
# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()
# Create with a specific provider
df.writeTo("my_table").using("parquet").create()
对数据进行分区
# Partition by single column
df.writeTo("my_table") \
.partitionedBy("year") \
.create()
# Partition by multiple columns
df.writeTo("my_table") \
.partitionedBy("year", "month") \
.create()
# Partition using transform functions
from pyspark.sql.functions import years, months, days
df.writeTo("my_table") \
.partitionedBy(years("date"), months("date")) \
.create()
设置表属性
# Add table properties
df.writeTo("my_table") \
.tableProperty("key1", "value1") \
.tableProperty("key2", "value2") \
.create()
使用选项
# Add write options
df.writeTo("my_table") \
.option("compression", "snappy") \
.option("maxRecordsPerFile", "10000") \
.create()
# Add multiple options at once
df.writeTo("my_table") \
.options(compression="snappy", maxRecordsPerFile="10000") \
.create()
聚类分析数据
# Cluster by columns for query optimization
df.writeTo("my_table") \
.clusterBy("user_id", "timestamp") \
.create()
替换操作
# Replace existing table
df.writeTo("my_table") \
.using("parquet") \
.replace()
# Create or replace (safe operation)
df.writeTo("my_table") \
.using("parquet") \
.createOrReplace()
追加作
# Append to existing table
df.writeTo("my_table").append()
覆盖作
from pyspark.sql.functions import col
# Overwrite specific rows based on condition
df.writeTo("my_table") \
.overwrite(col("date") == "2025-01-01")
# Overwrite entire partitions
df.writeTo("my_table") \
.overwritePartitions()
方法链接
# Combine multiple configurations
df.writeTo("my_table") \
.using("parquet") \
.option("compression", "snappy") \
.tableProperty("description", "User data table") \
.partitionedBy("year", "month") \
.clusterBy("user_id") \
.createOrReplace()