Collection distribuée de données regroupées en colonnes nommées.
Un DataFrame équivaut à une table relationnelle dans Spark SQL et peut être créé à l’aide de différentes fonctions dans SparkSession.
Important
Un DataFrame ne doit pas être créé directement à l’aide du constructeur.
Prend en charge Spark Connect
Propriétés
| Propriété |
Description |
sparkSession |
Retourne SparkSession qui a créé ce DataFrame. |
rdd |
Retourne le contenu sous la forme d’un RDD de ligne (mode Classique uniquement). |
na |
Retourne un DataFrameNaFunctions pour gérer les valeurs manquantes . |
stat |
Retourne un DataFrameStatFunctions pour les fonctions statistiques. |
write |
Interface permettant d’enregistrer le contenu du DataFrame hors streaming dans un stockage externe. |
writeStream |
Interface permettant d’enregistrer le contenu du DataFrame de diffusion en continu dans un stockage externe. |
schema |
Retourne le schéma de ce DataFrame en tant que StructType. |
dtypes |
Retourne tous les noms de colonnes et leurs types de données sous forme de liste. |
columns |
Récupère les noms de toutes les colonnes du DataFrame sous forme de liste. |
storageLevel |
Obtenez le niveau de stockage actuel du DataFrame. |
isStreaming |
Retourne True si ce DataFrame contient une ou plusieurs sources qui retournent en permanence des données à mesure qu’elles arrivent. |
executionInfo |
Retourne un objet ExecutionInfo après l’exécution de la requête. |
plot |
Retourne un PySparkPlotAccessor pour les fonctions de traçage. |
Méthodes
Affichage et inspection des données
| Méthode |
Description |
toJSON(use_unicode) |
Convertit un DataFrame en RDD de chaîne ou dataFrame. |
printSchema(level) |
Imprime le schéma au format d’arborescence. |
explain(extended, mode) |
Imprime les plans (logiques et physiques) dans la console à des fins de débogage. |
show(n, truncate, vertical) |
Imprime les n premières lignes du DataFrame dans la console. |
collect() |
Retourne tous les enregistrements dans le DataFrame sous forme de liste de lignes. |
toLocalIterator(prefetchPartitions) |
Retourne un itérateur qui contient toutes les lignes de ce DataFrame. |
take(num) |
Retourne les premières lignes num sous la forme d’une liste de lignes. |
tail(num) |
Retourne les dernières lignes num sous la forme d’une liste de lignes. |
head(n) |
Retourne les n premières lignes. |
first() |
Retourne la première ligne sous forme de ligne. |
count() |
Retourne le nombre de lignes dans ce DataFrame. |
isEmpty() |
Vérifie si le DataFrame est vide et retourne une valeur booléenne. |
describe(*cols) |
Calcule les statistiques de base pour les colonnes numériques et de chaînes. |
summary(*statistics) |
Calcule les statistiques spécifiées pour les colonnes numériques et de chaînes. |
Vues temporaires
Sélection et projection
| Méthode |
Description |
select(*cols) |
Projette un ensemble d’expressions et retourne un nouveau DataFrame. |
selectExpr(*expr) |
Projette un ensemble d’expressions SQL et retourne un nouveau DataFrame. |
filter(condition) |
Filtre les lignes à l’aide de la condition donnée. |
where(condition) |
Alias pour le filtre. |
drop(*cols) |
Retourne un nouveau DataFrame sans colonnes spécifiées. |
toDF(*cols) |
Retourne un nouveau DataFrame avec de nouveaux noms de colonnes spécifiés. |
withColumn(colName, col) |
Retourne un nouveau DataFrame en ajoutant une colonne ou en remplaçant la colonne existante portant le même nom. |
withColumns(*colsMap) |
Retourne un nouveau DataFrame en ajoutant plusieurs colonnes ou en remplaçant les colonnes existantes qui ont les mêmes noms. |
withColumnRenamed(existing, new) |
Retourne un nouveau DataFrame en renommant une colonne existante. |
withColumnsRenamed(colsMap) |
Retourne un nouveau DataFrame en renommant plusieurs colonnes. |
withMetadata(columnName, metadata) |
Retourne un nouveau DataFrame en mettant à jour une colonne existante avec des métadonnées. |
metadataColumn(colName) |
Sélectionne une colonne de métadonnées en fonction de son nom de colonne logique et la retourne en tant que colonne. |
colRegex(colName) |
Sélectionne la colonne en fonction du nom de colonne spécifié en tant que regex et la retourne en tant que colonne. |
Tri et classement
Agrégation et regroupement
| Méthode |
Description |
groupBy(*cols) |
Regroupe le DataFrame par les colonnes spécifiées afin que l’agrégation puisse être effectuée sur ces colonnes. |
rollup(*cols) |
Créez un cumul multidimensionnel pour le DataFrame actuel à l’aide des colonnes spécifiées. |
cube(*cols) |
Créez un cube multidimensionnel pour le DataFrame actuel à l’aide des colonnes spécifiées. |
groupingSets(groupingSets, *cols) |
Créez une agrégation multidimensionnelle pour le DataFrame actuel à l’aide des jeux de regroupement spécifiés. |
agg(*exprs) |
Agréger sur l’ensemble du DataFrame sans groupes (raccourci pour df.groupBy().agg()). |
observe(observation, *exprs) |
Définissez les métriques (nommées) à observer sur le DataFrame. |
Joins
Définir des opérations
| Méthode |
Description |
union(other) |
Retourne un nouveau DataFrame contenant l’union de lignes dans ce dataFrame et un autre DataFrame. |
unionByName(other, allowMissingColumns) |
Retourne un nouveau DataFrame contenant l’union de lignes dans ce dataFrame et un autre DataFrame. |
intersect(other) |
Retourne un nouveau DataFrame contenant des lignes uniquement dans ce DataFrame et un autre DataFrame. |
intersectAll(other) |
Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame et un autre DataFrame tout en préservant les doublons. |
subtract(other) |
Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame, mais pas dans un autre DataFrame. |
exceptAll(other) |
Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame, mais pas dans un autre DataFrame tout en préservant les doublons. |
Deduplication
| Méthode |
Description |
distinct() |
Retourne un nouveau DataFrame contenant les lignes distinctes de ce DataFrame. |
dropDuplicates(subset) |
Retourne un nouveau DataFrame avec des lignes dupliquées supprimées, éventuellement compte tenu de certaines colonnes. |
dropDuplicatesWithinWatermark(subset) |
Retourne un nouveau DataFrame avec des lignes en double supprimées, éventuellement uniquement compte tenu de certaines colonnes, dans le filigrane. |
Échantillonnage et fractionnement
Partitionnement
| Méthode |
Description |
coalesce(numPartitions) |
Retourne un nouveau DataFrame qui a exactement des partitions numPartitions. |
repartition(numPartitions, *cols) |
Retourne un nouveau DataFrame partitionné par les expressions de partitionnement données données. |
repartitionByRange(numPartitions, *cols) |
Retourne un nouveau DataFrame partitionné par les expressions de partitionnement données données. |
repartitionById(numPartitions, partitionIdCol) |
Retourne un nouveau DataFrame partitionné par l’expression d’ID de partition donnée. |
Remise en forme
Gestion des données manquantes
Fonctions statistiques
Opérations de schéma
| Méthode |
Description |
to(schema) |
Retourne un nouveau DataFrame où chaque ligne est rapprochée pour correspondre au schéma spécifié. |
alias(alias) |
Retourne un nouveau DataFrame avec un jeu d’alias. |
Itération
| Méthode |
Description |
foreach(f) |
Applique la fonction f à toutes les lignes de ce DataFrame. |
foreachPartition(f) |
Applique la fonction f à chaque partition de ce DataFrame. |
Mise en cache et persistance
| Méthode |
Description |
cache() |
Conserve le DataFrame avec le niveau de stockage par défaut (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Définit le niveau de stockage pour conserver le contenu du DataFrame entre les opérations. |
unpersist(blocking) |
Marque le DataFrame comme non persistant et supprime tous les blocs pour celui-ci de la mémoire et du disque. |
Points de contrôle
Opérations de diffusion en continu
Indicateurs d’optimisation
Limites et décalages
| Méthode |
Description |
limit(num) |
Limite le nombre de résultats au nombre spécifié. |
offset(num) |
Retourne un nouveau DataFrame en ignorant les premières lignes. |
Méthodes de conversion
Écriture de données
| Méthode |
Description |
writeTo(table) |
Créez un générateur de configuration d’écriture pour les sources v2. |
mergeInto(table, condition) |
Fusionne un ensemble de mises à jour, d’insertions et de suppressions basées sur une table source dans une table cible. |
Comparaison d’images de données
| Méthode |
Description |
sameSemantics(other) |
Retourne True lorsque les plans de requête logiques à l’intérieur des deux DataFrames sont égaux. |
semanticHash() |
Retourne un code de hachage du plan de requête logique sur ce DataFrame. |
| Méthode |
Description |
inputFiles() |
Retourne une capture instantanée optimale des fichiers qui composent ce DataFrame. |
Fonctionnalités SQL avancées
| Méthode |
Description |
isLocal() |
Retourne true si les méthodes collect et take peuvent être exécutées localement. |
asTable() |
Convertit le DataFrame en objet TableArg, qui peut être utilisé comme argument de table dans un objet TVF. |
scalar() |
Retourne un objet Column pour une sous-requête SCALAR contenant exactement une ligne et une colonne. |
exists() |
Retourne un objet Column pour une sous-requête EXISTS. |
Exemples
Opérations dataFrame de base
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Agrégation et regroupement
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()