Esercitazione: Caricare e trasformare i dati usando i dataframe di Apache Spark

Questa esercitazione illustra come caricare e trasformare i dati usando l'API dataframe Apache Spark Python (PySpark), l'API DataFrame di Apache Spark Scala e l'API SparkR SparkDataFrame in Azure Databricks.

Al termine di questa esercitazione si comprenderà che cos'è un dataframe e si avrà familiarità con le attività seguenti:

Python

Vedere anche Informazioni di riferimento sulle API PySpark per Apache Spark.

Scala

Vedere anche Informazioni di riferimento sulle API Scala di Apache Spark.

R

Vedere anche Informazioni di riferimento sulle API Apache SparkR.

Che cos'è un dataframe?

Un dataframe è una struttura di dati con etichetta bidimensionale con colonne di tipi potenzialmente diversi. È possibile pensare a un dataframe come un foglio di calcolo, una tabella SQL o un dizionario di oggetti serie. I dataframe Apache Spark offrono un set completo di funzioni (selezionare colonne, filtri, join, aggregazioni) che consentono di risolvere in modo efficiente i problemi comuni di analisi dei dati.

I dataframe Apache Spark sono un'astrazione basata su set di dati distribuiti resilienti (RDD). I dataframe Spark e Spark SQL usano un motore di pianificazione e ottimizzazione unificato, consentendo di ottenere prestazioni quasi identiche in tutti i linguaggi supportati in Azure Databricks (Python, SQL, Scala e R).

Requisiti

Per completare l'esercitazione seguente, è necessario soddisfare i requisiti seguenti:

  • Per usare gli esempi in questa esercitazione, l'area di lavoro deve avere Il catalogo Unity abilitato.

  • Gli esempi in questa esercitazione usano un volume di Unity Catalog per archiviare i dati di esempio. Per usare questi esempi, creare un volume e usare i nomi di catalogo, schema e volume del volume per impostare il percorso del volume usato dagli esempi.

  • È necessario disporre delle autorizzazioni seguenti in Unity Catalog:

    • READ VOLUME e WRITE VOLUMEo ALL PRIVILEGES per il volume usato per questa esercitazione.
    • USE SCHEMA oppure ALL PRIVILEGES per lo schema usato per questa esercitazione.
    • USE CATALOG oppure ALL PRIVILEGES per il catalogo usato per questa esercitazione.

    Per impostare queste autorizzazioni, vedere l'amministratore di Databricks o i privilegi di Unity Catalog e gli oggetti a protezione diretta.

Suggerimento

Per un notebook completo per questo articolo, vedere Notebook dell'esercitazione sul dataframe.

Passaggio 1: Definire le variabili e caricare il file CSV

Questo passaggio definisce le variabili da usare in questa esercitazione e quindi carica un file CSV contenente i dati dei nomi dei bambini da health.data.ny.gov nel volume di Unity Catalog.

  1. Aprire un nuovo notebook facendo clic sull'icona Nuova icona . Per informazioni su come esplorare i notebook di Azure Databricks, vedere Interfaccia e controlli dei notebook di Databricks.

  2. Copiare e incollare il codice seguente nella nuova cella vuota del notebook. Sostituire <catalog-name>, <schema-name>e <volume-name> con i nomi di catalogo, schema e volume per un volume di Unity Catalog. Sostituire <table_name> con un nome di tabella di propria scelta. I dati relativi al nome del bambino verranno caricati in questa tabella più avanti in questa esercitazione.

  3. Premere Shift+Enter per eseguire la cella e creare una nuova cella vuota.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Copiare e incollare il codice seguente nella nuova cella vuota del notebook. Questo codice copia il rows.csv file da health.data.ny.gov nel volume di Unity Catalog usando il comando databricks dbut shadows .

  5. Premere Shift+Enter per eseguire la cella e quindi passare alla cella successiva.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Passaggio 2: Creare un dataframe

Questo passaggio crea un dataframe denominato df1 con dati di test e quindi ne visualizza il contenuto.

  1. Copiare e incollare il codice seguente nella nuova cella vuota del notebook. Questo codice crea il dataframe con i dati di test e quindi visualizza il contenuto e lo schema del dataframe.

  2. Premere Shift+Enter per eseguire la cella e quindi passare alla cella successiva.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Passaggio 3: Caricare i dati in un dataframe da un file CSV

Questo passaggio crea un dataframe denominato df_csv dal file CSV caricato in precedenza nel volume di Unity Catalog. Vedere spark.read.csv.

  1. Copiare e incollare il codice seguente nella nuova cella vuota del notebook. Questo codice carica i dati del nome figlio nel dataframe df_csv dal file CSV e quindi visualizza il contenuto del dataframe.

  2. Premere Shift+Enter per eseguire la cella e quindi passare alla cella successiva.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

È possibile caricare i dati da molti formati di file supportati.

Passaggio 4: Visualizzare e interagire con il dataframe

Visualizzare e interagire con i dataframe dei bambini usando i metodi seguenti.

Informazioni su come visualizzare lo schema di un dataframe Apache Spark. Apache Spark usa il termine schema per fare riferimento ai nomi e ai tipi di dati delle colonne nel dataframe.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice mostra lo schema dei dataframe con il .printSchema() metodo per visualizzare gli schemi dei due dataframe, per prepararsi all'unione dei due dataframe.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Nota

Azure Databricks usa anche il termine schema per descrivere una raccolta di tabelle registrate in un catalogo.

