Tutoriel : charger et transformer des données dans PySpark DataFrames

Ce tutoriel vous montre comment charger et transformer des données relatives à une ville américaine en utilisant l’API DataFrame Apache Spark Python (PySpark) dans Azure Databricks.

À la fin de ce tutoriel, vous saurez ce qu’est un DataFrame et serez en mesure d’effectuer les tâches suivantes :

Consultez également la référence de l’API PySpark Apache Spark.

Qu’est-ce qu’un DataFrame ?

Un DataFrame est une structure de données étiquetée à deux dimensions avec des colonnes de types potentiellement différents. Vous pouvez considérer un DataFrame comme une feuille de calcul, une table SQL ou un dictionnaire d’objets de série. Les DataFrames Apache Spark offre un ensemble complet de fonctions (select columns, filter, join, aggregate, etc.) qui vous permettent de résoudre efficacement les problèmes courants d’analyse des données.

Les DataFrames Apache Spark sont une abstraction basée sur des jeux de données distribués résilients (RDD). Les DataFrames Spark et Spark SQL utilisent un moteur de planification et d’optimisation unifié, ce qui vous permet d’obtenir des performances presque identiques dans tous les langages pris en charge sur Azure Databricks (Python, SQL, Scala et R).

Spécifications

Pour terminer le tutoriel suivant, vous devez répondre aux exigences suivantes :

Remarque

Si vous ne disposez pas des privilèges de contrôle de cluster, vous pouvez quand même effectuer la plupart des étapes suivantes tant que vous avez accès à un cluster.

Dans la barre latérale de la page d’accueil, vous accédez aux entités Azure Databricks : le navigateur de l’espace de travail, le catalogue, l’explorateur, les workflows et le calcul. L’espace de travail est le dossier racine qui stocke vos ressources Azure Databricks, telles que les notebooks et les bibliothèques.

Étape 1 : créer un DataFrame avec Python

Pour savoir comment naviguer dans les notebooks Azure Databricks, consultez Interface et contrôles du notebook Databricks.

  1. Ouvrez un notebook et insérez une nouvelle cellule en cliquant sur l’icône New Icon.
  2. Copiez et collez le code suivant dans une cellule vide du notebook, puis appuyez sur Shift+Enter pour exécuter la cellule. L’exemple de code suivant crée un DataFrame nommé df1 avec des données sur la population d’une ville et affiche son contenu.
data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]]
columns = ["rank", "city", "state", "code", "population", "price"]

df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE")
display(df1)

Étape 2 : charger des données dans un DataFrame à partir de fichiers

Ajoutez des données sur la population d’autres villes à partir du répertoire /databricks-datasets dans df2.

Pour charger des données dans le DataFrame df2 à partir du fichier data_geo.csv :

  1. Créez une cellule dans le notebook.
  2. Copiez et collez le code suivant dans la cellule vide du notebook, puis appuyez sur Shift+Enter pour exécuter la cellule.

Vous pouvez charger des données à partir de nombreux formats de fichiers pris en charge. L’exemple suivant utilise un jeu de données disponible dans le répertoire /databricks-datasets, accessible à partir de la plupart des espaces de travail. Consultez Exemples de jeux de données.

df2 = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

Étape 3 : afficher et interagir avec votre DataFrame

Utilisez les méthodes suivantes pour afficher vos DataFrames sur la population des villes et interagir avec eux.

Combiner des DataFrames

Combinez le contenu de votre premier DataFrame avec le DataFrame qui contient le contenu de data_geo.csv.

Dans le notebook, utilisez l’exemple de code suivant pour créer un DataFrame qui ajouter les lignes d’un DataFrame dans un autre en tirant parti de l’opération d’union :

# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)

Afficher le DataFrame

Pour afficher les données des villes américaines dans un format tabulaire, utilisez la commande Azure Databricks display() dans une cellule de notebook.

display(df)

Spark utilise le schéma de terme pour faire référence aux noms et aux types de données des colonnes dans le DataFrame.

