次の方法で共有


DataFrameWriter クラス

DataFrame を外部ストレージ システム (ファイル システム、キー値ストアなど) に書き込むのに使用されるインターフェイス。

Spark Connect のサポート

構文

このインターフェイスにアクセスするには、 DataFrame.write を使用します。

メソッド

メソッド 説明
mode(saveMode) データまたはテーブルが既に存在する場合の動作を指定します。
format(source) 基になる出力データ ソースを指定します。
option(key, value) 基になるデータ ソースの出力オプションを追加します。
options(**options) 基になるデータ ソースの出力オプションを追加します。
partitionBy(*cols) ファイル システム上の指定された列で出力をパーティション分割します。
bucketBy(numBuckets, col, *cols) 指定された列で出力をバケットします。
sortBy(col, *cols) 各バケットの出力を、ファイル システム上の指定された列で並べ替えます。
clusterBy(*cols) 指定された列ごとにデータをクラスター化して、クエリのパフォーマンスを最適化します。
save(path, format, mode, partitionBy, **options) DataFrame の内容をデータ ソースに保存します。
insertInto(tableName, overwrite) 指定したテーブルに DataFrame の内容を挿入します。
saveAsTable(name, format, mode, partitionBy, **options) DataFrame の内容を指定したテーブルとして保存します。
json(path, mode, compression, ...) 指定したパスの JSON 形式で DataFrame のコンテンツを保存します。
parquet(path, mode, partitionBy, compression) 指定したパスの Parquet 形式で DataFrame の内容を保存します。
text(path, compression, lineSep) 指定したパスのテキスト ファイルに DataFrame の内容を保存します。
csv(path, mode, compression, sep, ...) 指定したパスの CSV 形式で DataFrame の内容を保存します。
xml(path, rowTag, mode, ...) 指定したパスの XML 形式で DataFrame の内容を保存します。
orc(path, mode, partitionBy, compression) 指定したパスの ORC 形式で DataFrame の内容を保存します。
excel(path, mode, dataAddress, headerRows) DataFrame の内容を Excel 形式で指定したパスに保存します。
jdbc(url, table, mode, properties) DataFrame の内容を JDBC 経由で外部データベース テーブルに保存します。

保存モード

mode() メソッドでは、次のオプションがサポートされています。

  • append: この DataFrame の内容を既存のデータに追加します。
  • overwrite: 既存のデータを上書きします。
  • error または errorifexists: データが既に存在する場合は例外をスローします (既定値)。
  • ignore: データが既に存在する場合は、この操作を自動的に無視します。

例示

さまざまなデータ ソースへの書き込み

# Access DataFrameWriter through DataFrame
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.write

# Write to JSON file
df.write.json("path/to/output.json")

# Write to CSV file with options
df.write.option("header", "true").csv("path/to/output.csv")

# Write to Parquet file
df.write.parquet("path/to/output.parquet")

# Write to a table
df.write.saveAsTable("table_name")

形式と保存の使用

# Specify format explicitly
df.write.format("json").save("path/to/output.json")

# With options
df.write.format("csv") \
    .option("header", "true") \
    .option("compression", "gzip") \
    .save("path/to/output.csv")

保存モードの指定

# Overwrite existing data
df.write.mode("overwrite").parquet("path/to/output.parquet")

# Append to existing data
df.write.mode("append").parquet("path/to/output.parquet")

# Ignore if data exists
df.write.mode("ignore").json("path/to/output.json")

# Error if data exists (default)
df.write.mode("error").csv("path/to/output.csv")

データのパーティション分割

# Partition by single column
df.write.partitionBy("year").parquet("path/to/output.parquet")

# Partition by multiple columns
df.write.partitionBy("year", "month").parquet("path/to/output.parquet")

# Partition with bucketing
df.write \
    .bucketBy(10, "id") \
    .sortBy("age") \
    .saveAsTable("bucketed_table")

JDBC への書き込み

# Write to database table
df.write.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="users",
    mode="overwrite",
    properties={"user": "myuser", "password": "mypassword"}
)

メソッドチェーン

# Chain multiple configuration methods
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .partitionBy("year", "month") \
    .save("path/to/output")

テーブルへの書き込み

# Save as managed table
df.write.saveAsTable("my_table")

# Save as managed table with options
df.write \
    .mode("overwrite") \
    .format("parquet") \
    .partitionBy("year") \
    .saveAsTable("partitioned_table")

# Insert into existing table
df.write.insertInto("existing_table")

# Insert into existing table with overwrite
df.write.insertInto("existing_table", overwrite=True)