Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa esercitazione mostra come caricare e trasformare i dati utilizzando l'API DataFrame di Apache Spark Python (PySpark), l'API DataFrame di Apache Spark Scala e l'API SparkDataFrame di SparkR in Azure Databricks.
Nota
Se si usa Databricks Free Edition, selezionare la scheda Python per tutti gli esempi di codice in questa esercitazione. Free Edition non supporta R o Scala. L'edizione gratuita limita inoltre l'accesso a Internet in uscita, quindi è necessario caricare il file CSV usando l'interfaccia utente dell'area di lavoro invece di scaricarlo con il codice. Per istruzioni dettagliate, vedere Passaggio 1 .
Al termine di questa esercitazione si comprenderà che cos'è un DataFrame e si avrà familiarità con i task seguenti:
Pitone
- Definire le variabili e copiare i dati pubblici in un volume di Unity Catalog
- Creare un DataFrame con Python
- Caricare dati da un file CSV in un DataFrame
- Visualizzare e interagire con i DataFrame
- Salvare il DataFrame
- Eseguire query SQL in PySpark
Vedere anche le Informazioni di riferimento sulle API PySpark per Apache Spark.
Linguaggio di programmazione Scala
- Definire le variabili e copiare i dati pubblici in un volume di Unity Catalog
- Creare un DataFrame con Scala
- Caricare dati da un file CSV in un DataFrame
- Visualizzare e interagire con i DataFrame
- Salvare il DataFrame
- Eseguire query SQL in Apache Spark
Vedere anche le Informazioni di riferimento sulle API Scala per Apache Spark.
R
- Definire le variabili e copiare i dati pubblici in un volume di Unity Catalog
- Creare degli SparkDataFrames SparkR
- Caricare dati da un file CSV in un DataFrame
- Visualizzare e interagire con i DataFrame
- Salvare il DataFrame
- Eseguire query SQL in SparkR
Vedere anche le Informazioni di riferimento sulle API SparkR per Apache Spark.
Che cos'è un DataFrame?
Un dataframe è una struttura di dati con etichetta bidimensionale con colonne di tipi potenzialmente diversi. È possibile pensare a un dataframe come un foglio di calcolo, una tabella SQL o un dizionario di oggetti serie. I dataframe Apache Spark offrono un set completo di funzioni (selezionare colonne, filtri, join, aggregazioni) che consentono di risolvere in modo efficiente i problemi comuni di analisi dei dati.
I DataFrames di Apache Spark sono un'astrazione basata su Set di dati distribuiti resilienti (RDD). I dataframe Spark e Spark SQL usano un motore di pianificazione e ottimizzazione unificato, consentendo di ottenere prestazioni quasi identiche in tutti i linguaggi supportati in Azure Databricks (Python, SQL, Scala e R).
Requisiti
Per completare l’esercitazione seguente, è necessario soddisfare questi requisiti:
Per usare gli esempi di questa esercitazione, l'area di lavoro deve avere Unity Catalog abilitato. Per impostazione predefinita, Azure Databricks Free Edition e le aree di lavoro di prova gratuita hanno Unity Catalog abilitato.
Gli esempi in questa esercitazione usano un volume Unity per archiviare i dati di esempio. Per utilizzare questi esempi, crea un volume e utilizza il catalogo, lo schema e il nome del volume per impostare il percorso del volume utilizzato dagli esempi. Gli utenti di Free Edition hanno accesso al catalogo delle aree di lavoro e allo
defaultschema per impostazione predefinita.È necessario disporre delle autorizzazioni seguenti in Unity Catalog:
-
READ VOLUMEeWRITE VOLUMEper il volume usato per questa esercitazione -
USE SCHEMAper lo schema usato per questa esercitazione -
USE CATALOGper il catalogo usato per questa esercitazione
Per impostare queste autorizzazioni, consultare l'amministratore di Azure Databricks o i privilegi di Unity Catalog e gli oggetti proteggibili. Gli utenti di Free Edition dispongono di questi privilegi per il catalogo e
defaultlo schema dell'area di lavoro per impostazione predefinita.-
Suggerimento
Per un notebook completo di questo articolo, vedere il notebook dell'esercitazione sui DataFrame.
Passaggio 1: Definire le variabili e caricare il file CSV
Questo passaggio definisce le variabili da usare in questo tutorial e quindi carica un file CSV contenente i dati sui nomi dei neonati da health.data.ny.gov nel volume di Unity Catalog. Sono necessari i nomi di un catalogo, uno schema e un volume di Unity Catalog.
Suggerimento
Se non si conoscono i nomi del catalogo e dello schema, fare clic Catalogo nella barra laterale. Il catalogo dell'area di lavoro condivide un nome con l'area di lavoro ed è elencato nel pannello del catalogo. Espanderlo per visualizzare gli schemi disponibili. Gli utenti della versione di valutazione gratuita e dell'edizione gratuita possono usare il catalogo dell'area di lavoro e lo
default schema.
Se non si dispone di un volume, crearne uno eseguendo il comando seguente in una cella del notebook (sostituire <catalog_name> e <schema_name> con i valori):
CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
Aprire un nuovo Notebook facendo clic sull'icona
. Per informazioni su come esplorare i notebook di Azure Databricks, vedere Personalizzare l'aspetto del notebook.Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Sostituire
<catalog-name>,<schema-name>e<volume-name>con i nomi di catalogo, schema e volume per un volume di Unity Catalog. Sostituire<table_name>con un nome di tabella a tua scelta. I dati dei nomi dei bambini vengono caricati in questa tabella più avanti in questa esercitazione.Pitone
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathLinguaggio di programmazione Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete pathPremere
Shift+Enterper eseguire la cella e creare una nuova cella vuota.Carica il file CSV nel volume. Scegliere una delle seguenti modalità:
- Caricare usando l'interfaccia utente dell'area di lavoro : usare questo metodo se si usa Databricks Free Edition o se il download del codice nell'opzione B ha esito negativo con un errore di rete. Free Edition e altri ambienti di calcolo serverless limitano l'accesso a Internet in uscita, quindi è necessario caricare il file dal computer locale.
- Scaricare usando un codice — Utilizzare questo metodo se l'ambiente di calcolo ha accesso a Internet in uscita.
Opzione A: Caricare usando l'interfaccia utente dell'area di lavoro
- Nel computer locale aprire health.data.ny.gov/api/views/jxy9-yhdk/rows.csv nel browser. Il file viene scaricato nel computer come
rows.csv, che corrisponde allafile_namevariabile definita in precedenza. - Tornare all'area di lavoro di Azure Databricks. Nella barra laterale fare clic su
Nuovo > Aggiungi o carica dati. - Clicca su Carica file in un volume.
- Fare clic su Sfoglia e selezionare il
rows.csvfile oppure trascinarlo nell'area di caricamento. - In Volume di destinazione selezionare il volume specificato in precedenza.
- Al termine del caricamento, tornare al notebook e continuare con il passaggio 2.
Per altre informazioni sul caricamento dei file, vedere Caricare file in un volume del catalogo Unity.
Opzione B: Scaricare usando il codice
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice copia il
rows.csvfile da health.data.ny.gov nel volume di Unity Catalog usando il comando dbutils di Databricks . PremereShift+Enterper eseguire la cella e quindi passare alla cella successiva.Pitone
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")Linguaggio di programmazione Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Passaggio 2: creare un DataFrame
Questo passaggio crea un DataFrame denominato df1 con dati di test e quindi ne visualizza il contenuto.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice crea il dataframe con i dati di test e quindi visualizza il contenuto e lo schema del dataframe.
Pitone
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] # highlight-next-line df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.Linguaggio di programmazione Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") // highlight-next-line val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) # highlight-next-line df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Passsaggio 3:caricare dati da un file CSV in un DataFrame
Questo passaggio crea un dataframe denominato df_csv dal file CSV caricato in precedenza nel volume di Unity Catalog. Vedere spark.read.csv.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice carica i dati del nome del bambino nel DataFrame
df_csvdal file CSV e quindi visualizza il contenuto del DataFrame.Pitone
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)Linguaggio di programmazione Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
È possibile caricare i dati da molti formati di file supportati.
Passaggio 4: visualizzare e interagire con il DataFrame
Visualizza e interagisci con i DataFrame dei nomi dei bambini con i seguenti metodi.
Stampare lo schema del dataframe
Informazioni su come visualizzare lo schema di un dataframe Apache Spark. Apache Spark usa il termine schema per fare riferimento ai nomi e ai tipi di dati delle colonne nel dataframe.
Nota
Azure Databricks usa anche il termine schema per descrivere una raccolta di tabelle registrate in un catalogo.
Copiare e incollare il codice seguente in una cella vuota di codice del notebook. Questo codice mostra lo schema dei dataframe con il metodo
.printSchema()per visualizzare gli schemi dei due dataframe, per prepararsi all'unione dei due dataframe.Pitone
df_csv.printSchema() df1.printSchema()Linguaggio di programmazione Scala
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Rinominare la colonna nel dataframe
Informazioni su come rinominare una colonna in un dataframe.
Copiare e incollare il codice seguente in una nuova cella vuota del Notebook. Questo codice rinomina una colonna nel dataframe
df1_csvin modo che corrisponda alla rispettiva colonna nel dataframedf1. Questo codice usa il metodo Apache SparkwithColumnRenamed().Pitone
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema()Linguaggio di programmazione Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Combinare i "DataFrame"
Informazioni su come creare un nuovo DataFrame che aggiunge le righe di un DataFrame a un altro.
Copiare e incollare il codice seguente in una cella vuota di codice nel notebook. Questo codice usa il metodo Apache Spark
union()per combinare il contenuto del primo DataFramedfcon il DataFramedf_csvcontenente i dati dei nomi dei bambini caricati dal file CSV.Pitone
df = df1.union(df_csv) display(df)Linguaggio di programmazione Scala
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Filtrare le righe in un DataFrame
Individuare i nomi dei bambini più diffusi nel set di dati filtrando le righe, usando i metodi di .filter() Apache Spark o .where(). Usare il filtro per selezionare un subset di righe da restituire o modificare in un dataframe. Non esiste alcuna differenza nelle prestazioni o nella sintassi, come illustrato negli esempi seguenti.
Uso del metodo .filter()
Copiare e incollare il codice seguente in una cella vuota in un notebook. Questo codice usa il metodo Apache Spark
.filter()per visualizzare tali righe nel dataframe con un conteggio di più di 50.Pitone
display(df.filter(df["Count"] > 50))Linguaggio di programmazione Scala
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Uso del metodo .where()
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice usa il metodo Apache Spark
.where()per visualizzare tali righe nel dataframe con un conteggio di più di 50.Pitone
display(df.where(df["Count"] > 50))Linguaggio di programmazione Scala
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Selezionare le colonne da un dataframe e ordinare in base alla frequenza
Scopri la frequenza dei nomi dei bambini con il metodo select() per selezionare le colonne del DataFrame da restituire. Usare le funzioni Apache Spark orderby e desc per ordinare i risultati.
Il modulo pyspark.sql per Apache Spark fornisce il supporto per le funzioni SQL. Tra queste funzioni usate in questa esercitazione sono le funzioni Apache Spark orderBy(), desc()e expr() . Per abilitare l'uso di queste funzioni, importarle nella sessione in base alle esigenze.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice importa la funzione
desc()e quindi usa il metodo Apache Sparkselect()e Apache SparkorderBy()e le funzionidesc()per visualizzare i nomi più comuni e i relativi conteggi in ordine decrescente.Pitone
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))Linguaggio di programmazione Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Creare un sottoinsieme di DataFrame
Informazioni su come creare un sottoinsieme di DataFrame da un DataFrame esistente.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice usa il metodo Apache Spark
filterper creare un nuovo SataFrame limitando i dati per anno, conteggio e sesso. Usa il metodo diselect()Apache Spark per limitare le colonne. Usa anche Apache SparkorderBy()e le funzionidesc()per ordinare il nuovo DataFrame in base al conteggio.Pitone
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)Linguaggio di programmazione Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Passaggio 5: salvare il DataFrame
Informazioni su come salvare un DataFrame. È possibile salvare il dataframe in una tabella o scrivere il dataframe in un file o in più file.
Salvare il dataframe in una tabella
Azure Databricks usa il formato Delta Lake per tutte le tabelle per impostazione predefinita. Per salvare il dataframe, è necessario disporre dei privilegi di tabella CREATE nel catalogo e nello schema.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice salva il contenuto del dataframe in una tabella usando la variabile definita all'inizio di questa esercitazione.
Pitone
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")Linguaggio di programmazione Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
La maggior parte delle applicazioni Apache Spark funziona su set di dati di grandi dimensioni e in modo distribuito. Apache Spark scrive una directory di file anziché un singolo file. Delta Lake suddivide le cartelle e i file Parquet. Molti sistemi dati possono leggere queste directory di file. Azure Databricks consiglia di usare tabelle su percorsi di file per la maggior parte delle applicazioni.
Salvare il DataFrame in file JSON
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice salva il DataFrame in una directory di file JSON.
Pitone
df.write.format("json").mode("overwrite").save("/tmp/json_data")Linguaggio di programmazione Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Leggere il DataFrame da un file JSON
Informazioni su come usare il metodo Apache Spark spark.read.format() per leggere i dati JSON da una directory in un DataFrame.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice visualizza i file JSON salvati nell'esempio precedente.
Pitone
display(spark.read.format("json").json("/tmp/json_data"))Linguaggio di programmazione Scala
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Attività aggiuntive: eseguire query SQL in PySpark, Scala e R
I DataFrame Apache Spark offrono le opzioni seguenti per combinare SQL con PySpark, Scala e R. È possibile eseguire il codice seguente nello stesso Notebook creato per questa esercitazione.
Specificare una colonna nella query SQL
Informazioni su come usare il metodo Apache Spark selectExpr(). Si tratta di una variante del metodo select() che accetta espressioni SQL e restituisce un DataFrame aggiornato. Questo metodo consente di usare un'espressione SQL, ad esempio upper.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice utilizza il metodo
selectExpr()di Apache Spark e l'espressione SQLupperper convertire una colonna di tipo stringa in lettere maiuscole (e rinominare la colonna).Pitone
display(df.selectExpr("Count", "upper(County) as big_name"))Linguaggio di programmazione Scala
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Usare expr() per usare la sintassi SQL per una colonna
Informazioni su come importare e usare la funzione expr() Apache Spark per usare la sintassi SQL in qualsiasi punto in cui viene specificata una colonna.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice importa la funzione
expr()e quindi usa la funzioneexpr()Apache Spark e l'espressione SQLlowerper convertire una colonna stringa in lettere minuscole (e rinominare la colonna).Pitone
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))Linguaggio di programmazione Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionalityPremere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Eseguire una query SQL arbitraria usando la funzione spark.sql()
Informazioni su come usare la funzione Apache Spark spark.sql() per eseguire query SQL arbitrarie.
Copia e incolla il codice seguente in una cella vuota del Notebook. Questo codice usa la funzione di
spark.sql()Apache Spark per eseguire query su una tabella SQL usando la sintassi SQL.Pitone
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))Linguaggio di programmazione Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))Premere
Shift+Enterper eseguire la cella e quindi passare alla cella successiva.
Notebook di tutorial su DataFrame
I taccuini seguenti includono esempi di query di questa esercitazione.
Pitone
Esercitazione su DataFrame con Python
Ottieni taccuino
Linguaggio di programmazione Scala
Esercitazione su DataFrame con Scala
Ottieni taccuino
R
Esercitazione su DataFrame con R
Ottieni taccuino