Partage via


Tutoriel : charger et transformer des données en utilisant des DataFrames Apache Spark

Ce tutoriel vous montre comment charger et transformer des données en utilisant l’API Apache Spark Python (PySpark) DataFrame, l’API Apache Spark Scala DataFrame et l’API SparkR SparkDataFrame dans Azure Databricks.

Remarque

Si vous utilisez Databricks Free Edition, sélectionnez l’onglet Python pour tous les exemples de code de ce tutoriel. Free Edition ne prend pas en charge R ou Scala. En outre, Free Edition restreint l’accès Internet sortant. Vous devez donc charger le fichier CSV à l’aide de l’interface utilisateur de l’espace de travail au lieu de le télécharger avec du code. Consultez l’étape 1 pour obtenir des instructions détaillées.

À la fin de ce tutoriel, vous saurez ce qu’est un DataFrame et serez en mesure d’effectuer les tâches suivantes :

Python

Consultez également la référence de l’API PySpark Apache Spark.

Langage de programmation Scala

Consultez également Référence de l’API Scala Apache Spark.

R

Consultez également les informations de référence sur l’API Apache SparkR.

Qu’est-ce qu’un DataFrame ?

Un DataFrame est une structure de données étiquetée à deux dimensions avec des colonnes de types potentiellement différents. Vous pouvez considérer un DataFrame comme une feuille de calcul, une table SQL ou un dictionnaire d’objets de série. Les DataFrames Apache Spark offre un ensemble complet de fonctions (select columns, filter, join, aggregate, etc.) qui vous permettent de résoudre efficacement les problèmes courants d’analyse des données.

Les DataFrames Apache Spark sont une abstraction basée sur des jeux de données distribués résilients (RDD). Les DataFrames Spark et Spark SQL utilisent un moteur de planification et d’optimisation unifié, ce qui vous permet d’obtenir des performances presque identiques dans tous les langages pris en charge sur Azure Databricks (Python, SQL, Scala et R).

Spécifications

Pour suivre le tutoriel ci-après, les conditions suivantes doivent être remplies :

  • Pour utiliser les exemples de ce tutoriel, Unity Catalog doit être activé dans votre espace de travail. Azure Databricks Free Edition et les espaces de travail d’essai gratuit ont le catalogue Unity activé par défaut.

  • Les exemples de ce tutoriel utilisent un volume Unity Catalog pour stocker des exemples de données. Pour utiliser ces exemples, créez un volume et utilisez le catalogue, le schéma et les noms de volumes de ce volume pour définir le chemin d’accès au volume utilisé par les exemples. Les utilisateurs de Free Edition ont accès au catalogue d’espaces de travail et au default schéma par défaut.

  • Vous devez disposer des autorisations suivantes dans Unity Catalog :

    • READ VOLUME et WRITE VOLUME pour le volume utilisé pour ce didacticiel
    • USE SCHEMA pour le schéma utilisé pour ce didacticiel
    • USE CATALOG pour le catalogue utilisé pour ce didacticiel

    Pour définir ces autorisations, consultez vos privilèges d’administrateur Azure Databricks ou de catalogue Unity et objets sécurisables. Les utilisateurs free Edition disposent de ces privilèges sur le catalogue et default le schéma de l’espace de travail par défaut.

Conseil

Pour un notebook complet de cet article, consultez les notebooks de tutoriels sur les DataFrames.

Étape 1 : définir des variables et charger un fichier CSV

Cette étape définit les variables à utiliser dans ce tutoriel, puis charge un fichier CSV de noms de bébé provenant de health.data.ny.gov dans votre volume Unity Catalog. Vous avez besoin des noms d’un catalogue Unity Catalog, d’un schéma et d’un volume.

Conseil

Si vous ne connaissez pas vos noms de catalogue et de schéma, cliquez sur l’icône Données.Catalogue dans la barre latérale. Le catalogue d’espaces de travail partage un nom avec votre espace de travail et est répertorié dans le panneau catalogue. Développez-le pour afficher les schémas disponibles. Les utilisateurs de l'Édition Gratuite et de l'essai gratuit peuvent utiliser le catalogue de l'espace de travail et le schéma default.

Si vous n’avez pas de volume, créez-en un en exécutant la commande suivante dans une cellule de notebook (remplacez <catalog_name> et <schema_name> par vos valeurs) :

CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
  1. Ouvrez un nouveau notebook en cliquant sur l’icône Nouvelle icône. Pour savoir comment naviguer dans les notebooks Azure Databricks, consultez Personnaliser l’apparence des blocs-notes.

  2. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog. Remplacez <table_name> par le nom de table de votre choix. Vous chargez les données de nom de bébé dans cette table plus loin dans ce tutoriel.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Langage de programmation Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Appuyez sur Shift+Enter pour exécuter la cellule et créer une nouvelle cellule vide.

  4. Chargez le fichier CSV dans votre volume. Choisissez une des méthodes suivantes :

    • Charger à l’aide de l’interface utilisateur de l’espace de travail : utilisez cette méthode si vous utilisez Databricks Free Edition ou si le téléchargement du code dans l’option B échoue avec une erreur réseau. Free Edition et d’autres environnements de calcul serverless limitent l’accès Internet sortant. Vous devez donc charger le fichier à partir de votre ordinateur local.
    • Télécharger à l’aide du code : utilisez cette méthode si votre environnement de calcul dispose d’un accès Internet sortant.

    Option A : Charger à l’aide de l’interface utilisateur de l’espace de travail

    1. Sur votre ordinateur local, ouvrez health.data.ny.gov/api/views/jxy9-yhdk/rows.csv dans votre navigateur. Le fichier est téléchargé sur votre ordinateur en tant que rows.csv, qui correspond à la file_name variable définie précédemment.
    2. Revenez à votre espace de travail Azure Databricks. Dans la barre latérale, cliquez sur Nouvelle icôneAjouter > ou charger des données.
    3. Cliquez sur Téléverser des fichiers dans un volume.
    4. Cliquez sur Parcourir et sélectionnez le rows.csv fichier, ou faites-le glisser dans la zone de chargement.
    5. Sous Volume de destination, sélectionnez le volume que vous avez spécifié ci-dessus.
    6. Une fois le chargement terminé, revenez à votre bloc-notes et passez à l’étape 2.

    Pour plus d’informations sur le chargement de fichiers, consultez Charger des fichiers dans un volume de catalogue Unity.

    Option B : Télécharger à l’aide du code

    Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code copie le rows.csv fichier de health.data.ny.gov dans votre volume de catalogue Unity à l’aide de la commande Databricks dbutils . Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Langage de programmation Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Étape 2 : créer un DataFrame

Cette étape crée un DataFrame nommé df1 avec des données de test, puis affiche son contenu.

  1. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code crée le DataFrame avec des données de test, puis affiche le contenu et le schéma du DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    # highlight-next-line
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Langage de programmation Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    // highlight-next-line
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    # highlight-next-line
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Étape 3 : charger des données dans un DataFrame à partir d’un fichier CSV

Cette étape crée un DataFrame nommé df_csv à partir du fichier CSV précédemment chargé dans votre volume Unity Catalog. Consultez spark.read.csv.

  1. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code charge les noms de bébé dans le DataFrame df_csv à partir du fichier CSV, puis affiche le contenu du DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Langage de programmation Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Vous pouvez charger des données à partir de nombreux formats de fichiers pris en charge.

Étape 4 : afficher votre DataFrame et interagir avec celui-ci

Utilisez les méthodes suivantes pour afficher vos DataFrames de noms de bébé et interagir avec eux.

Découvrez comment afficher le schéma d’un DataFrame Apache Spark. Apache Spark utilise le terme schéma pour désigner les noms et les types de données des colonnes du DataFrame.

Remarque

Azure Databricks utilise également le terme schéma pour décrire une collection de tables inscrites dans un catalogue.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code montre le schéma de vos DataFrames avec la méthode .printSchema() pour afficher les schémas des deux DataFrames et préparer l’union des deux DataFrames.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Langage de programmation Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Renommer une colonne dans le DataFrame

Découvrez comment renommer une colonne dans un DataFrame.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code renomme une colonne dans le DataFrame df1_csv pour qu’elle reflète la colonne correspondante du DataFrame df1. Ce code utilise la méthode Apache Spark withColumnRenamed().

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema()
    

    Langage de programmation Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Combiner des DataFrames

Découvrez comment créer un DataFrame qui ajoute les lignes d’un DataFrame à un autre.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark union() pour combiner le contenu de votre premier DataFrame df avec le DataFrame df_csv contenant les noms de bébé chargés à partir du fichier CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Langage de programmation Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Filtrer des lignes dans un DataFrame

Découvrez les noms de bébé les plus populaires dans votre jeu de données en filtrant les lignes à l’aide des méthodes Apache Spark .filter() ou .where(). Utilisez le filtrage pour sélectionner un sous-ensemble de lignes à retourner ou modifier dans un DataFrame. Il n’existe aucune différence de performances ou de syntaxe, comme indiqué dans les exemples suivants.

Utilisation de la méthode .filter()

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .filter() pour afficher ces lignes dans le DataFrame avec un nombre de plus de 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Langage de programmation Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Utilisation de la méthode .where()

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .where() pour afficher ces lignes dans le DataFrame avec un nombre de plus de 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Langage de programmation Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Sélectionner les colonnes d’un DataFrame et les classer par fréquence

