Analyser les données avec Spark
L’un des avantages de l’utilisation de Spark est que vous pouvez écrire et exécuter du code dans différents langages de programmation, ce qui vous permet d’utiliser les compétences de programmation que vous avez déjà et d’utiliser le langage le plus approprié pour une tâche donnée. Le langage par défaut dans un nouveau notebook Azure Synapse Analytics Spark est PySpark, version optimisée Spark de Python couramment utilisée par les scientifiques et analystes Données en raison de sa forte prise en charge de la manipulation et de la visualisation des données. De plus, vous pouvez utiliser des langages tels que Scala (langage dérivé de Java qui peut être utilisé de manière interactive) et SQL (variante du langage couramment utilisé SQL inclus dans la bibliothèque Spark SQL pour travailler avec des structures de données relationnelles). Les ingénieurs logiciels peuvent également créer des solutions compilées qui s’exécutent sur Spark en utilisant des frameworks tels que Java et Microsoft .NET.
Exploration de données avec des dataframes
En mode natif, Spark utilise une structure de données appelée jeu de données distribué résilient (RDD, resilient distributed dataset). Toutefois, même si vous pouvez écrire du code qui fonctionne directement avec des jeux RDD, la structure de données la plus couramment utilisée pour utiliser des données structurées dans Spark est le dataframe, qui est fourni dans le cadre de la bibliothèque Spark SQL. Les dataframes dans Spark sont similaires à ceux de la bibliothèque Pandas Python omniprésente, mais sont optimisés pour fonctionner dans l’environnement de traitement distribué de Spark.
Notes
En plus de l’API Dataframe, Spark SQL fournit une API Dataset fortement typée qui est prise en charge dans Java et Scala. Dans ce module, nous allons nous concentrer sur l’API Dataframe.
Chargement des données dans un dataframe
Explorons un exemple hypothétique afin de voir comment utiliser un dataframe pour travailler avec des données. Supposons que vous ayez les données suivantes dans un fichier texte délimité par des virgules appelé products.csv dans le compte de stockage principal d’un espace de travail Azure Synapse Analytics :
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Dans un notebook Spark, vous pouvez utiliser le code PySpark suivant pour charger les données dans un dataframe et afficher les 10 premières lignes :
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
La ligne %%pyspark
au début est appelée magic et indique à Spark que le langage utilisé dans cette cellule est PySpark. Vous pouvez sélectionner le langage que vous souhaitez utiliser comme valeur par défaut dans la barre d’outils de l’interface Notebook, puis utiliser une commande magic pour remplacer ce choix pour une cellule spécifique. Par exemple, voici le code Scala équivalent pour l’exemple de données des produits :
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
La commande magic %%spark
est utilisée pour spécifier Scala.
Ces deux exemples de code produisent une sortie comme suit :
ProductID | ProductName | Catégorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
... | ... | ... | ... |
Spécification d’un schéma de dataframe
Dans l’exemple précédent, la première ligne du fichier CSV contenait les noms de colonne, et Spark pouvait déduire le type de données de chaque colonne en se basant sur les données qu’elle contenait. Vous pouvez également spécifier un schéma explicite pour les données, ce qui est utile lorsque les noms de colonne ne sont pas inclus dans le fichier de données, comme cet exemple CSV :
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
L’exemple PySpark suivant montre comment spécifier un schéma pour que le dataframe soit chargé à partir d’un fichier appelé product-data.csv dans ce format :
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Les résultats seraient une fois de plus similaires à :
ProductID | ProductName | Catégorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
... | ... | ... | ... |
Filtrage et regroupement des dataframes
Vous pouvez utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient. Par exemple, l’exemple de code suivant utilise la méthode select pour récupérer les colonnes ProductName et ListPrice à partir du dataframe df contenant les données de produit de l’exemple précédent :
pricelist_df = df.select("ProductID", "ListPrice")
Les résultats de cet exemple de code devraient ressembler à ceci :
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Comme la plupart des méthodes de manipulation de données, select retourne un nouvel objet de dataframe.
Conseil
La sélection d’une partie des colonnes d’un dataframe est une opération courante, qui peut également être réalisée à l’aide de la syntaxe plus courte suivante :
pricelist_df = df["ProductID", "ListPrice"]
Vous pouvez « chaîner » les méthodes ensemble pour effectuer une série de manipulations qui entraînent un dataframe transformé. Par exemple, cet exemple de code chaîne les méthodes select et where pour créer un dataframe contenant les colonnes ProductName et ListPrice des produits avec la catégorie Vélos VTT ou Vélos de route :
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Noir, 52 | 539.9900 |
... | ... |
Pour regrouper et agréger des données, vous pouvez utiliser la méthode groupBy et les fonctions d’agrégation. Par exemple, le code PySpark suivant compte le nombre de produits de chaque catégorie :
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
Catégorie | count |
---|---|
Oreillettes | 3 |
Roues | 14 |
VTT | 32 |
... | ... |
Utilisation d’expressions SQL dans Spark
L’API Dataframe fait partie d’une bibliothèque Spark appelée Spark SQL, qui permet aux analystes Données d’utiliser des expressions SQL pour interroger et manipuler des données.
Création d’objets de base de données dans le catalogue Spark
Le catalogue Spark est un metastore pour les objets de données relationnelles tels que les vues et les tables. Le runtime Spark peut utiliser le catalogue pour intégrer de façon fluide le code écrit dans n’importe quel langage pris en charge par Spark avec des expressions SQL qui peuvent être plus naturelles pour certains analystes Données ou développeurs.
L’une des méthodes les plus simples pour rendre les données d’un dataframe disponibles pour pouvoir les interroger dans le catalogue Spark consiste à créer une vue temporaire, comme illustré dans l’exemple de code suivant :
df.createOrReplaceTempView("products")
Une vue est temporaire, ce qui signifie qu’elle est automatiquement supprimée à la fin de la session active. Vous pouvez également créer des tables persistantes dans le catalogue pour définir une base de données pouvant être interrogée à l’aide de Spark SQL.
Notes
Nous n’allons pas explorer les tables de catalogue Spark en profondeur dans ce module, mais cela vaut la peine de prendre le temps de mettre en évidence quelques points clés :
- Vous pouvez créer une table vide à l’aide de la méthode
spark.catalog.createTable
. Les tables sont des structures de métadonnées qui stockent leurs données sous-jacentes dans l’emplacement de stockage associé au catalogue. La suppression d’une table supprime également ses données sous-jacentes. - Vous pouvez enregistrer un dataframe en tant que table en utilisant sa méthode
saveAsTable
. - Vous pouvez créer une table externe en utilisant la méthode
spark.catalog.createExternalTable
. Les tables externes définissent des métadonnées dans le catalogue, mais obtiennent leurs données sous-jacentes d’un emplacement de stockage externe, généralement un dossier dans un lac de données. La suppression d’une table externe ne supprime pas les données sous-jacentes.
Utilisation de l’API Spark SQL pour interroger des données
Vous pouvez utiliser l’API Spark SQL dans le code écrit dans n’importe quel langage pour interroger les données du catalogue. Par exemple, le code PySpark suivant utilise une requête SQL pour retourner les données de la vue produits en tant que dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Les résultats de l’exemple de code ressembleraient au tableau suivant :
ProductID | ProductName | ListPrice |
---|---|---|
38 | Mountain-100 Silver, 38 | 3399.9900 |
52 | Road-750 Noir, 52 | 539.9900 |
... | ... | ... |
Utilisation du code SQL
L’exemple précédent a montré comment utiliser l’API Spark SQL pour incorporer des expressions SQL dans le code Spark. Dans un notebook, vous pouvez également utiliser la commande magic %%sql
pour exécuter le code SQL qui interroge les objets du catalogue, comme suit :
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
L’exemple de code SQL retourne un jeu de résultats qui s’affiche automatiquement dans le notebook sous forme de tableau, comme celui ci-dessous :
Catégorie | ProductCount |
---|---|
Cuissards | 3 |
Porte-vélos | 1 |
Supports à vélos | 1 |
... | ... |