用于将数据帧写入外部存储系统(例如文件系统、键值存储等)的接口。
支持 Spark Connect
Syntax
用于 DataFrame.write 访问此接口。
方法
| 方法 | 说明 |
|---|---|
mode(saveMode) |
指定数据或表已存在时的行为。 |
format(source) |
指定基础输出数据源。 |
option(key, value) |
为基础数据源添加输出选项。 |
options(**options) |
添加基础数据源的输出选项。 |
partitionBy(*cols) |
按文件系统上的给定列对输出进行分区。 |
bucketBy(numBuckets, col, *cols) |
将给定列的输出存储桶。 |
sortBy(col, *cols) |
按文件系统上的给定列对每个存储桶中的输出进行排序。 |
clusterBy(*cols) |
按给定列对数据进行聚类分析,以优化查询性能。 |
save(path, format, mode, partitionBy, **options) |
将 DataFrame 的内容保存到数据源。 |
insertInto(tableName, overwrite) |
将 DataFrame 的内容插入指定表。 |
saveAsTable(name, format, mode, partitionBy, **options) |
将 DataFrame 的内容保存为指定的表。 |
json(path, mode, compression, ...) |
将 DataFrame 的内容以 JSON 格式保存在指定路径处。 |
parquet(path, mode, partitionBy, compression) |
将 DataFrame 的内容以 Parquet 格式保存在指定的路径上。 |
text(path, compression, lineSep) |
将 DataFrame 的内容保存在指定路径的文本文件中。 |
csv(path, mode, compression, sep, ...) |
将 DataFrame 的内容以 CSV 格式保存在指定路径处。 |
xml(path, rowTag, mode, ...) |
将 DataFrame 的内容以 XML 格式保存在指定的路径上。 |
orc(path, mode, partitionBy, compression) |
将 DataFrame 的内容以 ORC 格式保存在指定路径处。 |
excel(path, mode, dataAddress, headerRows) |
将 DataFrame 的内容以 Excel 格式保存在指定的路径处。 |
jdbc(url, table, mode, properties) |
通过 JDBC 将 DataFrame 的内容保存到外部数据库表。 |
保存模式
该方法 mode() 支持以下选项:
- append:将此 DataFrame 的内容追加到现有数据。
- overwrite:覆盖现有数据。
- error 或 errorifexists:如果数据已存在(默认值),则引发异常。
- ignore:如果数据已存在,则以无提示方式忽略此作。
示例
写入到不同的数据源
# Access DataFrameWriter through DataFrame
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.write
# Write to JSON file
df.write.json("path/to/output.json")
# Write to CSV file with options
df.write.option("header", "true").csv("path/to/output.csv")
# Write to Parquet file
df.write.parquet("path/to/output.parquet")
# Write to a table
df.write.saveAsTable("table_name")
使用格式并保存
# Specify format explicitly
df.write.format("json").save("path/to/output.json")
# With options
df.write.format("csv") \
.option("header", "true") \
.option("compression", "gzip") \
.save("path/to/output.csv")
指定保存模式
# Overwrite existing data
df.write.mode("overwrite").parquet("path/to/output.parquet")
# Append to existing data
df.write.mode("append").parquet("path/to/output.parquet")
# Ignore if data exists
df.write.mode("ignore").json("path/to/output.json")
# Error if data exists (default)
df.write.mode("error").csv("path/to/output.csv")
对数据进行分区
# Partition by single column
df.write.partitionBy("year").parquet("path/to/output.parquet")
# Partition by multiple columns
df.write.partitionBy("year", "month").parquet("path/to/output.parquet")
# Partition with bucketing
df.write \
.bucketBy(10, "id") \
.sortBy("age") \
.saveAsTable("bucketed_table")
写入 JDBC
# Write to database table
df.write.jdbc(
url="jdbc:postgresql://localhost:5432/mydb",
table="users",
mode="overwrite",
properties={"user": "myuser", "password": "mypassword"}
)
方法链接
# Chain multiple configuration methods
df.write \
.format("parquet") \
.mode("overwrite") \
.option("compression", "snappy") \
.partitionBy("year", "month") \
.save("path/to/output")
写入表
# Save as managed table
df.write.saveAsTable("my_table")
# Save as managed table with options
df.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy("year") \
.saveAsTable("partitioned_table")
# Insert into existing table
df.write.insertInto("existing_table")
# Insert into existing table with overwrite
df.write.insertInto("existing_table", overwrite=True)