다음을 통해 공유


DataFrame 클래스

명명된 열로 그룹화된 데이터의 분산 컬렉션입니다.

DataFrame은 Spark SQL의 관계형 테이블과 동일하며 SparkSession의 다양한 함수를 사용하여 만들 수 있습니다.

중요합니다

데이터 프레임은 생성자를 사용하여 직접 만들어서는 안 됩니다.

Spark Connect 지원

속성

재산 설명
sparkSession 이 DataFrame 을 만든 SparkSession 을 반환합니다.
rdd 콘텐츠를 행의 RDD로 반환합니다(클래식 모드에만 해당).
na 누락된 값을 처리하기 위한 DataFrameNaFunctions 를 반환합니다.
stat 통계 함수에 대한 DataFrameStatFunctions 를 반환합니다.
write 스트리밍이 아닌 DataFrame의 콘텐츠를 외부 스토리지에 저장하기 위한 인터페이스입니다.
writeStream 스트리밍 DataFrame의 콘텐츠를 외부 스토리지에 저장하기 위한 인터페이스입니다.
schema 이 DataFrame의 스키마를 StructType으로 반환합니다.
dtypes 모든 열 이름과 해당 데이터 형식을 목록으로 반환합니다.
columns DataFrame의 모든 열 이름을 목록으로 검색합니다.
storageLevel DataFrame의 현재 스토리지 수준을 가져옵니다.
isStreaming 이 DataFrame에 데이터가 도착할 때 지속적으로 반환하는 하나 이상의 원본이 포함되어 있으면 True를 반환합니다.
executionInfo 쿼리가 실행된 후 ExecutionInfo 개체를 반환합니다.
plot 함수를 그리기 위한 PySparkPlotAccessor 를 반환합니다.

메서드

데이터 보기 및 검사

메서드 설명
toJSON(use_unicode) DataFrame을 문자열 또는 DataFrame의 RDD로 변환합니다.
printSchema(level) 트리 형식으로 스키마를 인쇄합니다.
explain(extended, mode) 디버깅을 위해 (논리적 및 물리적) 계획을 콘솔에 인쇄합니다.
show(n, truncate, vertical) DataFrame의 첫 번째 n 행을 콘솔에 인쇄합니다.
collect() DataFrame의 모든 레코드를 행 목록으로 반환합니다.
toLocalIterator(prefetchPartitions) 이 DataFrame의 모든 행이 포함된 반복기를 반환합니다.
take(num) 첫 번째 num 행을 행 목록으로 반환합니다.
tail(num) 마지막 num 행을 행 목록으로 반환합니다.
head(n) 첫 번째 n개 행을 반환합니다.
first() 첫 번째 행을 행으로 반환합니다.
count() 이 DataFrame의 행 수를 반환합니다.
isEmpty() DataFrame이 비어 있는지 확인하고 부울 값을 반환합니다.
describe(*cols) 숫자 및 문자열 열에 대한 기본 통계를 계산합니다.
summary(*statistics) 숫자 및 문자열 열에 대해 지정된 통계를 계산합니다.

임시 보기

메서드 설명
createTempView(name) 이 DataFrame을 사용하여 로컬 임시 보기를 만듭니다.
createOrReplaceTempView(name) 로컬 임시 보기를 만들거나 이 DataFrame으로 대체합니다.
createGlobalTempView(name) 이 DataFrame을 사용하여 전역 임시 보기를 만듭니다.
createOrReplaceGlobalTempView(name) 지정된 이름을 사용하여 전역 임시 뷰를 만들거나 바꿉니다.

선택 및 프로젝션

메서드 설명
select(*cols) 식 집합을 투영하고 새 DataFrame을 반환합니다.
selectExpr(*expr) SQL 식 집합을 투영하고 새 DataFrame을 반환합니다.
filter(condition) 지정된 조건을 사용하여 행을 필터링합니다.
where(condition) 필터의 별칭입니다.
drop(*cols) 지정한 열이 없는 새 DataFrame을 반환합니다.
toDF(*cols) 지정한 새 열 이름을 가진 새 DataFrame을 반환합니다.
withColumn(colName, col) 열을 추가하거나 이름이 같은 기존 열을 바꿔 새 DataFrame을 반환합니다.
withColumns(*colsMap) 여러 열을 추가하거나 이름이 같은 기존 열을 바꿔 새 DataFrame을 반환합니다.
withColumnRenamed(existing, new) 기존 열의 이름을 변경하여 새 DataFrame을 반환합니다.
withColumnsRenamed(colsMap) 여러 열의 이름을 변경하여 새 DataFrame을 반환합니다.
withMetadata(columnName, metadata) 메타데이터를 사용하여 기존 열을 업데이트하여 새 DataFrame을 반환합니다.
metadataColumn(colName) 논리 열 이름에 따라 메타데이터 열을 선택하고 열로 반환합니다.
colRegex(colName) regex로 지정된 열 이름에 따라 열을 선택하고 열로 반환합니다.

