Partager via


Principes de base de PySpark

Cet article propose des exemples simples pour illustrer l’utilisation de PySpark. Il part du principe que vous comprenez les concepts fondamentaux Apache Spark et que vous exécutez des commandes dans un notebook Azure Databricks connecté au calcul. Vous créez des DataFrames à l’aide d’exemples de données, effectuez des transformations de base, notamment des opérations de ligne et de colonne sur ces données, vous combinez plusieurs DataFrames et vous agrégez et visualisez ces données, puis vous les enregistrez dans une table ou un fichier.

Charger des données

Certains exemples de cet article utilisent des exemples de données fournis par Databricks pour illustrer l’utilisation de DataFrames pour charger, transformer et enregistrer des données. Si vous souhaitez utiliser vos propres données qui ne sont pas encore dans Databricks, vous pouvez commencer par le charger et créer un DataFrame à partir de celui-ci. Consultez Créer ou modifier une table en utilisant le chargement de fichiers et Charger des fichiers dans un volume Unity Catalog.

À propos des exemples de données Databricks

Databricks fournit des exemples de données dans le catalogue samples et dans le répertoire /databricks-datasets.

  • Pour accéder aux exemples de données du catalogue samples, utilisez le format samples.<schema-name>.<table-name>. Cet article utilise des tables dans le schéma samples.tpch, qui contient les données d’une entreprise fictive. La table customer contient des informations sur les clients et orders contient des informations sur les commandes passées par ces clients.
  • Utilisez dbutils.fs.ls pour explorer les données dans /databricks-datasets. Utilisez Spark SQL ou DataFrames pour interroger des données à cet emplacement à l’aide des chemins d’accès aux fichiers. Pour en savoir plus sur les exemples de données fournis par Databricks, consultez Exemples de jeux de données.

Importer des types de données

De nombreuses opérations PySpark nécessitent que vous utilisiez des fonctions SQL ou que vous interagissiez avec des types Spark natifs. Vous pouvez importer directement ces fonctions et types dont vous avez besoin, ou importer l’intégralité du module.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Étant donné que certaines fonctions importées peuvent remplacer les fonctions intégrées Python, certains utilisateurs choisissent d’importer ces modules à l’aide d’un alias. Les exemples suivants montrent un alias courant utilisé dans les exemples de code Apache Spark :

import pyspark.sql.types as T
import pyspark.sql.functions as F

Pour obtenir la liste complète des types de données, consultez Types de données Spark.

Pour obtenir la liste complète des fonctions SQL PySpark, consultez Fonctions Spark.

Créer un DataFrame

Il existe plusieurs façons de créer un DataFrame. En règle générale, vous définissez un DataFrame sur une source de données telle qu’une table ou une collection de fichiers. Ensuite, comme décrit dans la section Concepts fondamentaux d’Apache Spark, utilisez une action, telle que display, pour déclencher les transformations à exécuter. La méthode display génère des DataFrames.

Créer un DataFrame avec des valeurs spécifiées

Pour créer un DataFrame avec des valeurs spécifiées, utilisez la méthode createDataFrame, où les lignes sont exprimées sous forme de liste de tuples :

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Dans la sortie, vous remarquerez que les types de données de colonnes de df_children sont déduits automatiquement. Vous pouvez également spécifier les types en ajoutant un schéma. Les schémas sont définis à l’aide du StructType constitué de StructFields qui spécifient le nom, le type de données et d’un indicateur booléen indiquant s’ils contiennent une valeur Null ou non. Vous devez importer des types de données à partir de pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Créer un DataFrame à partir d’une table dans le catalogue Unity

Pour créer un DataFrame à partir d’une table dans le catalogue Unity, utilisez la méthode table identifiant la table au format <catalog-name>.<schema-name>.<table-name>. Cliquez sur Catalogue dans la barre de navigation de gauche pour utiliser Explorateur de catalogues pour accéder à votre table. Cliquez dessus, puis sélectionnez Copier le chemin de la table pour insérer le chemin de la table dans le notebook.

L’exemple suivant charge la table samples.tpch.customer, mais vous pouvez également fournir le chemin d’accès à votre propre table.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Créer un DataFrame à partir d’un fichier chargé

