Usare SparkR
Importante
Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.
SparkR è un pacchetto R che fornisce un front-end leggero per l'uso di Apache Spark da R. SparkR offre un'implementazione distribuita del frame di dati che supporta operazioni come selezione, filtro, aggregazione e così via. SparkR supporta anche l'apprendimento automatico distribuito tramite MLlib.
Importante
Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.
Usare SparkR tramite definizioni di processi batch Spark o con notebook interattivi di Microsoft Fabric.
Il supporto di R è disponibile solo in Spark3.1 o versione successiva. R in Spark 2.4 non è supportato.
Prerequisiti
Sottoscrizione Power BI Premium. Se non è disponibile, vedere Come acquistare Power BI Premium.
Un'area di lavoro di Power BI con capacità Premium assegnata. Se non si ha un'area di lavoro, seguire la procedura descritta in Creare un'area di lavoro per crearne una e assegnarla a una capacità Premium.
Accedere a Microsoft Fabric.
Aprire o creare un notebook. Per informazioni su come, vedere Come usare i notebook di Microsoft Fabric.
Modificare la lingua primaria impostando l'opzione linguasu SparkR (R).Change the primary language by setting the language option to SparkR (R).
Collegare il notebook a una lakehouse. Sul lato sinistro selezionare Aggiungi per aggiungere una lakehouse esistente o creare una lakehouse.
Leggere e scrivere dataframe SparkR
Leggere un dataframe SparkR da un data.frame R locale
Il modo più semplice per creare un dataframe consiste nel convertire un data.frame R locale in un dataframe Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Leggere e scrivere un dataframe SparkR da Lakehouse
I dati possono essere archiviati nel file system locale dei nodi del cluster. I metodi generali per leggere e scrivere un dataframe SparkR da Lakehouse sono read.df
e write.df
. Questi metodi accettano il percorso del file da caricare e il tipo di origine dati. SparkR supporta la lettura nativa di file CSV, JSON, testo e Parquet.
Per leggere e scrivere in un Lakehouse, aggiungerlo prima alla sessione. Sul lato sinistro del notebook selezionare Aggiungi per aggiungere una lakehouse esistente o creare una lakehouse.
Nota
Per accedere ai file Lakehouse usando pacchetti Spark, ad esempio read.df
o write.df
, usare il percorso ADFS o il percorso relativo per Spark. In Lakehouse Explorer fare clic con il pulsante destro del mouse sui file o sulla cartella a cui si vuole accedere e copiare il percorso ADFS o il percorso relativo per Spark dal menu contestuale.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric è tidyverse
preinstallato. È possibile accedere ai file Lakehouse nei pacchetti R familiari, ad esempio la lettura e la scrittura di file Lakehouse usando readr::read_csv()
e readr::write_csv()
.
Nota
Per accedere ai file Lakehouse usando i pacchetti R, è necessario usare il percorso dell'API File. In Lakehouse Explorer fare clic con il pulsante destro del mouse sul file o sulla cartella a cui si vuole accedere e copiare il relativo percorso API file dal menu contestuale.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
È anche possibile leggere un dataframe SparkR in Lakehouse usando query SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Leggere e scrivere tabelle SQL tramite RODBC
Usare RODBC per connettersi ai database basati su SQL tramite un'interfaccia ODBC. Ad esempio, è possibile connettersi a un pool SQL dedicato di Synapse, come illustrato nel codice di esempio seguente. Sostituire i propri dettagli di connessione per <database>
, <uid>
, <password>
e <table>
.
# load RODBC package
library(RODBC)
# config connection string
DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"
ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)
# connect to driver
channel <-odbcDriverConnect(ConnectionString)
# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)
# use SparkR::as.DataFrame to convert R date.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)
Operazioni del dataframe
I dataframe SparkR supportano molte funzioni per eseguire l'elaborazione dei dati strutturata. Ecco alcuni esempi di base. Un elenco completo è disponibile nella documentazione dell'API SparkR.
Selezionare righe e colonne
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Raggruppamento e aggregazioni
I frame di dati SparkR supportano molte funzioni di uso comune per aggregare i dati dopo il raggruppamento. Ad esempio, è possibile calcolare un istogramma del tempo di attesa nel set di dati fedele, come illustrato di seguito
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Operazioni su colonne
SparkR offre molte funzioni che possono essere applicate direttamente alle colonne per l'elaborazione e l'aggregazione dei dati. Nell'esempio seguente viene illustrato l'uso di funzioni aritmetiche di base.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Applicare la funzione definita dall'utente
SparkR supporta diversi tipi di funzioni definite dall'utente:
Eseguire una funzione in un set di dati di grandi dimensioni con dapply
o dapplyCollect
dapply
Applicare una funzione a ogni partizione di un oggetto SparkDataFrame
. La funzione da applicare a ogni partizione di SparkDataFrame
e deve avere un solo parametro, a cui verrà passato un data.frame corrispondente a ogni partizione. L'output della funzione deve essere .data.frame
Schema specifica il formato di riga dell'oggetto risultante.SparkDataFrame
Deve corrispondere ai tipi di dati del valore restituito.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Analogamente a dapply, applicare una funzione a ogni partizione di e SparkDataFrame
raccogliere il risultato. L'output della funzione deve essere .data.frame
Questa volta, tuttavia, non è necessario passare lo schema. Si noti che dapplyCollect
può avere esito negativo se gli output della funzione vengono eseguiti in tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Eseguire una funzione in un set di dati di grandi dimensioni raggruppando in base alle colonne di input con gapply
o gapplyCollect
gapply
Applicare una funzione a ogni gruppo di un oggetto SparkDataFrame
. La funzione deve essere applicata a ogni gruppo di SparkDataFrame
e deve avere solo due parametri: chiave di raggruppamento e R data.frame
corrispondente a tale chiave. I gruppi vengono scelti tra SparkDataFrames
le colonne. L'output della funzione deve essere .data.frame
Schema specifica il formato di riga dell'oggetto risultante SparkDataFrame
. Deve rappresentare lo schema di output della funzione R dai tipi di dati Spark. I nomi di colonna dell'oggetto restituito data.frame
vengono impostati dall'utente.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Analogamente gapply
a , applica una funzione a ogni gruppo di un SparkDataFrame
oggetto e raccoglie il risultato in R data.frame
. L'output della funzione deve essere .data.frame
Non è tuttavia necessario passare lo schema. Si noti che gapplyCollect
può avere esito negativo se gli output della funzione vengono eseguiti in tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Eseguire le funzioni R locali distribuite con spark.lapply
spark.lapply
Analogamente a lapply
in R nativo, spark.lapply
esegue una funzione su un elenco di elementi e distribuisce i calcoli con Spark. Applica una funzione in modo simile a o lapply
agli doParallel
elementi di un elenco. I risultati di tutti i calcoli devono essere inseriti in un singolo computer. In caso contrario, possono eseguire operazioni come df <- createDataFrame(list)
e quindi usare dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Eseguire query SQL da SparkR
Un dataframe SparkR può anche essere registrato come visualizzazione temporanea che consente di eseguire query SQL sui dati. La funzione sql consente alle applicazioni di eseguire query SQL a livello di codice e restituisce il risultato come dataframe SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Machine Learning
SparkR espone la maggior parte degli algoritmi MLLib. SparkR usa MLlib per eseguire il training del modello.
L'esempio seguente illustra come creare un modello GLM gaussiano usando SparkR. Per eseguire la regressione lineare, impostare la famiglia su "gaussian"
. Per eseguire la regressione logistica, impostare la famiglia su "binomial"
. Quando si usa SparkML GLM
SparkR esegue automaticamente la codifica one-hot delle funzionalità categoriche in modo che non sia necessario eseguire manualmente. Oltre alle funzionalità di tipo String e Double, è anche possibile adattarsi alle funzionalità MLlib Vector per la compatibilità con altri componenti MLlib.
Per altre informazioni sugli algoritmi di Machine Learning supportati, vedere la documentazione relativa a SparkR e MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)