Événements
31 mars, 23 h - 2 avr., 23 h
L’événement de la communauté Microsoft Fabric, Power BI, SQL et AI ultime. 31 mars au 2 avril 2025.
Inscrivez-vous aujourd’huiCe navigateur n’est plus pris en charge.
Effectuez une mise à niveau vers Microsoft Edge pour tirer parti des dernières fonctionnalités, des mises à jour de sécurité et du support technique.
SparkR est un package R qui fournit un frontend léger pour utiliser Apache Spark à partir de R. SparkR fournit une implémentation de trame de données distribuée qui prend en charge des opérations comme la sélection, le filtrage, l’agrégation, etc. SparkR prend également en charge le Machine Learning distribué à l’aide de MLlib.
Utilisez SparkR via des définitions de traitement par lots Spark ou avec des notebooks Microsoft Fabric interactifs.
La prise en charge de R est disponible uniquement dans Spark 3.1 ou version ultérieure. R dans Spark 2.4 n’est pas pris en charge.
Obtenir un abonnement Microsoft Fabric. Ou, inscrivez-vous pour un essai gratuit de Microsoft Fabric.
Connectez-vous à Microsoft Fabric.
Utilisez le sélecteur d’expérience sur le côté gauche de votre page d’accueil pour passer à l’expérience science des données Synapse.
Ouvrez ou créez un notebook. Pour en savoir plus, consultez Comment utiliser les blocs-notes Microsoft Fabric.
Réglez l’option de langue sur SparkR (R) pour modifier la langue principale.
Attachez votre notebook à lakehouse. Sur le côté gauche, sélectionnez Ajouter pour ajouter un lakehouse existant ou pour créer un lakehouse.
La façon la plus simple de créer un DataFrame consiste à convertir un data.frame R local en DataFrame Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Les données peuvent être stockées sur le système de fichiers local des nœuds de cluster. Les méthodes générales pour lire et écrire un DataFrame SparkR à partir d’un lakehouse sont read.df
et write.df
. Ces méthodes prennent le chemin d’accès du fichier à charger et le type de source de données. SparkR prend en charge la lecture des fichiers CSV, JSON, texte et Parquet en mode natif.
Pour lire et écrire dans un lakehouse, commencez par l’ajouter à votre session. Sur le côté gauche du notebook, sélectionnez Ajouter pour ajouter un lakehouse existant ou en créer un.
Notes
Pour accéder aux fichiers de lakehouse à l’aide de packages Spark, tels que read.df
ou write.df
, utilisez leur chemin ADFS ou chemin relatif pour Spark. Dans l’explorateur de lakehouse, cliquez avec le bouton droit sur les fichiers ou dossiers auxquels vous souhaitez accéder et copiez leur chemin ADFS ou chemin relatif pour Spark à partir du menu contextuel.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
tidyverse
est préinstallé avec Microsoft Fabric. Vous pouvez accéder aux fichiers de lakehouse dans vos packages R familiers, tels que la lecture et l’écriture de fichiers de lakehouse à l’aide de readr::read_csv()
et de readr::write_csv()
.
Notes
Pour accéder aux fichiers de lakehouse à l’aide de packages R, vous devez utiliser le chemin d’accès de l’API Fichier. Dans l’explorateur de lakehouse, cliquez avec le bouton droit sur le fichier ou le dossier auquel vous souhaitez accéder, puis copiez son chemin d’accès à l’API Fichier à partir du menu contextuel.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Vous pouvez également lire un Dataframe SparkR sur votre lakehouse à l’aide de requêtes SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Les DataFrames Spark prennent en charge un certain nombre de fonctions pour le traitement de données structurées. En voici quelques exemples de base. La liste complète est disponible dans la documentation de l’API SparkR.
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Les DataFrames Spark prennent en charge de nombreuses fonctions couramment utilisées pour agréger des données après regroupement. Par exemple, nous pouvons calculer un histogramme du temps d’attente dans le jeu de données fidèle, comme indiqué ci-dessous.
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
SparkR fournit de nombreuses fonctions qui peuvent être appliquées directement aux colonnes pour le traitement et l’agrégation de données. L’exemple suivant illustre l’utilisation de fonctions arithmétiques de base.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
SparkR prend en charge plusieurs types de fonctions définies par l’utilisateur :
Appliquez une fonction à chaque partition d’un SparkDataFrame
. La fonction à appliquer à chaque partition du SparkDataFrame
et ne doit avoir qu’un seul paramètre, auquel un data.frame correspondant à chaque partition sera transmis. La sortie de la fonction doit être un data.frame
. Schéma spécifie le format de ligne du SparkDataFrame
résultant. Il doit correspondre aux types de données de la valeur renvoyée.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
Comme dapply, appliquez une fonction à chaque partition d’un SparkDataFrame
et collectez le résultat. La sortie de la fonction doit être un data.frame
. Mais, cette fois, le schéma n’est pas nécessaire. Notez que dapplyCollect
peut échouer si les sorties de la fonction exécutée sur toutes les partitions ne peuvent pas être extraites vers le pilote et tenir dans la mémoire du pilote.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
gapply
ou gapplyCollect
Appliquez une fonction à chaque groupe d’un SparkDataFrame
. La fonction doit être appliquée à chaque groupe du SparkDataFrame
et ne doit avoir que deux paramètres : clé de regroupement et data.frame
R correspondant à cette clé. Les groupes sont choisis parmi les colonnes SparkDataFrames
. La sortie de la fonction doit être un data.frame
. Schéma spécifie le format de ligne du SparkDataFrame
résultant. Il doit représenter le schéma de sortie de la fonction R à partir de types de données Spark. Les noms de colonnes du data.frame
renvoyé sont définis par l’utilisateur.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
Comme gapply
, applique une fonction à chaque groupe d’un SparkDataFrame
et récupère le résultat sur data.frame
R. La sortie de la fonction doit être un data.frame
. Toutefois, il n’est pas nécessaire de transmettre le schéma. Notez que gapplyCollect
peut échouer si les sorties de la fonction exécutées sur toutes les partitions ne peuvent pas être extraites vers le pilote et tenir dans la mémoire du pilote.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Comme lapply
dans R natif, spark.lapply
exécute une fonction sur une liste d’éléments et distribue les calculs avec Spark. Applique une fonction d’une manière similaire à doParallel
ou lapply
aux éléments d’une liste. Les résultats de tous les calculs doivent tenir dans une seule machine. Si ce n’est pas le cas, ils peuvent effectuer une opération du type df <- createDataFrame(list)
, puis utiliser dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Un DataFrame SparkR peut également être inscrit en tant qu’affichage temporaire qui vous permet d’exécuter des requêtes SQL sur ses données. La fonction sql permet aux applications d’exécuter des requêtes SQL par programmation et retourne le résultat sous la forme d’un DataFrame SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
SparkR expose la plupart des algorithmes MLLib. En coulisse, SparkR utilise MLlib pour former le modèle.
L’exemple suivant montre comment générer un modèle GLM gaussien à l’aide de SparkR. Pour exécuter une régression linéaire, définissez la famille sur "gaussian"
. Pour exécuter une régression logistique, définissez la famille sur "binomial"
. Lorsque vous utilisez SparkML GLM
, SparkR effectue automatiquement un codage à chaud des fonctionnalités catégoriques afin qu’il ne soit pas nécessaire de le faire manuellement. En plus des fonctionnalités de type chaîne et double, il est possible d’ajuster les fonctionnalités de vecteur MLlib à des fins de compatibilité avec d’autres composants MLlib.
Pour en savoir plus sur les algorithmes de Machine Learning pris en charge, consultez la documentation pour SparkR et MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)
Événements
31 mars, 23 h - 2 avr., 23 h
L’événement de la communauté Microsoft Fabric, Power BI, SQL et AI ultime. 31 mars au 2 avril 2025.
Inscrivez-vous aujourd’huiEntrainement
Parcours d’apprentissage
Créer des modèles Machine Learning avec R et tidymodels - Training
Découvrez comment explorer et analyser des données en utilisant R. Découvrez les modèles de régression, de classification et de clustering avec tidymodels et R.
Certification
Microsoft Certified : Azure Data Scientist Associate - Certifications
Gérer l’ingestion et la préparation des données, l’entraînement et le déploiement des modèles, ainsi que la surveillance des solutions d’apprentissage automatique avec Python, Azure Machine Learning et MLflow.