Raccolta distribuita di dati raggruppati in colonne denominate.
Un dataframe equivale a una tabella relazionale in Spark SQL e può essere creato usando varie funzioni in SparkSession.
Importante
Un dataframe non deve essere creato direttamente usando il costruttore .
Supporta Spark Connect
Proprietà
| Proprietà |
Descrizione |
sparkSession |
Restituisce SparkSession che ha creato questo dataframe. |
rdd |
Restituisce il contenuto come RDD della riga (solo modalità classica). |
na |
Restituisce un oggetto DataFrameNaFunctions per la gestione dei valori mancanti. |
stat |
Restituisce un oggetto DataFrameStatFunctions per le funzioni statistiche. |
write |
Interfaccia per salvare il contenuto del dataframe non in streaming in un archivio esterno. |
writeStream |
Interfaccia per salvare il contenuto del dataframe di streaming in un archivio esterno. |
schema |
Restituisce lo schema di questo dataframe come StructType. |
dtypes |
Restituisce tutti i nomi di colonna e i relativi tipi di dati come elenco. |
columns |
Recupera i nomi di tutte le colonne nel dataframe come elenco. |
storageLevel |
Ottenere il livello di archiviazione corrente del dataframe. |
isStreaming |
Restituisce True se questo dataframe contiene una o più origini che restituiscono continuamente i dati non appena arrivano. |
executionInfo |
Restituisce un oggetto ExecutionInfo dopo l'esecuzione della query. |
plot |
Restituisce un PySparkPlotAccessor per le funzioni di tracciato. |
Methods
Visualizzazione e ispezione dei dati
Visualizzazioni temporanee
Selezione e proiezione
Ordinamento e ordinamento
Aggregazione e raggruppamento
| metodo |
Descrizione |
groupBy(*cols) |
Raggruppa il dataframe in base alle colonne specificate in modo che l'aggregazione possa essere eseguita su di esse. |
rollup(*cols) |
Creare un rollup multidimensionale per il dataframe corrente usando le colonne specificate. |
cube(*cols) |
Creare un cubo multidimensionale per il dataframe corrente usando le colonne specificate. |
groupingSets(groupingSets, *cols) |
Creare un'aggregazione multidimensionale per il dataframe corrente usando i set di raggruppamento specificati. |
agg(*exprs) |
Aggregazione sull'intero dataframe senza gruppi (abbreviato per df.groupBy().agg()). |
observe(observation, *exprs) |
Definire le metriche (denominate) da osservare nel dataframe. |
Joins
Impostare le operazioni
| metodo |
Descrizione |
union(other) |
Restituisce un nuovo dataframe contenente l'unione di righe in questo e un altro dataframe. |
unionByName(other, allowMissingColumns) |
Restituisce un nuovo dataframe contenente l'unione di righe in questo oggetto e un altro dataframe. |
intersect(other) |
Restituisce un nuovo dataframe contenente righe solo in questo dataframe e in un altro dataframe. |
intersectAll(other) |
Restituisce un nuovo dataframe contenente righe sia in questo dataframe che in un altro dataframe mantenendo i duplicati. |
subtract(other) |
Restituisce un nuovo dataframe contenente righe in questo dataframe, ma non in un altro dataframe. |
exceptAll(other) |
Restituisce un nuovo dataframe contenente righe in questo dataframe ma non in un altro dataframe mantenendo i duplicati. |
Deduplicazione
| metodo |
Descrizione |
distinct() |
Restituisce un nuovo dataframe contenente le righe distinte in questo dataframe. |
dropDuplicates(subset) |
Restituisce un nuovo dataframe con righe duplicate rimosse, facoltativamente considerando solo determinate colonne. |
dropDuplicatesWithinWatermark(subset) |
Restituisce un nuovo dataframe con righe duplicate rimosse, facoltativamente considerando solo determinate colonne, all'interno della filigrana. |
Campionamento e suddivisione
Partitioning
| metodo |
Descrizione |
coalesce(numPartitions) |
Restituisce un nuovo dataframe con esattamente partizioni numPartitions. |
repartition(numPartitions, *cols) |
Restituisce un nuovo dataframe partizionato dalle espressioni di partizionamento indicate. |
repartitionByRange(numPartitions, *cols) |
Restituisce un nuovo dataframe partizionato dalle espressioni di partizionamento indicate. |
repartitionById(numPartitions, partitionIdCol) |
Restituisce un nuovo dataframe partizionato dall'espressione ID di partizione specificata. |
Rimodellamento
Gestione dei dati mancanti
Funzioni statistiche
Operazioni dello schema
| metodo |
Descrizione |
to(schema) |
Restituisce un nuovo dataframe in cui ogni riga viene riconciliata in modo che corrisponda allo schema specificato. |
alias(alias) |
Restituisce un nuovo dataframe con un set di alias. |
Iterazione
| metodo |
Descrizione |
foreach(f) |
Applica la funzione f a tutte le righe di questo dataframe. |
foreachPartition(f) |
Applica la funzione f a ogni partizione di questo dataframe. |
Memorizzazione nella cache e persistenza
| metodo |
Descrizione |
cache() |
Rende persistente il dataframe con il livello di archiviazione predefinito (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Imposta il livello di archiviazione per rendere persistente il contenuto del dataframe tra le operazioni. |
unpersist(blocking) |
Contrassegna il dataframe come non persistente e rimuove tutti i blocchi per esso dalla memoria e dal disco. |
Checkpoint
Operazioni di streaming
Hint di ottimizzazione
Limiti e offset
| metodo |
Descrizione |
limit(num) |
Limita il conteggio dei risultati al numero specificato. |
offset(num) |
Restituisce un nuovo dataframe ignorando le prime n righe. |
Metodi di conversione
Scrittura dei dati
| metodo |
Descrizione |
writeTo(table) |
Creare un generatore di configurazione di scrittura per le origini v2. |
mergeInto(table, condition) |
Unisce un set di aggiornamenti, inserimenti ed eliminazioni in base a una tabella di origine in una tabella di destinazione. |
Confronto tra dataframe
| metodo |
Descrizione |
sameSemantics(other) |
Restituisce True quando i piani di query logici all'interno di entrambi i dataframe sono uguali. |
semanticHash() |
Restituisce un codice hash del piano di query logico su questo dataframe. |
| metodo |
Descrizione |
inputFiles() |
Restituisce uno snapshot ottimale dei file che compongono questo dataframe. |
Funzionalità avanzate di SQL
| metodo |
Descrizione |
isLocal() |
Restituisce True se i metodi collect e take possono essere eseguiti localmente. |
asTable() |
Converte il dataframe in un oggetto TableArg, che può essere utilizzato come argomento di tabella in un oggetto TVF. |
scalar() |
Restituisce un oggetto Column per una sottoquery SCALAR contenente esattamente una riga e una colonna. |
exists() |
Restituisce un oggetto Column per una sottoquery EXISTS. |
Examples
Operazioni di base sul dataframe
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Aggregazione e raggruppamento
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()