Principes de base de PySpark
Cet article propose des exemples simples pour illustrer l’utilisation de PySpark. Il part du principe que vous comprenez les concepts fondamentaux Apache Spark et que vous exécutez des commandes dans un notebook Azure Databricks connecté au calcul. Vous créez des DataFrames à l’aide d’exemples de données, effectuez des transformations de base, notamment des opérations de ligne et de colonne sur ces données, vous combinez plusieurs DataFrames et vous agrégez et visualisez ces données, puis vous les enregistrez dans une table ou un fichier.
Charger des données
Certains exemples de cet article utilisent des exemples de données fournis par Databricks pour illustrer l’utilisation de DataFrames pour charger, transformer et enregistrer des données. Si vous souhaitez utiliser vos propres données qui ne sont pas encore dans Databricks, vous pouvez commencer par le charger et créer un DataFrame à partir de celui-ci. Consultez Créer ou modifier une table en utilisant le chargement de fichiers et Charger des fichiers dans un volume Unity Catalog.
À propos des exemples de données Databricks
Databricks fournit des exemples de données dans le catalogue samples
et dans le répertoire /databricks-datasets
.
- Pour accéder aux exemples de données du catalogue
samples
, utilisez le formatsamples.<schema-name>.<table-name>
. Cet article utilise des tables dans le schémasamples.tpch
, qui contient les données d’une entreprise fictive. La tablecustomer
contient des informations sur les clients etorders
contient des informations sur les commandes passées par ces clients. - Utilisez
dbutils.fs.ls
pour explorer les données dans/databricks-datasets
. Utilisez Spark SQL ou DataFrames pour interroger des données à cet emplacement à l’aide des chemins d’accès aux fichiers. Pour en savoir plus sur les exemples de données fournis par Databricks, consultez Exemples de jeux de données.
Importer des types de données
De nombreuses opérations PySpark nécessitent que vous utilisiez des fonctions SQL ou que vous interagissiez avec des types Spark natifs. Vous pouvez importer directement ces fonctions et types dont vous avez besoin, ou importer l’intégralité du module.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Étant donné que certaines fonctions importées peuvent remplacer les fonctions intégrées Python, certains utilisateurs choisissent d’importer ces modules à l’aide d’un alias. Les exemples suivants montrent un alias courant utilisé dans les exemples de code Apache Spark :
import pyspark.sql.types as T
import pyspark.sql.functions as F
Pour obtenir la liste complète des types de données, consultez Types de données Spark.
Pour obtenir la liste complète des fonctions SQL PySpark, consultez Fonctions Spark.
Créer un DataFrame
Il existe plusieurs façons de créer un DataFrame. En règle générale, vous définissez un DataFrame sur une source de données telle qu’une table ou une collection de fichiers. Ensuite, comme décrit dans la section Concepts fondamentaux d’Apache Spark, utilisez une action, telle que display
, pour déclencher les transformations à exécuter. La méthode display
génère des DataFrames.
Créer un DataFrame avec des valeurs spécifiées
Pour créer un DataFrame avec des valeurs spécifiées, utilisez la méthode createDataFrame
, où les lignes sont exprimées sous forme de liste de tuples :
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Dans la sortie, vous remarquerez que les types de données de colonnes de df_children
sont déduits automatiquement. Vous pouvez également spécifier les types en ajoutant un schéma. Les schémas sont définis à l’aide du StructType
constitué de StructFields
qui spécifient le nom, le type de données et d’un indicateur booléen indiquant s’ils contiennent une valeur Null ou non. Vous devez importer des types de données à partir de pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Créer un DataFrame à partir d’une table dans le catalogue Unity
Pour créer un DataFrame à partir d’une table dans le catalogue Unity, utilisez la méthode table
identifiant la table au format <catalog-name>.<schema-name>.<table-name>
. Cliquez sur Catalogue dans la barre de navigation de gauche pour utiliser Explorateur de catalogues pour accéder à votre table. Cliquez dessus, puis sélectionnez Copier le chemin de la table pour insérer le chemin de la table dans le notebook.
L’exemple suivant charge la table samples.tpch.customer
, mais vous pouvez également fournir le chemin d’accès à votre propre table.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Créer un DataFrame à partir d’un fichier chargé
Pour créer un DataFrame à partir d’un fichier que vous avez chargé dans des volumes de catalogue Unity, utilisez la propriété read
. Cette méthode retourne une DataFrameReader
, que vous pouvez ensuite utiliser pour lire le format approprié. Cliquez sur l’option de catalogue dans la petite barre latérale à gauche et utilisez le navigateur du catalogue pour localiser votre fichier. Sélectionnez-le, puis cliquez sur Copier le chemin du fichier de volume.
L’exemple ci-dessous lit à partir d’un fichier *.csv
, mais DataFrameReader
prend en charge le chargement de fichiers dans de nombreux autres formats. Consultez Méthodes DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Pour plus d’informations sur les volumes Unity Catalog, consultez Présentation des volumes Unity Catalog.
Créer un DataFrame à partir d’une réponse JSON
Pour créer un DataFrame à partir d’une charge utile de réponse JSON retournée par une API REST, utilisez le package Python requests
pour interroger et analyser la réponse. Vous devez importer le package pour l’utiliser. Cet exemple utilise des données de la base de données de l’application de médicaments de la Food and Drug Administration des États-Unis.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Pour plus d’informations sur l’utilisation de JSON et d’autres données semi-structurées sur Databricks, consultez Modéliser des données semi-structurées.
Sélectionner un champ ou un objet JSON
Pour sélectionner un champ ou un objet spécifique à partir du JSON converti, utilisez la notation []
. Par exemple, pour sélectionner le champ products
qui est lui-même un tableau de produits :
display(df_drugs.select(df_drugs["products"]))
Vous pouvez également chaîner des appels de méthode pour parcourir plusieurs champs. Par exemple, pour générer le nom de marque du premier produit dans une application de médicaments :
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Créez un DataFrame à partir d’un fichier
Pour illustrer la création d’un DataFrame à partir d’un fichier, cet exemple charge les données CSV dans le répertoire /databricks-datasets
.
Pour accéder aux exemples de jeux de données, vous pouvez utiliser les commandes de système de fichiers Databricks Utilties. L’exemple suivant utilise dbutils
pour répertorier les jeux de données disponibles dans /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Vous pouvez également utiliser %fs
pour accéder aux commandes de système de fichiers CLI Databricks, comme illustré dans l’exemple suivant :
%fs ls '/databricks-datasets'
Pour créer un DataFrame à partir d’un fichier ou d’un répertoire de fichiers, spécifiez le chemin d’accès dans la méthode load
:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformer des données avec des DataFrames
Les DataFrames facilitent la transformation des données à l’aide de méthodes intégrées pour trier, filtrer et agréger des données. De nombreuses transformations ne sont pas spécifiées en tant que méthodes sur des DataFrames, mais sont fournies dans le package spark.sql.functions
. Consultez Fonctions SQL Spark Databricks.
- Opérations de colonne
- Opérations de ligne
- Joindre des DataFrames
- Données agrégées
- Appels de chaînage
Opérations sur les colonnes
Spark fournit de nombreuses opérations de colonne de base :
- Sélectionner des colonnes
- Créer des colonnes
- Renommer des colonnes
- Caster des types de colonnes
- Supprimer des colonnes
Conseil
Pour générer toutes les colonnes d’un DataFrame, utilisez columns
, par exemple df_customer.columns
.
Sélectionner des colonnes
Vous pouvez sélectionner des colonnes spécifiques à l’aide de select
et de col
. La fonction col
se trouve dans le sous-module pyspark.sql.functions
.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Vous pouvez également faire référence à une colonne à l’aide de expr
qui prend une expression définie comme une chaîne :
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Vous pouvez également utiliser selectExpr
, qui accepte les expressions SQL :
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Pour sélectionner des colonnes à l’aide d’un littéral de chaîne, procédez de la manière suivante :
df_customer.select(
"c_custkey",
"c_acctbal"
)
Pour sélectionner explicitement une colonne à partir d’un DataFrame spécifique, vous pouvez utiliser l’opérateur []
ou l’opérateur .
. (Il est impossible d’utiliser l’opérateur .
pour sélectionner des colonnes commençant par un entier, ou celles qui contiennent un espace ou un caractère spécial.) Cela peut être particulièrement utile lorsque vous joignez des DataFrames où certaines colonnes ont le même nom.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Créer des colonnes
Pour créer une colonne, utilisez la méthode withColumn
. L’exemple suivant crée une colonne qui contient une valeur booléenne selon si le solde du compte client c_acctbal
dépasse 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Renommer des colonnes
Pour renommer une colonne, utilisez la méthode withColumnRenamed
, qui accepte les noms de colonnes existants et nouveaux :
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
La méthode alias
est particulièrement utile lorsque vous souhaitez renommer vos colonnes dans le cadre d’agrégations :
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Caster des types de colonnes
Dans certains cas, vous pouvez modifier le type de données d’une ou plusieurs colonnes de votre DataFrame. Pour ce faire, utilisez la méthode cast
pour convertir entre les types de données de colonne. L’exemple suivant montre comment convertir une colonne d’un entier en type de chaîne à l’aide de la méthode col
pour référencer une colonne :
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Supprimer des colonnes
Pour supprimer des colonnes, vous pouvez omettre des colonnes pendant une sélection ou select(*) except
, ou utiliser la méthode drop
:
df_customer_flag_renamed.drop("balance_flag_renamed")
Vous pouvez également supprimer plusieurs colonnes à la fois :
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Opérations sur les lignes
Spark fournit de nombreuses opérations de ligne de base :
- Lignes de filtre
- Supprimer les lignes en double
- Gérer les valeurs Null
- Ajouter des lignes
- Trier les lignes
- Lignes de filtre
Filtrer les lignes
Pour filtrer les lignes, utilisez la méthode filter
ou where
sur un DataFrame pour renvoyer uniquement certaines lignes. Pour identifier une colonne à filtrer, utilisez la méthode col
ou une expression qui prend la valeur d’une colonne.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Pour filtrer sur plusieurs conditions, utilisez des opérateurs logiques. Par exemple, &
et |
vous permettent les conditions AND
et OR
, respectivement. L’exemple suivant filtre les lignes où c_nationkey
est égal à 20
et c_acctbal
est supérieur à 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Supprimer les lignes en double
Pour dédupliquer des lignes, utilisez distinct
, qui retourne uniquement les lignes uniques.
df_unique = df_customer.distinct()
Gérer les valeurs Null
Pour gérer les valeurs Null, supprimez les lignes qui contiennent des valeurs Null à l’aide de la méthode na.drop
. Cette méthode vous permet de spécifier si vous souhaitez supprimer des lignes contenant des valeurs Null any
ou des valeurs Null all
.
Pour supprimer les valeurs Null, utilisez l’un des exemples suivants.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Si, au lieu de cela, vous souhaitez uniquement filtrer les lignes qui contiennent toutes les valeurs Null, utilisez ceci :
df_customer_no_nulls = df_customer.na.drop("all")
Vous pouvez appliquer cela à un sous-ensemble de colonnes en spécifiant ceci, comme indiqué ci-dessous :
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Pour remplir les valeurs manquantes, utilisez la méthode fill
. Vous pouvez choisir de l’appliquer à toutes les colonnes ou à un sous-ensemble de colonnes. Dans l’exemple ci-dessous, les soldes de compte ayant une valeur Null pour leur solde de compte c_acctbal
sont remplis avec 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Pour remplacer des chaînes par d’autres valeurs, utilisez la méthode replace
. Dans l’exemple ci-dessous, toutes les chaînes d’adresses vides sont remplacées par le mot UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Ajouter des lignes
Pour ajouter des lignes, vous devez utiliser la méthode union
pour créer un DataFrame. Dans l’exemple suivant, le DataFrame df_that_one_customer
créé précédemment et df_filtered_customer
sont combinés, ce qui retourne un DataFrame avec trois clients :
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Remarque
Vous pouvez également combiner des DataFrames en les écrivant dans une table, puis en ajoutant de nouvelles lignes. Pour les charges de travail de production, le traitement incrémentiel des sources de données vers une table cible peut réduire considérablement la latence et les coûts de calcul à mesure que les données augmentent en taille. Consultez Réception de données dans un lac de données Databricks.
Trier les lignes
Important
Le tri peut être coûteux à grande échelle et, si vous stockez des données triées et rechargez les données avec Spark, l’ordre n’est pas garanti. Assurez-vous de vos intentions dans votre utilisation du tri.
Pour trier les lignes d’une ou plusieurs colonnes, utilisez la méthode sort
ou orderBy
. Par défaut, ces méthodes trient dans l’ordre croissant :
df_customer.orderBy(col("c_acctbal"))
Pour filtrer dans l’ordre décroissant, utilisez desc
:
df_customer.sort(col("c_custkey").desc())
L’exemple suivant montre comment trier sur deux colonnes :
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Pour limiter le nombre de lignes à retourner une fois le DataFrame trié, utilisez la méthode limit
. L’exemple suivant affiche uniquement les résultats 10
principaux :
display(df_sorted.limit(10))
Joindre des DataFrames
Pour joindre au moins deux DataFrames, utilisez la méthode join
. Vous pouvez spécifier la façon dont vous souhaitez que les DataFrames soient joints aux paramètres how
(type de jointure) et on
(sur quelles colonnes baser la jointure). Les types de jointures courants sont les suivants :
inner
: il s’agit du type de jointure par défaut, qui retourne un DataFrame qui conserve uniquement les lignes où il existe une correspondance pour le paramètreon
entre les DataFrames.left
: cela conserve toutes les lignes du premier DataFrame spécifié et uniquement les lignes du second DataFrame spécifié qui ont une correspondance avec le premier.outer
: une jointure externe conserve toutes les lignes des deux DataFrames, quelle que soit la correspondance.
Pour plus d’informations sur les jointures, consultez Travailler avec des jointures sur Azure Databricks. Pour obtenir la liste des jointures prises en charge dans PySpark, consultez Jointures DataFrame.
L’exemple suivant retourne un dataFrame unique dans lequel chaque ligne du DataFrame orders
est jointe à la ligne correspondante du DataFrame customers
. Une jointure interne est utilisée, car l’attente est que chaque commande correspond exactement à un client.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Pour joindre sur plusieurs conditions, utilisez des opérateurs booléens tels que &
et |
pour spécifier AND
et OR
, respectivement. L’exemple suivant ajoute une condition supplémentaire, en filtrant uniquement les lignes avec o_totalprice
supérieur à 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Agréger les données
Pour agréger des données dans un DataFrame, comme GROUP BY
dans SQL, utilisez la méthode groupBy
pour spécifier des colonnes à regrouper et la méthode agg
pour spécifier des agrégations. Importez des agrégations courantes, notamment avg
, sum
, max
et min
à partir de pyspark.sql.functions
. L’exemple suivant montre le solde moyen du client par segment de marché :
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Certaines agrégations sont des actions, ce qui signifie qu’elles déclenchent des calculs. Dans ce cas, vous n’avez pas besoin d’utiliser d’autres actions pour générer des résultats.
Pour compter des lignes dans un DataFrame, utilisez la méthode count
:
df_customer.count()
Appels de chaînage
Les méthodes qui transforment les DataFrames retournent des DataFrames, et Spark n’agit pas sur les transformations tant que les actions ne sont pas appelées. Cette évaluation différée signifie que vous pouvez chaîner plusieurs méthodes pour des raisons pratiques et de lisibilité. L’exemple suivant montre comment chaîner le filtrage, l’agrégation et l’ordre :
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualisez votre DataFrame
Pour visualiser un DataFrame dans un notebook, cliquez sur le signe + en regard de la table située en haut à gauche du DataFrame, puis sélectionnez Visualisation pour ajouter un ou plusieurs graphiques en fonction de votre DataFrame. Pour plus de détails sur les visualisations, consultez Visualisations dans les notebooks Databricks.
display(df_order)
Pour effectuer des visualisations supplémentaires, Databricks recommande d’utiliser l’API Pandas pour Spark. Le .pandas_api()
vous permet de caster vers l’API Pandas correspondante pour un DataFrame Spark. Pour plus d’informations, consultez API Pandas sur Spark.
Enregistrer vos données
Une fois que vous avez transformé vos données, vous pouvez l’enregistrer à l’aide des méthodes DataFrameWriter
. Vous trouverez la liste complète de ces méthodes dans DataFrameWriter. Les sections suivantes montrent comment enregistrer votre DataFrame en tant que table et sous forme de collection de fichiers de données.
Enregistrer votre DataFrame en tant que table
Pour enregistrer votre DataFrame en tant que table dans le catalogue Unity, utilisez la méthode write.saveAsTable
et spécifiez le chemin d’accès au format <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Écrire votre DataFrame en tant que CSV
Pour écrire votre DataFrame au format *.csv
, utilisez la méthode write.csv
, en spécifiant le format et les options. Par défaut, si les données existent au chemin spécifié, l’opération d’écriture échoue. Vous pouvez spécifier à l’un des modes suivants qu’il doit effectuer une autre action :
overwrite
remplace toutes les données existantes dans le chemin cible avec le contenu du DataFrame.append
ajoute le contenu du DataFrame aux données dans le chemin cible.ignore
échoue silencieusement l’écriture si les données existent dans le chemin cible.
L’exemple suivant montre comment remplacer les données avec le contenu du DataFrame en tant que fichiers CSV :
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Étapes suivantes
Pour profiter d’autres fonctionnalités Spark sur Databricks, consultez :