Rinominare la colonna nel dataframe

Informazioni su come rinominare una colonna in un dataframe.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice rinomina una colonna nel df1_csv dataframe in modo che corrisponda alla rispettiva colonna nel df1 dataframe. Questo codice usa il metodo Apache Spark withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Combinare dataframe

Informazioni su come creare un nuovo dataframe che aggiunge le righe di un dataframe a un altro.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa il metodo Apache Spark union() per combinare il contenuto del primo dataframe con il dataframe df_csvdf contenente i dati dei nomi figlio caricati dal file CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtrare le righe in un dataframe

Individuare i nomi dei bambini più diffusi nel set di dati filtrando le righe usando Apache Spark .filter() o .where() i metodi. Usare il filtro per selezionare un subset di righe da restituire o modificare in un dataframe. Non esiste alcuna differenza nelle prestazioni o nella sintassi, come illustrato negli esempi seguenti.

Uso del metodo .filter()

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa il metodo Apache Spark .filter() per visualizzare tali righe nel dataframe con un conteggio di più di 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Uso del metodo .where()

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa il metodo Apache Spark .where() per visualizzare tali righe nel dataframe con un conteggio di più di 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Selezionare le colonne da un dataframe e ordinare in base alla frequenza

Informazioni sulla frequenza dei nomi figlio con il select() metodo per specificare le colonne del DataFrame da restituire. Usare Apache Spark orderby e desc le funzioni per ordinare i risultati.

Il modulo pyspark.sql per Apache Spark offre il supporto per le funzioni SQL. Tra queste funzioni usate in questa esercitazione sono le funzioni Apache Spark orderBy(), desc()e expr() . Per abilitare l'uso di queste funzioni, importarle nella sessione in base alle esigenze.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice importa la desc() funzione e quindi usa il metodo Apache Spark select() e Apache Spark orderBy() e desc() le funzioni per visualizzare i nomi più comuni e i relativi conteggi in ordine decrescente.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Creare un sottoinsieme di dati

Informazioni su come creare un subset di dataframe da un dataframe esistente.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa il metodo Apache Spark filter per creare un nuovo dataframe limitando i dati per anno, conteggio e sesso. Usa il metodo Apache Spark select() per limitare le colonne. Usa anche Apache Spark orderBy() e desc() le funzioni per ordinare il nuovo dataframe in base al conteggio.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Passaggio 5: Salvare il dataframe

Informazioni su come salvare un dataframe. È possibile salvare il dataframe in una tabella o scrivere il dataframe in un file o in più file.

Salvare il dataframe in una tabella

Azure Databricks usa il formato Delta Lake per tutte le tabelle per impostazione predefinita. Per salvare il dataframe, è necessario disporre CREATE dei privilegi di tabella per il catalogo e lo schema.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice salva il contenuto del dataframe in una tabella usando la variabile definita all'inizio di questa esercitazione.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

La maggior parte delle applicazioni Apache Spark funziona su set di dati di grandi dimensioni e in modo distribuito. Apache Spark scrive una directory di file anziché un singolo file. Delta Lake suddivide le cartelle e i file Parquet. Molti sistemi dati possono leggere queste directory di file. Azure Databricks consiglia di usare tabelle su percorsi di file per la maggior parte delle applicazioni.

Salvare il dataframe in file JSON

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice salva il dataframe in una directory di file JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Leggere il dataframe da un file JSON

Informazioni su come usare il metodo Apache Spark spark.read.format() per leggere i dati JSON da una directory in un dataframe.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice visualizza i file JSON salvati nell'esempio precedente.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Attività aggiuntive: eseguire query SQL in PySpark, Scala e R

I dataframe Apache Spark offrono le opzioni seguenti per combinare SQL con PySpark, Scala e R. È possibile eseguire il codice seguente nello stesso notebook creato per questa esercitazione.

Specificare una colonna come query SQL

Informazioni su come usare il metodo Apache Spark selectExpr() . Si tratta di una variante del select() metodo che accetta espressioni SQL e restituisce un dataframe aggiornato. Questo metodo consente di usare un'espressione SQL, ad esempio upper.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa il metodo Apache Spark selectExpr() e l'espressione SQL upper per convertire una colonna stringa in lettere maiuscole (e rinominare la colonna).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Usare expr() per usare la sintassi SQL per una colonna

Informazioni su come importare e usare la funzione Apache Spark expr() per usare la sintassi SQL in qualsiasi punto in cui viene specificata una colonna.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice importa la expr() funzione e quindi usa la funzione Apache Spark expr() e l'espressione SQL lower per convertire una colonna stringa in lettere minuscole (e rinominare la colonna).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Eseguire una query SQL arbitraria usando spark.sql() funzione

Informazioni su come usare la funzione Apache Spark spark.sql() per eseguire query SQL arbitrarie.

Copiare e incollare il codice seguente in una cella vuota del notebook. Questo codice usa la funzione Apache Spark spark.sql() per eseguire query su una tabella SQL usando la sintassi SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Notebook dell'esercitazione sul dataframe

Il notebook seguente include le query di esempio di questa esercitazione.

Python

Esercitazione su dataframe con notebook Python

Ottenere il notebook

Scala

Esercitazione su dataframe con notebook Scala

Ottenere il notebook

R

Esercitazione su dataframe con R Notebook

Ottenere il notebook

Risorse aggiuntive