정렬 및 순서 지정

메서드 설명
sort(*cols, **kwargs) 지정된 열을 기준으로 정렬된 새 DataFrame을 반환합니다.
orderBy(*cols, **kwargs) 정렬의 별칭입니다.
sortWithinPartitions(*cols, **kwargs) 각 파티션이 지정된 열을 기준으로 정렬된 새 DataFrame을 반환합니다.

집계 및 그룹화

메서드 설명
groupBy(*cols) 데이터 프레임을 지정된 열별로 그룹화하여 집계를 수행할 수 있도록 합니다.
rollup(*cols) 지정된 열을 사용하여 현재 DataFrame에 대한 다차원 롤업을 만듭니다.
cube(*cols) 지정된 열을 사용하여 현재 DataFrame에 대한 다차원 큐브를 만듭니다.
groupingSets(groupingSets, *cols) 지정된 그룹화 집합을 사용하여 현재 DataFrame에 대한 다차원 집계를 만듭니다.
agg(*exprs) 그룹이 없는 전체 DataFrame에 대해 집계합니다(df.groupBy().agg()의 약식).
observe(observation, *exprs) DataFrame에서 관찰할 (명명된) 메트릭을 정의합니다.

Joins

메서드 설명
join(other, on, how) 지정된 조인 식을 사용하여 다른 DataFrame과 조인합니다.
crossJoin(other) 다른 DataFrame을 사용하여 카티시안 제품을 반환합니다.
lateralJoin(other, on, how) 지정된 조인 식을 사용하여 다른 DataFrame과 횡적 조인합니다.

작업 설정

메서드 설명
union(other) 이 데이터 프레임과 다른 DataFrame의 행 조합이 포함된 새 DataFrame을 반환합니다.
unionByName(other, allowMissingColumns) 이 데이터 프레임과 다른 DataFrame의 행 조합이 포함된 새 DataFrame을 반환합니다.
intersect(other) 이 DataFrame과 다른 DataFrame 모두에서 행만 포함하는 새 DataFrame을 반환합니다.
intersectAll(other) 중복 항목을 유지하면서 이 DataFrame과 다른 DataFrame 모두에 행이 포함된 새 DataFrame을 반환합니다.
subtract(other) 이 DataFrame에 행이 포함되지만 다른 DataFrame에는 없는 새 DataFrame을 반환합니다.
exceptAll(other) 중복 항목을 유지하면서 이 DataFrame에 행이 포함되지만 다른 DataFrame에는 없는 새 DataFrame을 반환합니다.

Deduplication

메서드 설명
distinct() 이 DataFrame의 고유 행을 포함하는 새 DataFrame을 반환합니다.
dropDuplicates(subset) 선택적으로 특정 열만 고려하여 중복 행이 제거된 새 DataFrame을 반환합니다.
dropDuplicatesWithinWatermark(subset) 선택적으로 워터마크 내에서 특정 열만 고려하여 중복 행이 제거된 새 DataFrame을 반환합니다.

샘플링 및 분할

메서드 설명
sample(withReplacement, fraction, seed) 이 DataFrame의 샘플링된 하위 집합을 반환합니다.
sampleBy(col, fractions, seed) 각 계층에 지정된 분수에 따라 대체하지 않고 계층화된 샘플을 반환합니다.
randomSplit(weights, seed) 제공된 가중치로 이 DataFrame을 임의로 분할합니다.

분할

메서드 설명
coalesce(numPartitions) 정확히 numPartitions 파티션이 있는 새 DataFrame을 반환합니다.
repartition(numPartitions, *cols) 지정된 분할 식으로 분할된 새 DataFrame을 반환합니다.
repartitionByRange(numPartitions, *cols) 지정된 분할 식으로 분할된 새 DataFrame을 반환합니다.
repartitionById(numPartitions, partitionIdCol) 지정된 파티션 ID 식으로 분할된 새 DataFrame을 반환합니다.

재편

메서드 설명
unpivot(ids, values, variableColumnName, valueColumnName) DataFrame을 와이드 형식에서 긴 형식으로 피벗 해제합니다.
melt(ids, values, variableColumnName, valueColumnName) 피벗 해제의 별칭입니다.
transpose(indexColumn) 지정된 인덱스 열의 값이 새 열이 되도록 DataFrame을 바꿈합니다.

누락된 데이터 처리

메서드 설명
dropna(how, thresh, subset) null 또는 NaN 값이 있는 행을 생략하는 새 DataFrame을 반환합니다.
fillna(value, subset) null 값이 새 값으로 채워진 새 DataFrame을 반환합니다.
replace(to_replace, value, subset) 값을 다른 값으로 바꾸는 새 DataFrame을 반환합니다.

