Usare SparkR

Importante

Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.

SparkR è un pacchetto R che fornisce un front-end leggero per l'uso di Apache Spark da R. SparkR offre un'implementazione distribuita del frame di dati che supporta operazioni come selezione, filtro, aggregazione e così via. SparkR supporta anche l'apprendimento automatico distribuito tramite MLlib.

Importante

Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.

Usare SparkR tramite definizioni di processi batch Spark o con notebook interattivi di Microsoft Fabric.

Il supporto di R è disponibile solo in Spark3.1 o versione successiva. R in Spark 2.4 non è supportato.

Prerequisiti

  • Aprire o creare un notebook. Per informazioni su come, vedere Come usare i notebook di Microsoft Fabric.

  • Modificare la lingua primaria impostando l'opzione linguasu SparkR (R).Change the primary language by setting the language option to SparkR (R).

  • Collegare il notebook a una lakehouse. Sul lato sinistro selezionare Aggiungi per aggiungere una lakehouse esistente o creare una lakehouse.

Leggere e scrivere dataframe SparkR

Leggere un dataframe SparkR da un data.frame R locale

Il modo più semplice per creare un dataframe consiste nel convertire un data.frame R locale in un dataframe Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Leggere e scrivere un dataframe SparkR da Lakehouse

I dati possono essere archiviati nel file system locale dei nodi del cluster. I metodi generali per leggere e scrivere un dataframe SparkR da Lakehouse sono read.df e write.df. Questi metodi accettano il percorso del file da caricare e il tipo di origine dati. SparkR supporta la lettura nativa di file CSV, JSON, testo e Parquet.

Per leggere e scrivere in un Lakehouse, aggiungerlo prima alla sessione. Sul lato sinistro del notebook selezionare Aggiungi per aggiungere una lakehouse esistente o creare una lakehouse.

Nota

Per accedere ai file Lakehouse usando pacchetti Spark, ad esempio read.df o write.df, usare il percorso ADFS o il percorso relativo per Spark. In Lakehouse Explorer fare clic con il pulsante destro del mouse sui file o sulla cartella a cui si vuole accedere e copiare il percorso ADFS o il percorso relativo per Spark dal menu contestuale.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric è tidyverse preinstallato. È possibile accedere ai file Lakehouse nei pacchetti R familiari, ad esempio la lettura e la scrittura di file Lakehouse usando readr::read_csv() e readr::write_csv().

Nota

Per accedere ai file Lakehouse usando i pacchetti R, è necessario usare il percorso dell'API File. In Lakehouse Explorer fare clic con il pulsante destro del mouse sul file o sulla cartella a cui si vuole accedere e copiare il relativo percorso API file dal menu contestuale.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

È anche possibile leggere un dataframe SparkR in Lakehouse usando query SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Leggere e scrivere tabelle SQL tramite RODBC

Usare RODBC per connettersi ai database basati su SQL tramite un'interfaccia ODBC. Ad esempio, è possibile connettersi a un pool SQL dedicato di Synapse, come illustrato nel codice di esempio seguente. Sostituire i propri dettagli di connessione per <database>, <uid>, <password>e <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R date.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Operazioni del dataframe

I dataframe SparkR supportano molte funzioni per eseguire l'elaborazione dei dati strutturata. Ecco alcuni esempi di base. Un elenco completo è disponibile nella documentazione dell'API SparkR.

Selezionare righe e colonne

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Raggruppamento e aggregazioni

I frame di dati SparkR supportano molte funzioni di uso comune per aggregare i dati dopo il raggruppamento. Ad esempio, è possibile calcolare un istogramma del tempo di attesa nel set di dati fedele, come illustrato di seguito

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operazioni su colonne

SparkR offre molte funzioni che possono essere applicate direttamente alle colonne per l'elaborazione e l'aggregazione dei dati. Nell'esempio seguente viene illustrato l'uso di funzioni aritmetiche di base.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Applicare la funzione definita dall'utente

SparkR supporta diversi tipi di funzioni definite dall'utente:

Eseguire una funzione in un set di dati di grandi dimensioni con dapply o dapplyCollect

dapply

Applicare una funzione a ogni partizione di un oggetto SparkDataFrame. La funzione da applicare a ogni partizione di SparkDataFrame e deve avere un solo parametro, a cui verrà passato un data.frame corrispondente a ogni partizione. L'output della funzione deve essere .data.frame Schema specifica il formato di riga dell'oggetto risultante.SparkDataFrame Deve corrispondere ai tipi di dati del valore restituito.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Analogamente a dapply, applicare una funzione a ogni partizione di e SparkDataFrame raccogliere il risultato. L'output della funzione deve essere .data.frame Questa volta, tuttavia, non è necessario passare lo schema. Si noti che dapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti in tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Eseguire una funzione in un set di dati di grandi dimensioni raggruppando in base alle colonne di input con gapply o gapplyCollect

gapply

Applicare una funzione a ogni gruppo di un oggetto SparkDataFrame. La funzione deve essere applicata a ogni gruppo di SparkDataFrame e deve avere solo due parametri: chiave di raggruppamento e R data.frame corrispondente a tale chiave. I gruppi vengono scelti tra SparkDataFrames le colonne. L'output della funzione deve essere .data.frame Schema specifica il formato di riga dell'oggetto risultante SparkDataFrame. Deve rappresentare lo schema di output della funzione R dai tipi di dati Spark. I nomi di colonna dell'oggetto restituito data.frame vengono impostati dall'utente.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Analogamente gapplya , applica una funzione a ogni gruppo di un SparkDataFrame oggetto e raccoglie il risultato in R data.frame. L'output della funzione deve essere .data.frame Non è tuttavia necessario passare lo schema. Si noti che gapplyCollect può avere esito negativo se gli output della funzione vengono eseguiti in tutte le partizioni non possono essere estratti nel driver e si adattano alla memoria del driver.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Eseguire le funzioni R locali distribuite con spark.lapply

spark.lapply

Analogamente a lapply in R nativo, spark.lapply esegue una funzione su un elenco di elementi e distribuisce i calcoli con Spark. Applica una funzione in modo simile a o lapply agli doParallel elementi di un elenco. I risultati di tutti i calcoli devono essere inseriti in un singolo computer. In caso contrario, possono eseguire operazioni come df <- createDataFrame(list) e quindi usare dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Eseguire query SQL da SparkR

Un dataframe SparkR può anche essere registrato come visualizzazione temporanea che consente di eseguire query SQL sui dati. La funzione sql consente alle applicazioni di eseguire query SQL a livello di codice e restituisce il risultato come dataframe SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Machine Learning

SparkR espone la maggior parte degli algoritmi MLLib. SparkR usa MLlib per eseguire il training del modello.

L'esempio seguente illustra come creare un modello GLM gaussiano usando SparkR. Per eseguire la regressione lineare, impostare la famiglia su "gaussian". Per eseguire la regressione logistica, impostare la famiglia su "binomial". Quando si usa SparkML GLM SparkR esegue automaticamente la codifica one-hot delle funzionalità categoriche in modo che non sia necessario eseguire manualmente. Oltre alle funzionalità di tipo String e Double, è anche possibile adattarsi alle funzionalità MLlib Vector per la compatibilità con altri componenti MLlib.

Per altre informazioni sugli algoritmi di Machine Learning supportati, vedere la documentazione relativa a SparkR e MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)

Passaggi successivi