Découvrez comment connaître la fréquence des noms de bébé en utilisant la méthode select(), afin de spécifier les colonnes du DataFrame à retourner. Utilisez les fonctions Apache Spark orderby et desc pour trier les résultats.

Le module pyspark.sql pour Apache Spark prend en charge les fonctions SQL. Parmi les fonctions que nous utilisons dans ce tutoriel, citons les fonctions Apache Spark orderBy(), desc(), et expr(). Vous activez l’utilisation de ces fonctions en les important dans votre session si nécessaire.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction desc(), puis utilise la méthode Apache Spark select() et les fonctions Apache Spark orderBy() et desc() pour afficher les noms les plus courants et leur nombre par ordre décroissant.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Langage de programmation Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Créer un sous-ensemble DataFrame

Découvrez comment créer un DataFrame de sous-ensemble à partir d’un DataFrame existant.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark filter pour créer un DataFrame limitant les données par année, nombre et sexe. Il utilise la méthode Apache Spark select() pour limiter les colonnes. Il utilise également les fonctions Apache Spark orderBy() et desc() pour trier le nouveau DataFrame par nombre.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Langage de programmation Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Étape 5 : enregistrer le DataFrame

Découvrez comment enregistrer un DataFrame. Vous pouvez enregistrer votre DataFrame dans une table ou écrire le DataFrame dans un ou plusieurs fichiers.

Enregistrer le DataFrame dans une table

Azure Databricks utilise par défaut le format Delta Lake pour toutes les tables. Pour enregistrer votre DataFrame, vous devez avoir des privilèges de table CREATE sur le catalogue et le schéma.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le contenu du DataFrame dans une table à l’aide de la variable définie au début de ce tutoriel.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Langage de programmation Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

La plupart des applications Apache Spark fonctionnent sur des jeux de données volumineux de manière distribuée. Apache Spark écrit un répertoire de fichiers plutôt qu’un fichier unique. Delta Lake divise les dossiers et fichiers Parquet. De nombreux systèmes de données peuvent lire ces répertoires de fichiers. Azure Databricks recommande l’utilisation de tables plutôt que des chemins d’accès aux fichiers pour la plupart des applications.

Enregistrer le DataFrame dans des fichiers JSON

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le DataFrame dans un répertoire de fichiers JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Langage de programmation Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Lire le DataFrame à partir d’un fichier JSON

Découvrez comment utiliser la méthode Apache Spark spark.read.format() pour lire les données JSON à partir d’un répertoire dans un DataFrame.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code affiche les fichiers JSON enregistrés dans l’exemple précédent.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Langage de programmation Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Tâches supplémentaires : exécuter des requêtes SQL dans PySpark, Scala et R

Les DataFrames Apache Spark fournissent les options suivantes pour combiner SQL avec PySpark, Scala et R. Vous pouvez exécuter le code suivant dans le même notebook que celui que vous avez créé pour ce tutoriel.

Spécifier une colonne comme requête SQL

Découvrez comment utiliser la méthode Apache Spark selectExpr(). Il s’agit d’une variante de la méthode select() qui accepte les expressions SQL et retourne un DataFrame mis à jour. Cette méthode vous permet d’utiliser une expression SQL, telle que upper.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark selectExpr() et l’expression SQL upper pour convertir une colonne de chaîne en majuscules (et renommer la colonne).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Langage de programmation Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Utiliser expr() pour utiliser la syntaxe SQL d’une colonne

Découvrez comment importer et utiliser la fonction Apache Spark expr() pour utiliser la syntaxe SQL partout où une colonne peut être spécifiée.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction expr(), puis utilise la fonction Apache Spark expr() et l’expression SQL lower pour convertir une colonne de chaîne en minuscules (et renommer la colonne).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Langage de programmation Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Exécuter une requête SQL arbitraire à l’aide de la fonction spark.sql()

Découvrez comment utiliser la fonction Apache Spark spark.sql() pour exécuter des requêtes SQL arbitraires.

  1. Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la fonction Apache Spark spark.sql() pour interroger une table SQL en utilisant la syntaxe SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Langage de programmation Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

Notebooks de tutoriels sur les DataFrames

Les notebooks suivants incluent les exemples de requêtes de ce tutoriel.

Python

Tutoriel DataFrames avec Python

Obtenir un ordinateur portable

Langage de programmation Scala

Tutoriel DataFrames avec Scala

Obtenir un ordinateur portable

R

Tutoriel DataFrames avec R

Obtenir un ordinateur portable

Ressources supplémentaires