통계 함수

메서드 설명
approxQuantile(col, probabilities, relativeError) DataFrame의 숫자 열의 대략적인 분위수를 계산합니다.
corr(col1, col2, method) DataFrame의 두 열 상관 관계를 이중 값으로 계산합니다.
cov(col1, col2) 지정된 열의 샘플 공변성(해당 이름으로 지정됨)을 계산합니다.
crosstab(col1, col2) 지정된 열의 쌍 단위 빈도 테이블을 계산합니다.
freqItems(cols, support) 가양성으로 열에 대한 빈번한 항목을 찾습니다.

스키마 작업

메서드 설명
to(schema) 각 행이 지정된 스키마와 일치하도록 조정되는 새 DataFrame을 반환합니다.
alias(alias) 별칭이 설정된 새 DataFrame을 반환합니다.

반복

메서드 설명
foreach(f) 이 DataFrame의 모든 행에 f 함수를 적용합니다.
foreachPartition(f) 이 DataFrame의 각 파티션에 f 함수를 적용합니다.

캐싱 및 지속성

메서드 설명
cache() 기본 스토리지 수준(MEMORY_AND_DISK_DESER)을 사용하여 DataFrame을 유지합니다.
persist(storageLevel) 작업 간에 DataFrame의 콘텐츠를 유지하도록 스토리지 수준을 설정합니다.
unpersist(blocking) DataFrame을 비영구로 표시하고 메모리 및 디스크에서 모든 블록을 제거합니다.

검사점 설정

메서드 설명
checkpoint(eager) 이 DataFrame의 검사점이 지정된 버전을 반환합니다.
localCheckpoint(eager, storageLevel) 이 DataFrame의 로컬 검사점 버전을 반환합니다.

스트리밍 작업

메서드 설명
withWatermark(eventTime, delayThreshold) 이 DataFrame에 대한 이벤트 시간 워터마크를 정의합니다.

최적화 힌트

메서드 설명
hint(name, *parameters) 현재 DataFrame에 대한 힌트를 지정합니다.

제한 및 오프셋

메서드 설명
limit(num) 결과 수를 지정된 수로 제한합니다.
offset(num) 첫 번째 n행을 건너뛰어 새 DataFrame을 반환합니다.

고급 변환

메서드 설명
transform(func, *args, **kwargs) 새 DataFrame을 반환합니다. 사용자 지정 변환을 연결하기 위한 간결한 구문입니다.

변환 방법

메서드 설명
toPandas() 이 DataFrame의 내용을 Pandas pandas로 반환합니다. DataFrame.
toArrow() 이 DataFrame의 내용을 PyArrow pyarrow로 반환합니다. 테이블.
pandas_api(index_col) 기존 DataFrame을 pandas-on-Spark DataFrame으로 변환합니다.
mapInPandas(func, schema, barrier, profile) Python 네이티브 함수를 사용하여 현재 DataFrame의 일괄 처리 반복기를 매핑합니다.
mapInArrow(func, schema, barrier, profile) pyarrow에서 수행되는 Python 네이티브 함수를 사용하여 현재 DataFrame의 일괄 처리 반복기를 매핑합니다. RecordBatch.

데이터 쓰기

메서드 설명
writeTo(table) v2 원본에 대한 쓰기 구성 작성기를 만듭니다.
mergeInto(table, condition) 원본 테이블을 기반으로 하는 업데이트, 삽입 및 삭제 집합을 대상 테이블에 병합합니다.

데이터 프레임 비교

메서드 설명
sameSemantics(other) 두 DataFrame 내의 논리 쿼리 계획이 같으면 True를 반환합니다.
semanticHash() 이 DataFrame에 대한 논리 쿼리 계획의 해시 코드를 반환합니다.

메타데이터 및 파일 정보

메서드 설명
inputFiles() 이 DataFrame을 구성하는 파일의 최상의 스냅샷을 반환합니다.

고급 SQL 기능

메서드 설명
isLocal() collect 및 take 메서드를 로컬로 실행할 수 있으면 True를 반환합니다.
asTable() DataFrame을 TABLEArg 개체로 변환합니다. 이 개체는 TVF에서 테이블 인수로 사용할 수 있습니다.
scalar() 정확히 하나의 행과 하나의 열을 포함하는 SCALAR 하위 쿼리에 대한 Column 개체를 반환합니다.
exists() EXISTS 하위 쿼리에 대한 Column 개체를 반환합니다.

예제

기본 데이터 프레임 작업

# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

집계 및 그룹화

# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()

Joins

# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

복잡한 변환

# Chained operations
result = people.filter(people.age > 30) \\
    .join(department, people.deptId == department.id) \\
    .groupBy(department.name, "gender") \\
    .agg({"salary": "avg", "age": "max"}) \\
    .sort("max(age)")
result.show()