v2 API를 사용하여 외부 스토리지에 DataFrame을 작성하는 데 사용되는 인터페이스입니다.
Databricks 테이블 및 Delta Lake의 대부분의 사용 사례에서 DataFrameWriterV2는 원래 DataFrameWriter보다 더 강력하고 유연한 옵션을 제공합니다.
- 더 나은 테이블 속성 지원
- 분할에 대한 보다 세분화된 제어
- 조건부 덮어쓰기 기능
- 클러스터링 지원
- 만들기 또는 바꾸기 작업에 대한 보다 명확한 의미 체계
Spark Connect 지원
문법
이 인터페이스에 액세스하는 데 사용합니다 DataFrame.writeTo(table) .
메서드
| 메서드 | 설명 |
|---|---|
using(provider) |
기본 출력 데이터 원본에 대한 공급자를 지정합니다. |
option(key, value) |
쓰기 옵션을 추가합니다. 예를 들어 관리되는 테이블을 df.writeTo("test").using("delta").option("path", "s3://test").createOrReplace()만들려면 . |
options(**options) |
쓰기 옵션을 추가합니다. |
tableProperty(property, value) |
테이블 속성을 추가합니다. 예를 들어 EXTERNAL(관리되지 않는) 테이블을 만드는 데 사용합니다 tableProperty("location", "s3://test") . |
partitionedBy(col, *cols) |
지정된 열 또는 변환을 사용하여 create, createOrReplace 또는 replace로 만든 출력 테이블을 분할합니다. |
clusterBy(col, *cols) |
지정된 열별로 데이터를 클러스터하여 쿼리 성능을 최적화합니다. |
create() |
데이터 프레임의 내용에서 새 테이블을 만듭니다. |
replace() |
기존 테이블을 데이터 프레임의 내용으로 바꿉니다. |
createOrReplace() |
새 테이블을 만들거나 기존 테이블을 데이터 프레임의 내용으로 바꿉니다. |
append() |
데이터 프레임의 내용을 출력 테이블에 추가합니다. |
overwrite(condition) |
지정된 필터 조건과 일치하는 행을 출력 테이블의 데이터 프레임 내용과 덮어씁니다. |
overwritePartitions() |
데이터 프레임에 출력 테이블의 데이터 프레임 내용이 포함된 행을 하나 이상 포함하는 모든 파티션을 덮어씁니다. |
예제
새 테이블 만들기
# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()
# Create with a specific provider
df.writeTo("my_table").using("parquet").create()
데이터 분할
# Partition by single column
df.writeTo("my_table") \
.partitionedBy("year") \
.create()
# Partition by multiple columns
df.writeTo("my_table") \
.partitionedBy("year", "month") \
.create()
# Partition using transform functions
from pyspark.sql.functions import years, months, days
df.writeTo("my_table") \
.partitionedBy(years("date"), months("date")) \
.create()
테이블 속성 설정
# Add table properties
df.writeTo("my_table") \
.tableProperty("key1", "value1") \
.tableProperty("key2", "value2") \
.create()
옵션 사용
# Add write options
df.writeTo("my_table") \
.option("compression", "snappy") \
.option("maxRecordsPerFile", "10000") \
.create()
# Add multiple options at once
df.writeTo("my_table") \
.options(compression="snappy", maxRecordsPerFile="10000") \
.create()
데이터 클러스터링
# Cluster by columns for query optimization
df.writeTo("my_table") \
.clusterBy("user_id", "timestamp") \
.create()
작업 바꾸기
# Replace existing table
df.writeTo("my_table") \
.using("parquet") \
.replace()
# Create or replace (safe operation)
df.writeTo("my_table") \
.using("parquet") \
.createOrReplace()
추가 작업
# Append to existing table
df.writeTo("my_table").append()
덮어쓰기 작업
from pyspark.sql.functions import col
# Overwrite specific rows based on condition
df.writeTo("my_table") \
.overwrite(col("date") == "2025-01-01")
# Overwrite entire partitions
df.writeTo("my_table") \
.overwritePartitions()
메서드 연결
# Combine multiple configurations
df.writeTo("my_table") \
.using("parquet") \
.option("compression", "snappy") \
.tableProperty("description", "User data table") \
.partitionedBy("year", "month") \
.clusterBy("user_id") \
.createOrReplace()