Utiliser SparkR

SparkR est un package R qui fournit un frontend léger pour utiliser Apache Spark à partir de R. SparkR fournit une implémentation de trame de données distribuée qui prend en charge des opérations comme la sélection, le filtrage, l’agrégation, etc. SparkR prend également en charge le Machine Learning distribué à l’aide de MLlib.

Utilisez SparkR via des définitions de traitement par lots Spark ou avec des notebooks Microsoft Fabric interactifs.

La prise en charge de R est disponible uniquement dans Spark 3.1 ou version ultérieure. R dans Spark 2.4 n’est pas pris en charge.

Prérequis

  • Ouvrez ou créez un notebook. Pour en savoir plus, consultez Comment utiliser les blocs-notes Microsoft Fabric.

  • Réglez l'option de langue sur SparkR (R) pour modifier la langue principale.

  • Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter un lakehouse existant ou pour créer un lakehouse.

Lire et écrire des dataframes SparkR

Lire un dataframe SparkR à partir d’un data.frame R local

La façon la plus simple de créer un DataFrame consiste à convertir un data.frame R local en DataFrame Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lire et écrire un DataFrame SparkR à partir d’un lakehouse

Les données peuvent être stockées sur le système de fichiers local des nœuds de cluster. Les méthodes générales pour lire et écrire un DataFrame SparkR à partir d’un lakehouse sont read.df et write.df. Ces méthodes prennent le chemin d’accès du fichier à charger et le type de source de données. SparkR prend en charge la lecture des fichiers CSV, JSON, texte et Parquet en mode natif.

Pour lire et écrire dans un lakehouse, commencez par l’ajouter à votre session. Sur le côté gauche du notebook, sélectionnez Ajouter pour ajouter un lakehouse existant ou en créer un.

Remarque

Pour accéder aux fichiers de lakehouse à l’aide de packages Spark, tels que read.df ou write.df, utilisez leur chemin ADFS ou chemin relatif pour Spark. Dans l’explorateur de lakehouse, cliquez avec le bouton droit sur les fichiers ou dossiers auxquels vous souhaitez accéder et copiez leur chemin ADFS ou chemin relatif pour Spark à partir du menu contextuel.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

tidyverse est préinstallé avec Microsoft Fabric. Vous pouvez accéder aux fichiers de lakehouse dans vos packages R familiers, tels que la lecture et l’écriture de fichiers de lakehouse à l’aide de readr::read_csv() et de readr::write_csv().

Remarque

Pour accéder aux fichiers de lakehouse à l’aide de packages R, vous devez utiliser le chemin d’accès de l’API Fichier. Dans l’explorateur de lakehouse, cliquez avec le bouton droit sur le fichier ou le dossier auquel vous souhaitez accéder, puis copiez son chemin d’accès à l’API Fichier à partir du menu contextuel.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Vous pouvez également lire un Dataframe SparkR sur votre lakehouse à l’aide de requêtes SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Lire et écrire des tables SQL via RODBC

Utilisez RODBC pour vous connecter à des bases de données SQL via une interface ODBC. Par exemple, vous pouvez vous connecter à un pool SQL dédié Synapse, comme indiqué dans l’exemple de code suivant. Remplacez vos informations de connexion par <database>, <uid>, <password> et <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Opérations de Tramedonnées

Les DataFrames Spark prennent en charge un certain nombre de fonctions pour le traitement de données structurées. En voici quelques exemples de base. La liste complète est disponible dans la documentation de l’API SparkR.

Sélectionner des lignes et des colonnes

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Regroupement et agrégation

Les DataFrames Spark prennent en charge de nombreuses fonctions couramment utilisées pour agréger des données après regroupement. Par exemple, nous pouvons calculer un histogramme du temps d’attente dans le jeu de données fidèle, comme indiqué ci-dessous.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Opérations sur les colonnes

