Usare SparkR

SparkR è un pacchetto R che fornisce un front-end leggero per l'uso di Apache Spark da R. SparkR fornisce un'implementazione di frame di dati distribuita che supporta operazioni come selezione, filtro, aggregazione e così via. SparkR supporta anche l'apprendimento automatico distribuito tramite MLlib.

Usare SparkR tramite definizioni di processi batch Spark o con notebook interattivi di Microsoft Fabric.

Il supporto di R è disponibile solo in Spark3.1 o versione successiva. R in Spark 2.4 non è supportato.

Prerequisiti

  • Ottenere una sottoscrizione di Microsoft Fabric. In alternativa, iscriversi per ottenere una versione di valutazione gratuita di Microsoft Fabric.

  • Accedere a Microsoft Fabric.

  • Usare il commutatore esperienza sul lato sinistro della home page per passare all'esperienza di data science di Synapse.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Aprire o creare un notebook. Per informazioni su come, vedere Come usare i notebook di Microsoft Fabric.

  • Impostare l'opzione del linguaggio su SparkR (R) per modificare il linguaggio primario.

  • Collegare il notebook a una lakehouse. Sul lato sinistro selezionare Aggiungi per aggiungere una lakehouse esistente o per creare una lakehouse.

Leggere e scrivere dataframe SparkR

Leggere un dataframe SparkR da un data.frame R locale

Il modo più semplice per creare un dataframe consiste nel convertire un dataframe R locale in un dataframe Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Leggere e scrivere un dataframe SparkR da Lakehouse

I dati possono essere archiviati nel file system locale dei nodi del cluster. I metodi generali per leggere e scrivere un dataframe SparkR da Lakehouse sono read.df e write.df. Questi metodi accettano il percorso del file da caricare e il tipo di origine dati. SparkR supporta la lettura di file CSV, JSON, testo e Parquet in modo nativo.

Per leggere e scrivere in un lakehouse, aggiungerlo prima alla sessione. Sul lato sinistro del notebook selezionare Aggiungi per aggiungere un lakehouse esistente o creare una Lakehouse.

Nota

Per accedere ai file Lakehouse usando pacchetti Spark, ad esempio read.df o write.df, usare il percorso ADFS o il percorso relativo per Spark. In Lakehouse Explorer fare clic con il pulsante destro del mouse sui file o sulla cartella a cui si vuole accedere e copiarne il percorso ADFS o il percorso relativo per Spark dal menu contestuale.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric è tidyverse preinstallato. È possibile accedere ai file Lakehouse nei pacchetti R familiari, ad esempio la lettura e la scrittura di file Lakehouse usando readr::read_csv() e readr::write_csv().

Nota

Per accedere ai file Lakehouse usando pacchetti R, è necessario usare il percorso dell'API File. In Lakehouse Explorer fare clic con il pulsante destro del mouse sul file o sulla cartella a cui si vuole accedere e copiare il percorso dell'API File dal menu contestuale.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

È anche possibile leggere un dataframe SparkR in Lakehouse usando query SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Leggere e scrivere tabelle SQL tramite RODBC

Usare RODBC per connettersi ai database basati su SQL tramite un'interfaccia ODBC. Ad esempio, è possibile connettersi a un pool SQL dedicato di Synapse, come illustrato nel codice di esempio seguente. Sostituire i dettagli di connessione personalizzati per <database>, <uid>, <password>e <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Operazioni del dataframe

I dataframe SparkR supportano molte funzioni per eseguire l'elaborazione dei dati strutturata. Ecco alcuni esempi di base. Un elenco completo è disponibile nella documentazione dell'API SparkR.

Selezionare righe e colonne

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Raggruppamento e aggregazioni

I frame di dati SparkR supportano molte funzioni di uso comune per aggregare i dati dopo il raggruppamento. Ad esempio, è possibile calcolare un istogramma del tempo di attesa nel set di dati fedele, come illustrato di seguito

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operazioni su colonne

SparkR offre molte funzioni che possono essere applicate direttamente alle colonne per l'elaborazione e l'aggregazione dei dati. Nell'esempio seguente viene illustrato l'uso di funzioni aritmetiche di base.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Applicare una funzione definita dall'utente

SparkR supporta diversi tipi di funzioni definite dall'utente:

Eseguire una funzione in un set di dati di grandi dimensioni con dapply o dapplyCollect

dapply

Applicare una funzione a ogni partizione di un oggetto SparkDataFrame. La funzione da applicare a ogni partizione di SparkDataFrame e deve avere un solo parametro, al quale verrà passato un data.frame corrispondente a ogni partizione. L'output della funzione deve essere .data.frame Schema specifica il formato di riga dell'oggetto risultante.SparkDataFrame Deve corrispondere ai tipi di dati del valore restituito.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Analogamente a dapply, applicare una funzione a ogni partizione di e SparkDataFrame raccogliere nuovamente il risultato. L'output della funzione deve essere .data.frame Tuttavia, questa volta, lo schema non deve essere passato. Si noti che dapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti su tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Eseguire una funzione in un set di dati di grandi dimensioni raggruppando in base alle colonne di input con gapply o gapplyCollect

gapply

Applicare una funzione a ogni gruppo di un oggetto SparkDataFrame. La funzione deve essere applicata a ogni gruppo di SparkDataFrame e deve avere solo due parametri: chiave di raggruppamento e R data.frame corrispondente a tale chiave. I gruppi vengono scelti tra SparkDataFrames le colonne. L'output della funzione deve essere .data.frame Schema specifica il formato di riga dell'oggetto risultante SparkDataFrame. Deve rappresentare lo schema di output della funzione R dai tipi di dati Spark. I nomi di colonna dell'oggetto restituito data.frame vengono impostati dall'utente.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Analogamente gapplya , applica una funzione a ogni gruppo di un SparkDataFrame oggetto e raccoglie il risultato a R data.frame. L'output della funzione deve essere .data.frame Ma non è necessario passare lo schema. Si noti che gapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti su tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Eseguire funzioni R locali distribuite con spark.lapply

spark.lapply

Analogamente a lapply in R nativo, spark.lapply esegue una funzione su un elenco di elementi e distribuisce i calcoli con Spark. Applica una funzione in modo simile a doParallel o lapply a elementi di un elenco. I risultati di tutti i calcoli devono rientrare in un singolo computer. In caso contrario, è possibile eseguire operazioni come df <- createDataFrame(list) e quindi usare dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Eseguire query SQL da SparkR

Un dataframe SparkR può anche essere registrato come visualizzazione temporanea che consente di eseguire query SQL sui relativi dati. La funzione sql consente alle applicazioni di eseguire query SQL a livello di codice e restituisce il risultato come dataframe SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Apprendimento automatico

SparkR espone la maggior parte degli algoritmi MLLib. SparkR usa MLlib per eseguire il training del modello.

L'esempio seguente illustra come creare un modello GLM gaussian usando SparkR. Per eseguire la regressione lineare, impostare la famiglia su "gaussian". Per eseguire la regressione logistica, impostare la famiglia su "binomial". Quando si usa SparkML GLM SparkR esegue automaticamente la codifica one-hot delle funzionalità categoriche in modo che non sia necessario eseguire manualmente. Oltre alle funzionalità di tipo String e Double, è anche possibile adattarsi alle funzionalità MLlib Vector, per compatibilità con altri componenti MLlib.

Per altre informazioni sugli algoritmi di Machine Learning supportati, vedere la documentazione per SparkR e MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)