Partager via


Classe DataFrame

Collection distribuée de données regroupées en colonnes nommées.

Un DataFrame équivaut à une table relationnelle dans Spark SQL et peut être créé à l’aide de différentes fonctions dans SparkSession.

Important

Un DataFrame ne doit pas être créé directement à l’aide du constructeur.

Prend en charge Spark Connect

Propriétés

Propriété Description
sparkSession Retourne SparkSession qui a créé ce DataFrame.
rdd Retourne le contenu sous la forme d’un RDD de ligne (mode Classique uniquement).
na Retourne un DataFrameNaFunctions pour gérer les valeurs manquantes .
stat Retourne un DataFrameStatFunctions pour les fonctions statistiques.
write Interface permettant d’enregistrer le contenu du DataFrame hors streaming dans un stockage externe.
writeStream Interface permettant d’enregistrer le contenu du DataFrame de diffusion en continu dans un stockage externe.
schema Retourne le schéma de ce DataFrame en tant que StructType.
dtypes Retourne tous les noms de colonnes et leurs types de données sous forme de liste.
columns Récupère les noms de toutes les colonnes du DataFrame sous forme de liste.
storageLevel Obtenez le niveau de stockage actuel du DataFrame.
isStreaming Retourne True si ce DataFrame contient une ou plusieurs sources qui retournent en permanence des données à mesure qu’elles arrivent.
executionInfo Retourne un objet ExecutionInfo après l’exécution de la requête.
plot Retourne un PySparkPlotAccessor pour les fonctions de traçage.

Méthodes

Affichage et inspection des données

Méthode Description
toJSON(use_unicode) Convertit un DataFrame en RDD de chaîne ou dataFrame.
printSchema(level) Imprime le schéma au format d’arborescence.
explain(extended, mode) Imprime les plans (logiques et physiques) dans la console à des fins de débogage.
show(n, truncate, vertical) Imprime les n premières lignes du DataFrame dans la console.
collect() Retourne tous les enregistrements dans le DataFrame sous forme de liste de lignes.
toLocalIterator(prefetchPartitions) Retourne un itérateur qui contient toutes les lignes de ce DataFrame.
take(num) Retourne les premières lignes num sous la forme d’une liste de lignes.
tail(num) Retourne les dernières lignes num sous la forme d’une liste de lignes.
head(n) Retourne les n premières lignes.
first() Retourne la première ligne sous forme de ligne.
count() Retourne le nombre de lignes dans ce DataFrame.
isEmpty() Vérifie si le DataFrame est vide et retourne une valeur booléenne.
describe(*cols) Calcule les statistiques de base pour les colonnes numériques et de chaînes.
summary(*statistics) Calcule les statistiques spécifiées pour les colonnes numériques et de chaînes.

Vues temporaires

Méthode Description
createTempView(name) Crée une vue temporaire locale avec ce DataFrame.
createOrReplaceTempView(name) Crée ou remplace une vue temporaire locale par ce DataFrame.
createGlobalTempView(name) Crée une vue temporaire globale avec ce DataFrame.
createOrReplaceGlobalTempView(name) Crée ou remplace une vue temporaire globale à l’aide du nom donné.

Sélection et projection

Méthode Description
select(*cols) Projette un ensemble d’expressions et retourne un nouveau DataFrame.
selectExpr(*expr) Projette un ensemble d’expressions SQL et retourne un nouveau DataFrame.
filter(condition) Filtre les lignes à l’aide de la condition donnée.
where(condition) Alias pour le filtre.
drop(*cols) Retourne un nouveau DataFrame sans colonnes spécifiées.
toDF(*cols) Retourne un nouveau DataFrame avec de nouveaux noms de colonnes spécifiés.
withColumn(colName, col) Retourne un nouveau DataFrame en ajoutant une colonne ou en remplaçant la colonne existante portant le même nom.
withColumns(*colsMap) Retourne un nouveau DataFrame en ajoutant plusieurs colonnes ou en remplaçant les colonnes existantes qui ont les mêmes noms.
withColumnRenamed(existing, new) Retourne un nouveau DataFrame en renommant une colonne existante.
withColumnsRenamed(colsMap) Retourne un nouveau DataFrame en renommant plusieurs colonnes.
withMetadata(columnName, metadata) Retourne un nouveau DataFrame en mettant à jour une colonne existante avec des métadonnées.
metadataColumn(colName) Sélectionne une colonne de métadonnées en fonction de son nom de colonne logique et la retourne en tant que colonne.
colRegex(colName) Sélectionne la colonne en fonction du nom de colonne spécifié en tant que regex et la retourne en tant que colonne.