SparkR fournit de nombreuses fonctions qui peuvent être appliquées directement aux colonnes pour le traitement et l’agrégation de données. L’exemple suivant illustre l’utilisation de fonctions arithmétiques de base.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Appliquer une fonction définie par l’utilisateur

SparkR prend en charge plusieurs types de fonctions définies par l’utilisateur :

Exécuter une fonction sur un jeu de données volumineux avec dapply ou dapplyCollect

dapply

Appliquez une fonction à chaque partition d’un SparkDataFrame. La fonction à appliquer à chaque partition du SparkDataFrame et ne doit avoir qu’un seul paramètre, auquel un data.frame correspondant à chaque partition sera transmis. La sortie de la fonction doit être un data.frame. Schéma spécifie le format de ligne du SparkDataFrame résultant. Il doit correspondre aux types de données de la valeur renvoyée.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Comme dapply, appliquez une fonction à chaque partition d’un SparkDataFrame et collectez le résultat. La sortie de la fonction doit être un data.frame. Mais, cette fois, le schéma n’est pas nécessaire. Notez que dapplyCollect peut échouer si les sorties de la fonction exécutée sur toutes les partitions ne peuvent pas être extraites vers le pilote et tenir dans la mémoire du pilote.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Exécuter une fonction sur un regroupement de jeux de données volumineux par colonne d’entrée avec gapply ou gapplyCollect

gapply

Appliquez une fonction à chaque groupe d’un SparkDataFrame. La fonction doit être appliquée à chaque groupe du SparkDataFrame et ne doit avoir que deux paramètres : clé de regroupement et data.frame R correspondant à cette clé. Les groupes sont choisis parmi les colonnes SparkDataFrames. La sortie de la fonction doit être un data.frame. Schéma spécifie le format de ligne du SparkDataFrame résultant. Il doit représenter le schéma de sortie de la fonction R à partir de types de données Spark. Les noms de colonnes du data.frame renvoyé sont définis par l’utilisateur.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Comme gapply, applique une fonction à chaque groupe d’un SparkDataFrame et récupère le résultat sur data.frame R. La sortie de la fonction doit être un data.frame. Toutefois, il n’est pas nécessaire de transmettre le schéma. Notez que gapplyCollect peut échouer si les sorties de la fonction exécutées sur toutes les partitions ne peuvent pas être extraites vers le pilote et tenir dans la mémoire du pilote.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Exécuter des fonctions R locales distribuées avec spark.lapply

spark.lapply

Comme lapply dans R natif, spark.lapply exécute une fonction sur une liste d’éléments et distribue les calculs avec Spark. Applique une fonction d’une manière similaire à doParallel ou lapply aux éléments d’une liste. Les résultats de tous les calculs doivent tenir dans une seule machine. Si ce n’est pas le cas, ils peuvent effectuer une opération du type df <- createDataFrame(list), puis utiliser dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Exécuter des requêtes SQL à partir de SparkR

Un DataFrame SparkR peut également être inscrit en tant qu’affichage temporaire qui vous permet d’exécuter des requêtes SQL sur ses données. La fonction sql permet aux applications d’exécuter des requêtes SQL par programmation et retourne le résultat sous la forme d’un DataFrame SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Machine Learning

SparkR expose la plupart des algorithmes MLLib. En coulisse, SparkR utilise MLlib pour former le modèle.

L’exemple suivant montre comment générer un modèle GLM gaussien à l’aide de SparkR. Pour exécuter une régression linéaire, définissez la famille sur "gaussian". Pour exécuter une régression logistique, définissez la famille sur "binomial". Lorsque vous utilisez SparkML GLM, SparkR effectue automatiquement un codage à chaud des fonctionnalités catégoriques afin qu’il ne soit pas nécessaire de le faire manuellement. En plus des fonctionnalités de type chaîne et double, il est possible d’ajuster les fonctionnalités de vecteur MLlib à des fins de compatibilité avec d’autres composants MLlib.

Pour en savoir plus sur les algorithmes de Machine Learning pris en charge, consultez la documentation pour SparkR et MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)