명명된 열로 그룹화된 데이터의 분산 컬렉션입니다.
DataFrame은 Spark SQL의 관계형 테이블과 동일하며 SparkSession의 다양한 함수를 사용하여 만들 수 있습니다.
중요합니다
데이터 프레임은 생성자를 사용하여 직접 만들어서는 안 됩니다.
Spark Connect 지원
속성
| 재산 | 설명 |
|---|---|
sparkSession |
이 DataFrame 을 만든 SparkSession 을 반환합니다. |
rdd |
콘텐츠를 행의 RDD로 반환합니다(클래식 모드에만 해당). |
na |
누락된 값을 처리하기 위한 DataFrameNaFunctions 를 반환합니다. |
stat |
통계 함수에 대한 DataFrameStatFunctions 를 반환합니다. |
write |
스트리밍이 아닌 DataFrame의 콘텐츠를 외부 스토리지에 저장하기 위한 인터페이스입니다. |
writeStream |
스트리밍 DataFrame의 콘텐츠를 외부 스토리지에 저장하기 위한 인터페이스입니다. |
schema |
이 DataFrame의 스키마를 StructType으로 반환합니다. |
dtypes |
모든 열 이름과 해당 데이터 형식을 목록으로 반환합니다. |
columns |
DataFrame의 모든 열 이름을 목록으로 검색합니다. |
storageLevel |
DataFrame의 현재 스토리지 수준을 가져옵니다. |
isStreaming |
이 DataFrame에 데이터가 도착할 때 지속적으로 반환하는 하나 이상의 원본이 포함되어 있으면 True를 반환합니다. |
executionInfo |
쿼리가 실행된 후 ExecutionInfo 개체를 반환합니다. |
plot |
함수를 그리기 위한 PySparkPlotAccessor 를 반환합니다. |
메서드
데이터 보기 및 검사
| 메서드 | 설명 |
|---|---|
toJSON(use_unicode) |
DataFrame을 문자열 또는 DataFrame의 RDD로 변환합니다. |
printSchema(level) |
트리 형식으로 스키마를 인쇄합니다. |
explain(extended, mode) |
디버깅을 위해 (논리적 및 물리적) 계획을 콘솔에 인쇄합니다. |
show(n, truncate, vertical) |
DataFrame의 첫 번째 n 행을 콘솔에 인쇄합니다. |
collect() |
DataFrame의 모든 레코드를 행 목록으로 반환합니다. |
toLocalIterator(prefetchPartitions) |
이 DataFrame의 모든 행이 포함된 반복기를 반환합니다. |
take(num) |
첫 번째 num 행을 행 목록으로 반환합니다. |
tail(num) |
마지막 num 행을 행 목록으로 반환합니다. |
head(n) |
첫 번째 n개 행을 반환합니다. |
first() |
첫 번째 행을 행으로 반환합니다. |
count() |
이 DataFrame의 행 수를 반환합니다. |
isEmpty() |
DataFrame이 비어 있는지 확인하고 부울 값을 반환합니다. |
describe(*cols) |
숫자 및 문자열 열에 대한 기본 통계를 계산합니다. |
summary(*statistics) |
숫자 및 문자열 열에 대해 지정된 통계를 계산합니다. |
임시 보기
| 메서드 | 설명 |
|---|---|
createTempView(name) |
이 DataFrame을 사용하여 로컬 임시 보기를 만듭니다. |
createOrReplaceTempView(name) |
로컬 임시 보기를 만들거나 이 DataFrame으로 대체합니다. |
createGlobalTempView(name) |
이 DataFrame을 사용하여 전역 임시 보기를 만듭니다. |
createOrReplaceGlobalTempView(name) |
지정된 이름을 사용하여 전역 임시 뷰를 만들거나 바꿉니다. |
선택 및 프로젝션
| 메서드 | 설명 |
|---|---|
select(*cols) |
식 집합을 투영하고 새 DataFrame을 반환합니다. |
selectExpr(*expr) |
SQL 식 집합을 투영하고 새 DataFrame을 반환합니다. |
filter(condition) |
지정된 조건을 사용하여 행을 필터링합니다. |
where(condition) |
필터의 별칭입니다. |
drop(*cols) |
지정한 열이 없는 새 DataFrame을 반환합니다. |
toDF(*cols) |
지정한 새 열 이름을 가진 새 DataFrame을 반환합니다. |
withColumn(colName, col) |
열을 추가하거나 이름이 같은 기존 열을 바꿔 새 DataFrame을 반환합니다. |
withColumns(*colsMap) |
여러 열을 추가하거나 이름이 같은 기존 열을 바꿔 새 DataFrame을 반환합니다. |
withColumnRenamed(existing, new) |
기존 열의 이름을 변경하여 새 DataFrame을 반환합니다. |
withColumnsRenamed(colsMap) |
여러 열의 이름을 변경하여 새 DataFrame을 반환합니다. |
withMetadata(columnName, metadata) |
메타데이터를 사용하여 기존 열을 업데이트하여 새 DataFrame을 반환합니다. |
metadataColumn(colName) |
논리 열 이름에 따라 메타데이터 열을 선택하고 열로 반환합니다. |
colRegex(colName) |
regex로 지정된 열 이름에 따라 열을 선택하고 열로 반환합니다. |
정렬 및 순서 지정
| 메서드 | 설명 |
|---|---|
sort(*cols, **kwargs) |
지정된 열을 기준으로 정렬된 새 DataFrame을 반환합니다. |
orderBy(*cols, **kwargs) |
정렬의 별칭입니다. |
sortWithinPartitions(*cols, **kwargs) |
각 파티션이 지정된 열을 기준으로 정렬된 새 DataFrame을 반환합니다. |
집계 및 그룹화
| 메서드 | 설명 |
|---|---|
groupBy(*cols) |
데이터 프레임을 지정된 열별로 그룹화하여 집계를 수행할 수 있도록 합니다. |
rollup(*cols) |
지정된 열을 사용하여 현재 DataFrame에 대한 다차원 롤업을 만듭니다. |
cube(*cols) |
지정된 열을 사용하여 현재 DataFrame에 대한 다차원 큐브를 만듭니다. |
groupingSets(groupingSets, *cols) |
지정된 그룹화 집합을 사용하여 현재 DataFrame에 대한 다차원 집계를 만듭니다. |
agg(*exprs) |
그룹이 없는 전체 DataFrame에 대해 집계합니다(df.groupBy().agg()의 약식). |
observe(observation, *exprs) |
DataFrame에서 관찰할 (명명된) 메트릭을 정의합니다. |
Joins
| 메서드 | 설명 |
|---|---|
join(other, on, how) |
지정된 조인 식을 사용하여 다른 DataFrame과 조인합니다. |
crossJoin(other) |
다른 DataFrame을 사용하여 카티시안 제품을 반환합니다. |
lateralJoin(other, on, how) |
지정된 조인 식을 사용하여 다른 DataFrame과 횡적 조인합니다. |
작업 설정
| 메서드 | 설명 |
|---|---|
union(other) |
이 데이터 프레임과 다른 DataFrame의 행 조합이 포함된 새 DataFrame을 반환합니다. |
unionByName(other, allowMissingColumns) |
이 데이터 프레임과 다른 DataFrame의 행 조합이 포함된 새 DataFrame을 반환합니다. |
intersect(other) |
이 DataFrame과 다른 DataFrame 모두에서 행만 포함하는 새 DataFrame을 반환합니다. |
intersectAll(other) |
중복 항목을 유지하면서 이 DataFrame과 다른 DataFrame 모두에 행이 포함된 새 DataFrame을 반환합니다. |
subtract(other) |
이 DataFrame에 행이 포함되지만 다른 DataFrame에는 없는 새 DataFrame을 반환합니다. |
exceptAll(other) |
중복 항목을 유지하면서 이 DataFrame에 행이 포함되지만 다른 DataFrame에는 없는 새 DataFrame을 반환합니다. |
Deduplication
| 메서드 | 설명 |
|---|---|
distinct() |
이 DataFrame의 고유 행을 포함하는 새 DataFrame을 반환합니다. |
dropDuplicates(subset) |
선택적으로 특정 열만 고려하여 중복 행이 제거된 새 DataFrame을 반환합니다. |
dropDuplicatesWithinWatermark(subset) |
선택적으로 워터마크 내에서 특정 열만 고려하여 중복 행이 제거된 새 DataFrame을 반환합니다. |
샘플링 및 분할
| 메서드 | 설명 |
|---|---|
sample(withReplacement, fraction, seed) |
이 DataFrame의 샘플링된 하위 집합을 반환합니다. |
sampleBy(col, fractions, seed) |
각 계층에 지정된 분수에 따라 대체하지 않고 계층화된 샘플을 반환합니다. |
randomSplit(weights, seed) |
제공된 가중치로 이 DataFrame을 임의로 분할합니다. |
분할
| 메서드 | 설명 |
|---|---|
coalesce(numPartitions) |
정확히 numPartitions 파티션이 있는 새 DataFrame을 반환합니다. |
repartition(numPartitions, *cols) |
지정된 분할 식으로 분할된 새 DataFrame을 반환합니다. |
repartitionByRange(numPartitions, *cols) |
지정된 분할 식으로 분할된 새 DataFrame을 반환합니다. |
repartitionById(numPartitions, partitionIdCol) |
지정된 파티션 ID 식으로 분할된 새 DataFrame을 반환합니다. |
재편
| 메서드 | 설명 |
|---|---|
unpivot(ids, values, variableColumnName, valueColumnName) |
DataFrame을 와이드 형식에서 긴 형식으로 피벗 해제합니다. |
melt(ids, values, variableColumnName, valueColumnName) |
피벗 해제의 별칭입니다. |
transpose(indexColumn) |
지정된 인덱스 열의 값이 새 열이 되도록 DataFrame을 바꿈합니다. |
누락된 데이터 처리
| 메서드 | 설명 |
|---|---|
dropna(how, thresh, subset) |
null 또는 NaN 값이 있는 행을 생략하는 새 DataFrame을 반환합니다. |
fillna(value, subset) |
null 값이 새 값으로 채워진 새 DataFrame을 반환합니다. |
replace(to_replace, value, subset) |
값을 다른 값으로 바꾸는 새 DataFrame을 반환합니다. |
통계 함수
| 메서드 | 설명 |
|---|---|
approxQuantile(col, probabilities, relativeError) |
DataFrame의 숫자 열의 대략적인 분위수를 계산합니다. |
corr(col1, col2, method) |
DataFrame의 두 열 상관 관계를 이중 값으로 계산합니다. |
cov(col1, col2) |
지정된 열의 샘플 공변성(해당 이름으로 지정됨)을 계산합니다. |
crosstab(col1, col2) |
지정된 열의 쌍 단위 빈도 테이블을 계산합니다. |
freqItems(cols, support) |
가양성으로 열에 대한 빈번한 항목을 찾습니다. |
스키마 작업
| 메서드 | 설명 |
|---|---|
to(schema) |
각 행이 지정된 스키마와 일치하도록 조정되는 새 DataFrame을 반환합니다. |
alias(alias) |
별칭이 설정된 새 DataFrame을 반환합니다. |
반복
| 메서드 | 설명 |
|---|---|
foreach(f) |
이 DataFrame의 모든 행에 f 함수를 적용합니다. |
foreachPartition(f) |
이 DataFrame의 각 파티션에 f 함수를 적용합니다. |
캐싱 및 지속성
| 메서드 | 설명 |
|---|---|
cache() |
기본 스토리지 수준(MEMORY_AND_DISK_DESER)을 사용하여 DataFrame을 유지합니다. |
persist(storageLevel) |
작업 간에 DataFrame의 콘텐츠를 유지하도록 스토리지 수준을 설정합니다. |
unpersist(blocking) |
DataFrame을 비영구로 표시하고 메모리 및 디스크에서 모든 블록을 제거합니다. |
검사점 설정
| 메서드 | 설명 |
|---|---|
checkpoint(eager) |
이 DataFrame의 검사점이 지정된 버전을 반환합니다. |
localCheckpoint(eager, storageLevel) |
이 DataFrame의 로컬 검사점 버전을 반환합니다. |
스트리밍 작업
| 메서드 | 설명 |
|---|---|
withWatermark(eventTime, delayThreshold) |
이 DataFrame에 대한 이벤트 시간 워터마크를 정의합니다. |
최적화 힌트
| 메서드 | 설명 |
|---|---|
hint(name, *parameters) |
현재 DataFrame에 대한 힌트를 지정합니다. |
제한 및 오프셋
| 메서드 | 설명 |
|---|---|
limit(num) |
결과 수를 지정된 수로 제한합니다. |
offset(num) |
첫 번째 n행을 건너뛰어 새 DataFrame을 반환합니다. |
고급 변환
| 메서드 | 설명 |
|---|---|
transform(func, *args, **kwargs) |
새 DataFrame을 반환합니다. 사용자 지정 변환을 연결하기 위한 간결한 구문입니다. |
변환 방법
| 메서드 | 설명 |
|---|---|
toPandas() |
이 DataFrame의 내용을 Pandas pandas로 반환합니다. DataFrame. |
toArrow() |
이 DataFrame의 내용을 PyArrow pyarrow로 반환합니다. 테이블. |
pandas_api(index_col) |
기존 DataFrame을 pandas-on-Spark DataFrame으로 변환합니다. |
mapInPandas(func, schema, barrier, profile) |
Python 네이티브 함수를 사용하여 현재 DataFrame의 일괄 처리 반복기를 매핑합니다. |
mapInArrow(func, schema, barrier, profile) |
pyarrow에서 수행되는 Python 네이티브 함수를 사용하여 현재 DataFrame의 일괄 처리 반복기를 매핑합니다. RecordBatch. |
데이터 쓰기
| 메서드 | 설명 |
|---|---|
writeTo(table) |
v2 원본에 대한 쓰기 구성 작성기를 만듭니다. |
mergeInto(table, condition) |
원본 테이블을 기반으로 하는 업데이트, 삽입 및 삭제 집합을 대상 테이블에 병합합니다. |
데이터 프레임 비교
| 메서드 | 설명 |
|---|---|
sameSemantics(other) |
두 DataFrame 내의 논리 쿼리 계획이 같으면 True를 반환합니다. |
semanticHash() |
이 DataFrame에 대한 논리 쿼리 계획의 해시 코드를 반환합니다. |
메타데이터 및 파일 정보
| 메서드 | 설명 |
|---|---|
inputFiles() |
이 DataFrame을 구성하는 파일의 최상의 스냅샷을 반환합니다. |
고급 SQL 기능
| 메서드 | 설명 |
|---|---|
isLocal() |
collect 및 take 메서드를 로컬로 실행할 수 있으면 True를 반환합니다. |
asTable() |
DataFrame을 TABLEArg 개체로 변환합니다. 이 개체는 TVF에서 테이블 인수로 사용할 수 있습니다. |
scalar() |
정확히 하나의 행과 하나의 열을 포함하는 SCALAR 하위 쿼리에 대한 Column 개체를 반환합니다. |
exists() |
EXISTS 하위 쿼리에 대한 Column 개체를 반환합니다. |
예제
기본 데이터 프레임 작업
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
집계 및 그룹화
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
복잡한 변환
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()