DataFrame を外部ストレージ システム (ファイル システム、キー値ストアなど) に書き込むのに使用されるインターフェイス。
Spark Connect のサポート
構文
このインターフェイスにアクセスするには、 DataFrame.write を使用します。
メソッド
| メソッド | 説明 |
|---|---|
mode(saveMode) |
データまたはテーブルが既に存在する場合の動作を指定します。 |
format(source) |
基になる出力データ ソースを指定します。 |
option(key, value) |
基になるデータ ソースの出力オプションを追加します。 |
options(**options) |
基になるデータ ソースの出力オプションを追加します。 |
partitionBy(*cols) |
ファイル システム上の指定された列で出力をパーティション分割します。 |
bucketBy(numBuckets, col, *cols) |
指定された列で出力をバケットします。 |
sortBy(col, *cols) |
各バケットの出力を、ファイル システム上の指定された列で並べ替えます。 |
clusterBy(*cols) |
指定された列ごとにデータをクラスター化して、クエリのパフォーマンスを最適化します。 |
save(path, format, mode, partitionBy, **options) |
DataFrame の内容をデータ ソースに保存します。 |
insertInto(tableName, overwrite) |
指定したテーブルに DataFrame の内容を挿入します。 |
saveAsTable(name, format, mode, partitionBy, **options) |
DataFrame の内容を指定したテーブルとして保存します。 |
json(path, mode, compression, ...) |
指定したパスの JSON 形式で DataFrame のコンテンツを保存します。 |
parquet(path, mode, partitionBy, compression) |
指定したパスの Parquet 形式で DataFrame の内容を保存します。 |
text(path, compression, lineSep) |
指定したパスのテキスト ファイルに DataFrame の内容を保存します。 |
csv(path, mode, compression, sep, ...) |
指定したパスの CSV 形式で DataFrame の内容を保存します。 |
xml(path, rowTag, mode, ...) |
指定したパスの XML 形式で DataFrame の内容を保存します。 |
orc(path, mode, partitionBy, compression) |
指定したパスの ORC 形式で DataFrame の内容を保存します。 |
excel(path, mode, dataAddress, headerRows) |
DataFrame の内容を Excel 形式で指定したパスに保存します。 |
jdbc(url, table, mode, properties) |
DataFrame の内容を JDBC 経由で外部データベース テーブルに保存します。 |
保存モード
mode() メソッドでは、次のオプションがサポートされています。
- append: この DataFrame の内容を既存のデータに追加します。
- overwrite: 既存のデータを上書きします。
- error または errorifexists: データが既に存在する場合は例外をスローします (既定値)。
- ignore: データが既に存在する場合は、この操作を自動的に無視します。
例示
さまざまなデータ ソースへの書き込み
# Access DataFrameWriter through DataFrame
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.write
# Write to JSON file
df.write.json("path/to/output.json")
# Write to CSV file with options
df.write.option("header", "true").csv("path/to/output.csv")
# Write to Parquet file
df.write.parquet("path/to/output.parquet")
# Write to a table
df.write.saveAsTable("table_name")
形式と保存の使用
# Specify format explicitly
df.write.format("json").save("path/to/output.json")
# With options
df.write.format("csv") \
.option("header", "true") \
.option("compression", "gzip") \
.save("path/to/output.csv")
保存モードの指定
# Overwrite existing data
df.write.mode("overwrite").parquet("path/to/output.parquet")
# Append to existing data
df.write.mode("append").parquet("path/to/output.parquet")
# Ignore if data exists
df.write.mode("ignore").json("path/to/output.json")
# Error if data exists (default)
df.write.mode("error").csv("path/to/output.csv")
データのパーティション分割
# Partition by single column
df.write.partitionBy("year").parquet("path/to/output.parquet")
# Partition by multiple columns
df.write.partitionBy("year", "month").parquet("path/to/output.parquet")
# Partition with bucketing
df.write \
.bucketBy(10, "id") \
.sortBy("age") \
.saveAsTable("bucketed_table")
JDBC への書き込み
# Write to database table
df.write.jdbc(
url="jdbc:postgresql://localhost:5432/mydb",
table="users",
mode="overwrite",
properties={"user": "myuser", "password": "mypassword"}
)
メソッドチェーン
# Chain multiple configuration methods
df.write \
.format("parquet") \
.mode("overwrite") \
.option("compression", "snappy") \
.partitionBy("year", "month") \
.save("path/to/output")
テーブルへの書き込み
# Save as managed table
df.write.saveAsTable("my_table")
# Save as managed table with options
df.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy("year") \
.saveAsTable("partitioned_table")
# Insert into existing table
df.write.insertInto("existing_table")
# Insert into existing table with overwrite
df.write.insertInto("existing_table", overwrite=True)