Tri et classement

Méthode Description
sort(*cols, **kwargs) Retourne un nouveau DataFrame trié par les colonnes spécifiées.
orderBy(*cols, **kwargs) Alias pour le tri.
sortWithinPartitions(*cols, **kwargs) Retourne un nouveau DataFrame avec chaque partition triée par la ou les colonnes spécifiées.

Agrégation et regroupement

Méthode Description
groupBy(*cols) Regroupe le DataFrame par les colonnes spécifiées afin que l’agrégation puisse être effectuée sur ces colonnes.
rollup(*cols) Créez un cumul multidimensionnel pour le DataFrame actuel à l’aide des colonnes spécifiées.
cube(*cols) Créez un cube multidimensionnel pour le DataFrame actuel à l’aide des colonnes spécifiées.
groupingSets(groupingSets, *cols) Créez une agrégation multidimensionnelle pour le DataFrame actuel à l’aide des jeux de regroupement spécifiés.
agg(*exprs) Agréger sur l’ensemble du DataFrame sans groupes (raccourci pour df.groupBy().agg()).
observe(observation, *exprs) Définissez les métriques (nommées) à observer sur le DataFrame.

Joins

Méthode Description
join(other, on, how) Jointure à un autre DataFrame à l’aide de l’expression de jointure donnée.
crossJoin(other) Retourne le produit cartésien avec un autre DataFrame.
lateralJoin(other, on, how) Jointures latérales avec un autre DataFrame, à l’aide de l’expression de jointure donnée.

Définir des opérations

Méthode Description
union(other) Retourne un nouveau DataFrame contenant l’union de lignes dans ce dataFrame et un autre DataFrame.
unionByName(other, allowMissingColumns) Retourne un nouveau DataFrame contenant l’union de lignes dans ce dataFrame et un autre DataFrame.
intersect(other) Retourne un nouveau DataFrame contenant des lignes uniquement dans ce DataFrame et un autre DataFrame.
intersectAll(other) Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame et un autre DataFrame tout en préservant les doublons.
subtract(other) Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame, mais pas dans un autre DataFrame.
exceptAll(other) Retourne un nouveau DataFrame contenant des lignes dans ce DataFrame, mais pas dans un autre DataFrame tout en préservant les doublons.

Deduplication

Méthode Description
distinct() Retourne un nouveau DataFrame contenant les lignes distinctes de ce DataFrame.
dropDuplicates(subset) Retourne un nouveau DataFrame avec des lignes dupliquées supprimées, éventuellement compte tenu de certaines colonnes.
dropDuplicatesWithinWatermark(subset) Retourne un nouveau DataFrame avec des lignes en double supprimées, éventuellement uniquement compte tenu de certaines colonnes, dans le filigrane.

Échantillonnage et fractionnement

Méthode Description
sample(withReplacement, fraction, seed) Retourne un sous-ensemble échantillonné de ce DataFrame.
sampleBy(col, fractions, seed) Retourne un échantillon stratifié sans remplacement en fonction de la fraction donnée sur chaque strate.
randomSplit(weights, seed) Fractionne de façon aléatoire ce DataFrame avec les pondérations fournies.

Partitionnement

Méthode Description
coalesce(numPartitions) Retourne un nouveau DataFrame qui a exactement des partitions numPartitions.
repartition(numPartitions, *cols) Retourne un nouveau DataFrame partitionné par les expressions de partitionnement données données.
repartitionByRange(numPartitions, *cols) Retourne un nouveau DataFrame partitionné par les expressions de partitionnement données données.
repartitionById(numPartitions, partitionIdCol) Retourne un nouveau DataFrame partitionné par l’expression d’ID de partition donnée.

Remise en forme

Méthode Description
unpivot(ids, values, variableColumnName, valueColumnName) Dissociez un DataFrame d’un format large au format long.
melt(ids, values, variableColumnName, valueColumnName) Alias pour unpivot.
transpose(indexColumn) Transpose un DataFrame de sorte que les valeurs de la colonne d’index spécifiée deviennent les nouvelles colonnes.

Gestion des données manquantes

Méthode Description
dropna(how, thresh, subset) Retourne un nouveau DataFrame omettant des lignes avec des valeurs Null ou NaN.
fillna(value, subset) Retourne un nouveau DataFrame dont les valeurs null sont remplies avec une nouvelle valeur.
replace(to_replace, value, subset) Retourne un nouveau DataFrame en remplaçant une valeur par une autre valeur.