Pour créer un DataFrame à partir d’un fichier que vous avez chargé dans des volumes de catalogue Unity, utilisez la propriété read. Cette méthode retourne une DataFrameReader, que vous pouvez ensuite utiliser pour lire le format approprié. Cliquez sur l’option de catalogue dans la petite barre latérale à gauche et utilisez le navigateur du catalogue pour localiser votre fichier. Sélectionnez-le, puis cliquez sur Copier le chemin du fichier de volume.

L’exemple ci-dessous lit à partir d’un fichier *.csv, mais DataFrameReader prend en charge le chargement de fichiers dans de nombreux autres formats. Consultez Méthodes DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Pour plus d’informations sur les volumes Unity Catalog, consultez Présentation des volumes Unity Catalog.

Créer un DataFrame à partir d’une réponse JSON

Pour créer un DataFrame à partir d’une charge utile de réponse JSON retournée par une API REST, utilisez le package Python requests pour interroger et analyser la réponse. Vous devez importer le package pour l’utiliser. Cet exemple utilise des données de la base de données de l’application de médicaments de la Food and Drug Administration des États-Unis.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Pour plus d’informations sur l’utilisation de JSON et d’autres données semi-structurées sur Databricks, consultez Modéliser des données semi-structurées.

Sélectionner un champ ou un objet JSON

Pour sélectionner un champ ou un objet spécifique à partir du JSON converti, utilisez la notation []. Par exemple, pour sélectionner le champ products qui est lui-même un tableau de produits :

display(df_drugs.select(df_drugs["products"]))

Vous pouvez également chaîner des appels de méthode pour parcourir plusieurs champs. Par exemple, pour générer le nom de marque du premier produit dans une application de médicaments :

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Créez un DataFrame à partir d’un fichier

Pour illustrer la création d’un DataFrame à partir d’un fichier, cet exemple charge les données CSV dans le répertoire /databricks-datasets.

Pour accéder aux exemples de jeux de données, vous pouvez utiliser les commandes de système de fichiers Databricks Utilties. L’exemple suivant utilise dbutils pour répertorier les jeux de données disponibles dans /databricks-datasets :

display(dbutils.fs.ls('/databricks-datasets'))

Vous pouvez également utiliser %fs pour accéder aux commandes de système de fichiers CLI Databricks, comme illustré dans l’exemple suivant :

%fs ls '/databricks-datasets'

Pour créer un DataFrame à partir d’un fichier ou d’un répertoire de fichiers, spécifiez le chemin d’accès dans la méthode load :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformer des données avec des DataFrames

Les DataFrames facilitent la transformation des données à l’aide de méthodes intégrées pour trier, filtrer et agréger des données. De nombreuses transformations ne sont pas spécifiées en tant que méthodes sur des DataFrames, mais sont fournies dans le package spark.sql.functions. Consultez Fonctions SQL Spark Databricks.

Opérations sur les colonnes

Spark fournit de nombreuses opérations de colonne de base :

Conseil

Pour générer toutes les colonnes d’un DataFrame, utilisez columns, par exemple df_customer.columns.

Sélectionner des colonnes

Vous pouvez sélectionner des colonnes spécifiques à l’aide de select et de col. La fonction col se trouve dans le sous-module pyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Vous pouvez également faire référence à une colonne à l’aide de expr qui prend une expression définie comme une chaîne :

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Vous pouvez également utiliser selectExpr, qui accepte les expressions SQL :

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Pour sélectionner des colonnes à l’aide d’un littéral de chaîne, procédez de la manière suivante :

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Pour sélectionner explicitement une colonne à partir d’un DataFrame spécifique, vous pouvez utiliser l’opérateur [] ou l’opérateur .. (Il est impossible d’utiliser l’opérateur . pour sélectionner des colonnes commençant par un entier, ou celles qui contiennent un espace ou un caractère spécial.) Cela peut être particulièrement utile lorsque vous joignez des DataFrames où certaines colonnes ont le même nom.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Créer des colonnes

Pour créer une colonne, utilisez la méthode withColumn. L’exemple suivant crée une colonne qui contient une valeur booléenne selon si le solde du compte client c_acctbal dépasse 1000 :

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Renommer des colonnes

Pour renommer une colonne, utilisez la méthode withColumnRenamed, qui accepte les noms de colonnes existants et nouveaux :

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

La méthode alias est particulièrement utile lorsque vous souhaitez renommer vos colonnes dans le cadre d’agrégations :

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Caster des types de colonnes

