Tutoriel : charger et transformer des données en utilisant des DataFrames Apache Spark
Ce tutoriel vous montre comment charger et transformer des données en utilisant l’API Apache Spark Python (PySpark) DataFrame, l’API Apache Spark Scala DataFrame et l’API SparkR SparkDataFrame dans Azure Databricks.
À la fin de ce tutoriel, vous saurez ce qu’est un DataFrame et serez en mesure d’effectuer les tâches suivantes :
Python
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un DataFrame avec Python
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans PySpark
Consultez également la référence de l’API PySpark Apache Spark.
Scala
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un DataFrame avec Scala
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans Apache Spark
Consultez également Référence de l’API Scala Apache Spark.
R
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un SparkDataFrame SparkR
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans SparkR
Consultez également les informations de référence sur l’API Apache SparkR.
Qu’est-ce qu’un DataFrame ?
Un DataFrame est une structure de données étiquetée à deux dimensions avec des colonnes de types potentiellement différents. Vous pouvez considérer un DataFrame comme une feuille de calcul, une table SQL ou un dictionnaire d’objets de série. Les DataFrames Apache Spark offre un ensemble complet de fonctions (select columns, filter, join, aggregate, etc.) qui vous permettent de résoudre efficacement les problèmes courants d’analyse des données.
Les DataFrames Apache Spark sont une abstraction basée sur des jeux de données distribués résilients (RDD). Les DataFrames Spark et Spark SQL utilisent un moteur de planification et d’optimisation unifié, ce qui vous permet d’obtenir des performances presque identiques dans tous les langages pris en charge sur Azure Databricks (Python, SQL, Scala et R).
Spécifications
Pour suivre le tutoriel ci-après, les conditions suivantes doivent être remplies :
Pour utiliser les exemples de ce tutoriel, Unity Catalog doit être activé dans votre espace de travail.
Les exemples de ce tutoriel utilisent un volume Unity Catalog pour stocker des exemples de données. Pour utiliser ces exemples, créez un volume et utilisez les noms de catalogue, de schéma et de volume associés à ce volume afin de définir le chemin d’accès au volume utilisé par les exemples.
Vous devez disposer des autorisations suivantes dans Unity Catalog :
READ VOLUME
etWRITE VOLUME
, ouALL PRIVILEGES
pour le volume utilisé dans ce tutoriel.USE SCHEMA
ouALL PRIVILEGES
pour le schéma utilisé dans ce tutoriel.USE CATALOG
ouALL PRIVILEGES
pour le catalogue utilisé dans ce tutoriel.
Pour définir ces autorisations, contactez votre administrateur Databricks ou consultez Privilèges et objets sécurisables dans Unity Catalog.
Conseil
Pour découvrir un notebook terminé pour cet article, consultez Notebooks des tutoriels sur les DataFrames.
Étape 1 : définir des variables et charger un fichier CSV
Cette étape définit les variables à utiliser dans ce tutoriel, puis charge un fichier CSV de noms de bébé provenant de health.data.ny.gov dans votre volume Unity Catalog.
Ouvrez un nouveau notebook en cliquant sur l’icône . Pour savoir comment naviguer dans les notebooks Azure Databricks, consultez Interface et contrôles du notebook Databricks.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Remplacez
<catalog-name>
,<schema-name>
et<volume-name>
par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog. Remplacez<table_name>
par le nom de table de votre choix. Plus loin dans ce tutoriel, vous chargerez des noms de bébé dans cette table.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Appuyez sur
Shift+Enter
pour exécuter la cellule et créer une cellule vide.Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code copie le fichier
rows.csv
de health.data.ny.gov dans votre volume Unity Catalog à l’aide de la commande Databricks dbutuils.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Étape 2 : créer un DataFrame
Cette étape crée un DataFrame nommé df1
avec des données de test, puis affiche son contenu.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code crée le DataFrame avec des données de test, puis affiche le contenu et le schéma du DataFrame.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Étape 3 : charger des données dans un DataFrame à partir d’un fichier CSV
Cette étape crée un DataFrame nommé df_csv
à partir du fichier CSV précédemment chargé dans votre volume Unity Catalog. Consultez spark.read.csv.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code charge les noms de bébé dans le DataFrame
df_csv
à partir du fichier CSV, puis affiche le contenu du DataFrame.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Vous pouvez charger des données à partir de nombreux formats de fichiers pris en charge.
Étape 4 : afficher votre DataFrame et interagir avec celui-ci
Utilisez les méthodes suivantes pour afficher vos DataFrames de noms de bébé et interagir avec eux.
Imprimer le schéma DataFrame
Découvrez comment afficher le schéma d’un DataFrame Apache Spark. Apache Spark utilise le terme schéma pour désigner les noms et les types de données des colonnes du DataFrame.
Remarque
Azure Databricks utilise également le terme schéma pour décrire une collection de tables inscrites dans un catalogue.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code montre le schéma de vos DataFrames avec la méthode
.printSchema()
pour afficher les schémas des deux DataFrames et préparer l’union des deux DataFrames.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Renommer une colonne dans le DataFrame
Découvrez comment renommer une colonne dans un DataFrame.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code renomme une colonne dans le DataFrame
df1_csv
pour qu’elle reflète la colonne correspondante du DataFramedf1
. Ce code utilise la méthode Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Combiner des DataFrames
Découvrez comment créer un DataFrame qui ajoute les lignes d’un DataFrame à un autre.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark
union()
pour combiner le contenu de votre premier DataFramedf
avec le DataFramedf_csv
contenant les noms de bébé chargés à partir du fichier CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Filtrer des lignes dans un DataFrame
Découvrez les noms de bébé les plus populaires dans votre jeu de données en filtrant les lignes à l’aide des méthodes Apache Spark .filter()
ou .where()
. Utilisez le filtrage pour sélectionner un sous-ensemble de lignes à retourner ou modifier dans un DataFrame. Il n’existe aucune différence de performances ou de syntaxe, comme indiqué dans les exemples suivants.
Utilisation de la méthode .filter()
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark
.filter()
pour afficher les lignes du DataFrame avec un nombre supérieur à 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Utilisation de la méthode .where()
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark
.where()
pour afficher les lignes du DataFrame avec un nombre supérieur à 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Sélectionner les colonnes d’un DataFrame et les classer par fréquence
Découvrez la fréquence des noms de bébé avec la méthode select()
pour spécifier les colonnes du DataFrame à retourner. Utilisez les fonctions Apache Spark orderby
et desc
pour trier les résultats.
Le module pyspark.sql pour Apache Spark prend en charge les fonctions SQL. Parmi les fonctions que nous utilisons dans ce tutoriel, citons les fonctions Apache Spark orderBy()
, desc()
et expr()
. Vous activez l’utilisation de ces fonctions en les important dans votre session si nécessaire.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction
desc()
, puis utilise la méthode Apache Sparkselect()
et les fonctions Apache SparkorderBy()
etdesc()
pour afficher les noms les plus courants et leur nombre par ordre décroissant.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Créer un sous-ensemble DataFrame
Découvrez comment créer un DataFrame de sous-ensemble à partir d’un DataFrame existant.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark
filter
pour créer un DataFrame limitant les données par année, nombre et sexe. Il utilise la méthode Apache Sparkselect()
pour limiter les colonnes. Il utilise également les fonctions Apache SparkorderBy()
etdesc()
pour trier le nouveau DataFrame par nombre.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Étape 5 : enregistrer le DataFrame
Découvrez comment enregistrer un DataFrame. Vous pouvez enregistrer votre DataFrame dans une table ou écrire le DataFrame dans un ou plusieurs fichiers.
Enregistrer le DataFrame dans une table
Azure Databricks utilise par défaut le format Delta Lake pour toutes les tables. Pour enregistrer votre DataFrame, vous devez avoir des privilèges de table CREATE
sur le catalogue et le schéma.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le contenu du DataFrame dans une table à l’aide de la variable définie au début de ce tutoriel.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
La plupart des applications Apache Spark fonctionnent sur des jeux de données volumineux de manière distribuée. Apache Spark écrit un répertoire de fichiers plutôt qu’un fichier unique. Delta Lake fractionne les dossiers et fichiers Parquet. De nombreux systèmes de données peuvent lire ces répertoires de fichiers. Azure Databricks recommande l’utilisation de tables plutôt que des chemins d’accès aux fichiers pour la plupart des applications.
Enregistrer le DataFrame dans des fichiers JSON
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le DataFrame dans un répertoire de fichiers JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Lire le DataFrame à partir d’un fichier JSON
Découvrez comment utiliser la méthode Apache Spark spark.read.format()
pour lire les données JSON à partir d’un répertoire dans un DataFrame.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code affiche les fichiers JSON enregistrés dans l’exemple précédent.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Tâches supplémentaires : exécuter des requêtes SQL dans PySpark, Scala et R
Les DataFrames Apache Spark fournissent les options suivantes pour combiner SQL avec PySpark, Scala et R. Vous pouvez exécuter le code suivant dans le même notebook que celui que vous avez créé pour ce tutoriel.
Spécifier une colonne comme requête SQL
Découvrez comment utiliser la méthode Apache Spark selectExpr()
. Il s’agit d’une variante de la méthode select()
qui accepte les expressions SQL et retourne un DataFrame mis à jour. Cette méthode vous permet d’utiliser une expression SQL, telle que upper
.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark
selectExpr()
et l’expression SQLupper
pour convertir une colonne de chaîne en majuscules (et renommer la colonne).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Utiliser expr()
pour utiliser la syntaxe SQL d’une colonne
Découvrez comment importer et utiliser la fonction Apache Spark expr()
pour utiliser la syntaxe SQL partout où une colonne peut être spécifiée.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction
expr()
, puis utilise la fonction Apache Sparkexpr()
et l’expression SQLlower
pour convertir une colonne de chaîne en minuscules (et renommer la colonne).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Exécuter une requête SQL arbitraire à l’aide de la fonction spark.sql()
Découvrez comment utiliser la fonction Apache Spark spark.sql()
pour exécuter des requêtes SQL arbitraires.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la fonction Apache Spark
spark.sql()
pour interroger une table SQL en utilisant la syntaxe SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.
Notebooks des tutoriels sur les DataFrames
Les notebooks suivants incluent les exemples de requêtes de ce tutoriel.