Fonctions statistiques

Méthode Description
approxQuantile(col, probabilities, relativeError) Calcule les quantiles approximatifs des colonnes numériques d’un DataFrame.
corr(col1, col2, method) Calcule la corrélation de deux colonnes d’un DataFrame sous la forme d’une valeur double.
cov(col1, col2) Calculez l’exemple de covariance pour les colonnes données, spécifiées par leurs noms.
crosstab(col1, col2) Calcule une table de fréquences jumelée des colonnes données.
freqItems(cols, support) Recherche d’éléments fréquents pour les colonnes, éventuellement avec des faux positifs.

Opérations de schéma

Méthode Description
to(schema) Retourne un nouveau DataFrame où chaque ligne est rapprochée pour correspondre au schéma spécifié.
alias(alias) Retourne un nouveau DataFrame avec un jeu d’alias.

Itération

Méthode Description
foreach(f) Applique la fonction f à toutes les lignes de ce DataFrame.
foreachPartition(f) Applique la fonction f à chaque partition de ce DataFrame.

Mise en cache et persistance

Méthode Description
cache() Conserve le DataFrame avec le niveau de stockage par défaut (MEMORY_AND_DISK_DESER).
persist(storageLevel) Définit le niveau de stockage pour conserver le contenu du DataFrame entre les opérations.
unpersist(blocking) Marque le DataFrame comme non persistant et supprime tous les blocs pour celui-ci de la mémoire et du disque.

Points de contrôle

Méthode Description
checkpoint(eager) Retourne une version point de contrôle de ce DataFrame.
localCheckpoint(eager, storageLevel) Retourne une version localement point de contrôle de ce DataFrame.

Opérations de diffusion en continu

Méthode Description
withWatermark(eventTime, delayThreshold) Définit un filigrane d’heure d’événement pour ce DataFrame.

Indicateurs d’optimisation

Méthode Description
hint(name, *parameters) Spécifie un indicateur sur le DataFrame actuel.

Limites et décalages

Méthode Description
limit(num) Limite le nombre de résultats au nombre spécifié.
offset(num) Retourne un nouveau DataFrame en ignorant les premières lignes.

Transformations avancées

Méthode Description
transform(func, *args, **kwargs) Retourne un nouveau DataFrame. Syntaxe concise pour chaîner des transformations personnalisées.

Méthodes de conversion

Méthode Description
toPandas() Retourne le contenu de ce DataFrame en tant que pandas Pandas Pandas. DataFrame.
toArrow() Retourne le contenu de ce DataFrame en tant que pyarrow PyArrow. Table.
pandas_api(index_col) Convertit le DataFrame existant en dataFrame pandas-on-Spark.
mapInPandas(func, schema, barrier, profile) Mappe un itérateur de lots dans le DataFrame actuel à l’aide d’une fonction native Python.
mapInArrow(func, schema, barrier, profile) Mappe un itérateur de lots dans le DataFrame actuel à l’aide d’une fonction native Python effectuée sur pyarrow. RecordBatch.

Écriture de données

Méthode Description
writeTo(table) Créez un générateur de configuration d’écriture pour les sources v2.
mergeInto(table, condition) Fusionne un ensemble de mises à jour, d’insertions et de suppressions basées sur une table source dans une table cible.

Comparaison d’images de données

Méthode Description
sameSemantics(other) Retourne True lorsque les plans de requête logiques à l’intérieur des deux DataFrames sont égaux.
semanticHash() Retourne un code de hachage du plan de requête logique sur ce DataFrame.

Métadonnées et informations de fichier

Méthode Description
inputFiles() Retourne une capture instantanée optimale des fichiers qui composent ce DataFrame.

Fonctionnalités SQL avancées

Méthode Description
isLocal() Retourne true si les méthodes collect et take peuvent être exécutées localement.
asTable() Convertit le DataFrame en objet TableArg, qui peut être utilisé comme argument de table dans un objet TVF.
scalar() Retourne un objet Column pour une sous-requête SCALAR contenant exactement une ligne et une colonne.
exists() Retourne un objet Column pour une sous-requête EXISTS.

Exemples

Opérations dataFrame de base

# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

Agrégation et regroupement

# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()

Joins

# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

Transformations complexes

# Chained operations
result = people.filter(people.age > 30) \\
    .join(department, people.deptId == department.id) \\
    .groupBy(department.name, "gender") \\
    .agg({"salary": "avg", "age": "max"}) \\
    .sort("max(age)")
result.show()