Dans certains cas, vous pouvez modifier le type de données d’une ou plusieurs colonnes de votre DataFrame. Pour ce faire, utilisez la méthode cast pour convertir entre les types de données de colonne. L’exemple suivant montre comment convertir une colonne d’un entier en type de chaîne à l’aide de la méthode col pour référencer une colonne :

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Supprimer des colonnes

Pour supprimer des colonnes, vous pouvez omettre des colonnes pendant une sélection ou select(*) except, ou utiliser la méthode drop :

df_customer_flag_renamed.drop("balance_flag_renamed")

Vous pouvez également supprimer plusieurs colonnes à la fois :

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Opérations sur les lignes

Spark fournit de nombreuses opérations de ligne de base :

Filtrer les lignes

Pour filtrer les lignes, utilisez la méthode filter ou where sur un DataFrame pour renvoyer uniquement certaines lignes. Pour identifier une colonne à filtrer, utilisez la méthode col ou une expression qui prend la valeur d’une colonne.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Pour filtrer sur plusieurs conditions, utilisez des opérateurs logiques. Par exemple, & et | vous permettent les conditions AND et OR, respectivement. L’exemple suivant filtre les lignes où c_nationkey est égal à 20 et c_acctbal est supérieur à 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Supprimer les lignes en double

Pour dédupliquer des lignes, utilisez distinct, qui retourne uniquement les lignes uniques.

df_unique = df_customer.distinct()

Gérer les valeurs Null

Pour gérer les valeurs Null, supprimez les lignes qui contiennent des valeurs Null à l’aide de la méthode na.drop. Cette méthode vous permet de spécifier si vous souhaitez supprimer des lignes contenant des valeurs Null any ou des valeurs Null all.

Pour supprimer les valeurs Null, utilisez l’un des exemples suivants.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Si, au lieu de cela, vous souhaitez uniquement filtrer les lignes qui contiennent toutes les valeurs Null, utilisez ceci :

df_customer_no_nulls = df_customer.na.drop("all")

Vous pouvez appliquer cela à un sous-ensemble de colonnes en spécifiant ceci, comme indiqué ci-dessous :

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Pour remplir les valeurs manquantes, utilisez la méthode fill. Vous pouvez choisir de l’appliquer à toutes les colonnes ou à un sous-ensemble de colonnes. Dans l’exemple ci-dessous, les soldes de compte ayant une valeur Null pour leur solde de compte c_acctbal sont remplis avec 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Pour remplacer des chaînes par d’autres valeurs, utilisez la méthode replace. Dans l’exemple ci-dessous, toutes les chaînes d’adresses vides sont remplacées par le mot UNKNOWN :

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Ajouter des lignes

Pour ajouter des lignes, vous devez utiliser la méthode union pour créer un DataFrame. Dans l’exemple suivant, le DataFrame df_that_one_customer créé précédemment et df_filtered_customer sont combinés, ce qui retourne un DataFrame avec trois clients :

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Remarque

Vous pouvez également combiner des DataFrames en les écrivant dans une table, puis en ajoutant de nouvelles lignes. Pour les charges de travail de production, le traitement incrémentiel des sources de données vers une table cible peut réduire considérablement la latence et les coûts de calcul à mesure que les données augmentent en taille. Consultez Réception de données dans un lac de données Databricks.

Trier les lignes

Important

Le tri peut être coûteux à grande échelle et, si vous stockez des données triées et rechargez les données avec Spark, l’ordre n’est pas garanti. Assurez-vous de vos intentions dans votre utilisation du tri.

Pour trier les lignes d’une ou plusieurs colonnes, utilisez la méthode sort ou orderBy. Par défaut, ces méthodes trient dans l’ordre croissant :

df_customer.orderBy(col("c_acctbal"))

Pour filtrer dans l’ordre décroissant, utilisez desc :

df_customer.sort(col("c_custkey").desc())

L’exemple suivant montre comment trier sur deux colonnes :

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Pour limiter le nombre de lignes à retourner une fois le DataFrame trié, utilisez la méthode limit. L’exemple suivant affiche uniquement les résultats 10 principaux :

display(df_sorted.limit(10))

Joindre des DataFrames

