次の方法で共有


DataFrameWriterV2 クラス

v2 API を使用して DataFrame を外部ストレージに書き込むインターフェイス。

Databricks テーブルと Delta Lake のほとんどのユース ケースでは、DataFrameWriterV2 は元の DataFrameWriter よりも強力で柔軟なオプションを提供します。

  • テーブル プロパティのサポートの向上
  • パーティション分割をより細かく制御する
  • 条件付き上書き機能
  • クラスタリングのサポート
  • 作成操作または置換操作のより明確なセマンティクス

Spark Connect のサポート

構文

このインターフェイスにアクセスするには、 DataFrame.writeTo(table) を使用します。

メソッド

メソッド 説明
using(provider) 基になる出力データ ソースのプロバイダーを指定します。
option(key, value) 書き込みオプションを追加します。 たとえば、マネージド テーブルを作成するには、 df.writeTo("test").using("delta").option("path", "s3://test").createOrReplace()
options(**options) 書き込みオプションを追加します。
tableProperty(property, value) テーブル プロパティを追加します。 たとえば、 tableProperty("location", "s3://test") を使用して EXTERNAL (アンマネージ) テーブルを作成します。
partitionedBy(col, *cols) 指定された列または変換を使用して、create、createOrReplace、または置換によって作成された出力テーブルをパーティション分割します。
clusterBy(col, *cols) 指定された列ごとにデータをクラスター化して、クエリのパフォーマンスを最適化します。
create() データ フレームの内容から新しいテーブルを作成します。
replace() 既存のテーブルをデータ フレームの内容に置き換えます。
createOrReplace() 新しいテーブルを作成するか、既存のテーブルをデータ フレームの内容に置き換えます。
append() データ フレームの内容を出力テーブルに追加します。
overwrite(condition) 指定されたフィルター条件に一致する行を、出力テーブル内のデータ フレームの内容で上書きします。
overwritePartitions() データ フレームに少なくとも 1 つの行が含まれるすべてのパーティションを、出力テーブル内のデータ フレームの内容で上書きします。

例示

新しいテーブルの作成

# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()

# Create with a specific provider
df.writeTo("my_table").using("parquet").create()

データのパーティション分割

# Partition by single column
df.writeTo("my_table") \
    .partitionedBy("year") \
    .create()

# Partition by multiple columns
df.writeTo("my_table") \
    .partitionedBy("year", "month") \
    .create()

# Partition using transform functions
from pyspark.sql.functions import years, months, days

df.writeTo("my_table") \
    .partitionedBy(years("date"), months("date")) \
    .create()

テーブルのプロパティの設定

# Add table properties
df.writeTo("my_table") \
    .tableProperty("key1", "value1") \
    .tableProperty("key2", "value2") \
    .create()

オプションの使用

# Add write options
df.writeTo("my_table") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", "10000") \
    .create()

# Add multiple options at once
df.writeTo("my_table") \
    .options(compression="snappy", maxRecordsPerFile="10000") \
    .create()

データのクラスタリング

# Cluster by columns for query optimization
df.writeTo("my_table") \
    .clusterBy("user_id", "timestamp") \
    .create()

操作を置換する

# Replace existing table
df.writeTo("my_table") \
    .using("parquet") \
    .replace()

# Create or replace (safe operation)
df.writeTo("my_table") \
    .using("parquet") \
    .createOrReplace()

追加操作

# Append to existing table
df.writeTo("my_table").append()

上書き操作

from pyspark.sql.functions import col

# Overwrite specific rows based on condition
df.writeTo("my_table") \
    .overwrite(col("date") == "2025-01-01")

# Overwrite entire partitions
df.writeTo("my_table") \
    .overwritePartitions()

メソッドチェーン

# Combine multiple configurations
df.writeTo("my_table") \
    .using("parquet") \
    .option("compression", "snappy") \
    .tableProperty("description", "User data table") \
    .partitionedBy("year", "month") \
    .clusterBy("user_id") \
    .createOrReplace()