Imprimez le schéma de votre DataFrame avec la méthode .printSchema() suivante dans votre notebook. Utilisez la métadonnée obtenue pour interagir avec le contenu de votre DataFrame.

df.printSchema()

Remarque

Azure Databricks utilise également le terme schéma pour décrire une collection de tables inscrites dans un catalogue.

Filtrer des lignes dans un DataFrame

Découvrez les cinq villes les plus peuplées dans votre jeu de données en filtrant des lignes, à l’aide de .filter() ou .where(). Utilisez le filtrage pour sélectionner un sous-ensemble de lignes à retourner ou modifier dans un DataFrame. Il n’existe aucune différence de performances ou de syntaxe, comme indiqué dans les exemples suivants.

# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)

# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)

Sélectionner des colonnes à partir d’un DataFrame

Découvrez l’état dans lequel se trouve une ville avec la méthode select(). Sélectionnez des colonnes en passant un ou plusieurs noms de colonnes dans .select(), comme dans l’exemple suivant :

select_df = df.select("City", "State")
display(select_df)

Créer un sous-ensemble DataFrame

Créez un sous-ensemble DataFrame avec les dix villes ayant la population la plus élevée et affichez les données obtenues. Combinez la sélection et le filtrage de requêtes pour limiter les lignes et les colonnes retournées en utilisant le code suivant dans votre notebook :

subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)

Étape 4 : enregistrer le DataFrame

Vous pouvez enregistrer votre DataFrame dans une table ou écrire le DataFrame dans un ou plusieurs fichiers.

Enregistrer le DataFrame dans une table

Azure Databricks utilise par défaut le format Delta Lake pour toutes les tables. Pour enregistrer votre DataFrame, vous devez avoir des privilèges de table CREATE sur le catalogue et le schéma. L’exemple suivant enregistre le contenu du DataFrame dans une table nommée us_cities :

df.write.saveAsTable("us_cities")

La plupart des applications Spark utilisent des jeux de données volumineux et de manière distribuée. Spark écrit un répertoire de fichiers plutôt qu’un fichier unique. Delta Lake fractionne les dossiers et fichiers Parquet. De nombreux systèmes de données peuvent lire ces répertoires de fichiers. Azure Databricks recommande l’utilisation de tables plutôt que des chemins d’accès aux fichiers pour la plupart des applications.

Enregistrer le DataFrame dans des fichiers JSON

L’exemple suivant enregistre un répertoire de fichiers JSON :

# Write a DataFrame to a collection of files
df.write.format("json").save("/tmp/json_data")

Lire le DataFrame à partir d’un fichier JSON

# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

Tâches supplémentaires : exécuter des requêtes SQL dans PySpark

Les DataFrames Spark fournissent les options suivantes pour combiner SQL avec Python. Vous pouvez exécuter le code suivant dans le même notebook que celui que vous avez créé pour ce tutoriel.

Spécifier une colonne comme requête SQL

La méthode selectExpr() vous permet de spécifier chaque colonne en tant que requête SQL, comme dans l’exemple suivant :

display(df.selectExpr("`rank`", "upper(city) as big_name"))

Importez expr()

Vous pouvez importer la fonction expr() à partir de pyspark.sql.functions pour utiliser la syntaxe SQL dès qu’une colonne serait spécifiée, comme dans l’exemple suivant :

from pyspark.sql.functions import expr

display(df.select("rank", expr("lower(city) as little_name")))

Exécuter une requête SQL arbitraire

Vous pouvez utiliser spark.sql() pour exécuter des requêtes SQL arbitraires, comme dans l’exemple suivant :

query_df = spark.sql("SELECT * FROM us_cities")

Paramétrer des requêtes SQL

Vous pouvez utiliser la mise en forme Python pour paramétriser des requêtes SQL, comme dans l’exemple suivant :

table_name = "us_cities"

query_df = spark.sql(f"SELECT * FROM {table_name}")

Ressources supplémentaires