Pour joindre au moins deux DataFrames, utilisez la méthode join. Vous pouvez spécifier la façon dont vous souhaitez que les DataFrames soient joints aux paramètres how (type de jointure) et on (sur quelles colonnes baser la jointure). Les types de jointures courants sont les suivants :

  • inner : il s’agit du type de jointure par défaut, qui retourne un DataFrame qui conserve uniquement les lignes où il existe une correspondance pour le paramètre on entre les DataFrames.
  • left : cela conserve toutes les lignes du premier DataFrame spécifié et uniquement les lignes du second DataFrame spécifié qui ont une correspondance avec le premier.
  • outer : une jointure externe conserve toutes les lignes des deux DataFrames, quelle que soit la correspondance.

Pour plus d’informations sur les jointures, consultez Travailler avec des jointures sur Azure Databricks. Pour obtenir la liste des jointures prises en charge dans PySpark, consultez Jointures DataFrame.

L’exemple suivant retourne un dataFrame unique dans lequel chaque ligne du DataFrame orders est jointe à la ligne correspondante du DataFrame customers. Une jointure interne est utilisée, car l’attente est que chaque commande correspond exactement à un client.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Pour joindre sur plusieurs conditions, utilisez des opérateurs booléens tels que & et | pour spécifier AND et OR, respectivement. L’exemple suivant ajoute une condition supplémentaire, en filtrant uniquement les lignes avec o_totalprice supérieur à 500,000 :

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Agréger les données

Pour agréger des données dans un DataFrame, comme GROUP BY dans SQL, utilisez la méthode groupBy pour spécifier des colonnes à regrouper et la méthode agg pour spécifier des agrégations. Importez des agrégations courantes, notamment avg, sum, max et min à partir de pyspark.sql.functions. L’exemple suivant montre le solde moyen du client par segment de marché :

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Certaines agrégations sont des actions, ce qui signifie qu’elles déclenchent des calculs. Dans ce cas, vous n’avez pas besoin d’utiliser d’autres actions pour générer des résultats.

Pour compter des lignes dans un DataFrame, utilisez la méthode count :

df_customer.count()

Appels de chaînage

Les méthodes qui transforment les DataFrames retournent des DataFrames, et Spark n’agit pas sur les transformations tant que les actions ne sont pas appelées. Cette évaluation différée signifie que vous pouvez chaîner plusieurs méthodes pour des raisons pratiques et de lisibilité. L’exemple suivant montre comment chaîner le filtrage, l’agrégation et l’ordre :

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualisez votre DataFrame

Pour visualiser un DataFrame dans un notebook, cliquez sur le signe + en regard de la table située en haut à gauche du DataFrame, puis sélectionnez Visualisation pour ajouter un ou plusieurs graphiques en fonction de votre DataFrame. Pour plus de détails sur les visualisations, consultez Visualisations dans les notebooks Databricks.

display(df_order)

Pour effectuer des visualisations supplémentaires, Databricks recommande d’utiliser l’API Pandas pour Spark. Le .pandas_api() vous permet de caster vers l’API Pandas correspondante pour un DataFrame Spark. Pour plus d’informations, consultez API Pandas sur Spark.

Enregistrer vos données

Une fois que vous avez transformé vos données, vous pouvez l’enregistrer à l’aide des méthodes DataFrameWriter. Vous trouverez la liste complète de ces méthodes dans DataFrameWriter. Les sections suivantes montrent comment enregistrer votre DataFrame en tant que table et sous forme de collection de fichiers de données.

Enregistrer votre DataFrame en tant que table

Pour enregistrer votre DataFrame en tant que table dans le catalogue Unity, utilisez la méthode write.saveAsTable et spécifiez le chemin d’accès au format <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Écrire votre DataFrame en tant que CSV

Pour écrire votre DataFrame au format *.csv, utilisez la méthode write.csv, en spécifiant le format et les options. Par défaut, si les données existent au chemin spécifié, l’opération d’écriture échoue. Vous pouvez spécifier à l’un des modes suivants qu’il doit effectuer une autre action :

  • overwrite remplace toutes les données existantes dans le chemin cible avec le contenu du DataFrame.
  • append ajoute le contenu du DataFrame aux données dans le chemin cible.
  • ignore échoue silencieusement l’écriture si les données existent dans le chemin cible.

L’exemple suivant montre comment remplacer les données avec le contenu du DataFrame en tant que fichiers CSV :

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Étapes suivantes

Pour profiter d’autres fonctionnalités Spark sur Databricks, consultez :