Use sparklyr

sparklyr is an R interface to Apache Spark. It provides a mechanism to interact with Spark using familiar R interfaces. You can use sparklyr through Spark batch job definitions or with interactive Microsoft Fabric notebooks.

sparklyr is used along with other tidyverse packages such as dplyr. Microsoft Fabric distributes the latest stable version of sparklyr and tidyverse with every runtime release. You can import them and start using the API.

Prerequisites

  • Open or create a notebook. To learn how, see How to use Microsoft Fabric notebooks.

  • Set the language option to SparkR (R) to change the primary language.

  • Attach your notebook to a lakehouse. On the left side, select Add to add an existing lakehouse or to create a lakehouse.

Connect sparklyr to Synapse Spark cluster

Use the following connection method in spark_connect() to establish a sparklyr connection. We support a connection method called synapse, which connects sparklyr to an existing Spark session and substantially reduces the sparklyr session start time. We also contributed this connection method to the open-source sparklyr project. With method = "synapse", you can use both sparklyr and SparkR in the same session and easily share data between them.

# load the sparklyr package, then connect sparklyr to your Spark cluster
library(sparklyr)

spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
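Before going further, it can help to confirm that the connection succeeded. The following is a minimal sketch that assumes the sc object created above; the values returned depend on your runtime.

```r
library(sparklyr)

# TRUE when sparklyr holds a live connection to the Spark session
spark_connection_is_open(sc)

# Spark version reported by the connected session
# (called with the sparklyr:: prefix because a variable named spark_version exists above)
sparklyr::spark_version(sc)
```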

Use sparklyr to read data

A new Spark session contains no data. The first step is to either load data into your Spark session's memory or point Spark to the location of the data so that it can access the data on demand.

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)
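To check what the copy produced, you can inspect the table from the Spark side. This sketch assumes the sc connection and the mtcars_tbl reference from the code above; mtcars_ref is a hypothetical variable name used only for illustration.

```r
library(sparklyr)
library(dplyr)

# list the tables registered in the Spark session; "spark_mtcars" should be among them
src_tbls(sc)

# row and column counts, computed in Spark rather than in R
sdf_nrow(mtcars_tbl)
sdf_ncol(mtcars_tbl)

# get a new reference to the same Spark table by its name
mtcars_ref <- tbl(sc, "spark_mtcars")
```

Referencing a table by name with tbl() is useful when the table was created earlier in the session and the original R variable is no longer available.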

Using sparklyr, you can also write data to and read data from a Lakehouse file using an ABFS path. To read from and write to a Lakehouse, first add it to your session. On the left side of the notebook, select Add to add an existing Lakehouse or create a Lakehouse.

To find your ABFS path, right-click the Files folder in your Lakehouse, then select Copy ABFS path. Paste your path to replace abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files in this code:

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)
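The same pattern should work for other file formats. As a hedged sketch, the following writes and reads Parquet instead of CSV. It assumes the sc connection and mtcars_tbl from the earlier code; the temp_parquet path and the table name "mtcars_parquet" are hypothetical, and the xxxx placeholders must be replaced with your own ABFS path.

```r
library(sparklyr)

# hypothetical path: replace the xxxx placeholders with your own ABFS path
temp_parquet <- "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars_parquet"

# write the Spark table as Parquet, replacing any earlier output at that path
spark_write_parquet(mtcars_tbl, temp_parquet, mode = "overwrite")

# read it back and register it in Spark under the name "mtcars_parquet"
mtcars_parquet <- spark_read_parquet(sc, name = "mtcars_parquet", path = temp_parquet)
head(mtcars_parquet)
```

Parquet stores the column types with the data, so reading it back does not rely on schema inference the way CSV does.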

Use sparklyr to manipulate data

sparklyr provides multiple methods to process data inside Spark using:

  • dplyr commands
  • SparkSQL
  • Spark's feature transformers

Use dplyr

You can use familiar dplyr commands to prepare data inside Spark. The commands run inside Spark, so there are no unnecessary data transfers between R and Spark.

For more documentation on using dplyr with Spark, see Manipulating Data with dplyr.

# count cars by the number of cylinders the engine contains (cyl), and sort the results in descending order
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup
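Other dplyr verbs can be chained the same way. The sketch below assumes the mtcars_tbl reference from earlier; mpg_by_cyl and mpg_by_cyl_local are hypothetical names. It filters and aggregates in Spark and then uses collect() to bring the small result into R.

```r
library(sparklyr)
library(dplyr)

# average mpg per cylinder count for cars above 100 hp; every step runs in Spark
mpg_by_cyl <- mtcars_tbl %>%
  filter(hp > 100) %>%
  group_by(cyl) %>%
  summarise(mean_mpg = mean(mpg, na.rm = TRUE), cars = n()) %>%
  arrange(cyl)

# collect() executes the query and returns the result as a local R tibble
mpg_by_cyl_local <- collect(mpg_by_cyl)
mpg_by_cyl_local
```

Until collect() is called, mpg_by_cyl is only a lazy query, so it is usually best to reduce the data in Spark first and collect only the summarized result.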

sparklyr and dplyr translate the R commands into Spark SQL. To see the resulting query, use show_query():

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

Use SQL

It's also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery() to execute SQL and return the result as an R data frame:

library(DBI)
dbGetQuery(sc, "SELECT cyl, COUNT(*) AS n FROM spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
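dbGetQuery() returns the full result to R. If you would rather keep the result in Spark, one option is sdf_sql(), which defines a Spark DataFrame from a query. This is a sketch that assumes the sc connection and the spark_mtcars table from earlier; cyl_counts is a hypothetical name.

```r
library(sparklyr)
library(dplyr)

# define a Spark DataFrame from a SQL query without collecting it into R
cyl_counts <- sdf_sql(sc, "SELECT cyl, COUNT(*) AS n FROM spark_mtcars GROUP BY cyl")

# continue with dplyr verbs; the work still happens in Spark
cyl_counts %>%
  arrange(desc(n))
```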

Use Feature Transformers

Both of the previous methods rely on SQL statements. Spark also provides feature transformers that make some data transformations more convenient, without the use of SQL.

For example, the ft_binarizer() command simplifies the creation of a new column that indicates if the value of another column is above a certain threshold.

For the full list of Spark feature transformers available through sparklyr, see Reference - FT.

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)
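As another illustration, ft_bucketizer() maps a continuous column to bucket indices. The sketch below assumes the mtcars_tbl reference from earlier; the split points and the names hp_bucket and hp_buckets are arbitrary choices for this example.

```r
library(sparklyr)
library(dplyr)

# bucket horsepower into [0, 100), [100, 200), and [200, Inf),
# then count the cars that fall into each bucket
hp_buckets <- mtcars_tbl %>%
  ft_bucketizer(input_col = "hp", output_col = "hp_bucket",
                splits = c(0, 100, 200, Inf)) %>%
  count(hp_bucket) %>%
  arrange(hp_bucket)

hp_buckets
```

The bucket column holds zero-based indices, so the first interval is reported as 0.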

Share data between sparklyr and SparkR

When you connect sparklyr to a Synapse Spark cluster with method = "synapse", you can use both sparklyr and SparkR in the same session and easily share data between them. You can create a Spark table in sparklyr and read it from SparkR.

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparkr <- SparkR::sql("SELECT cyl, COUNT(*) AS n
FROM mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparkr)
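Sharing in the other direction should follow the same idea, although this article only demonstrates the sparklyr-to-SparkR case. The sketch below is an assumption based on both packages using one Spark session: it registers a temporary view from SparkR and references it from sparklyr. The names faithful_sdf, faithful_sparkr, and faithful_tbl are hypothetical.

```r
library(sparklyr)
library(dplyr)

# create a Spark DataFrame in SparkR from the built-in faithful dataset
# and register it as a temporary view
faithful_sdf <- SparkR::createDataFrame(faithful)
SparkR::createOrReplaceTempView(faithful_sdf, "faithful_sparkr")

# reference the view from sparklyr; this assumes both packages share one Spark session
faithful_tbl <- tbl(sc, "faithful_sparkr")
head(faithful_tbl)
```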

Machine learning

Here's an example where we use ml_linear_regression() to fit a linear regression model. We use the built-in mtcars dataset, and see if we can predict a car's fuel consumption (mpg) based on its weight (wt), and the number of cylinders the engine contains (cyl). We assume in each case that the relationship between mpg and each of our features is linear.

Generate testing and training data sets

Split the data, using 70% for training and 30% for testing the model. Changing this ratio results in different models.

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
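Because the split is random, the partitions are only approximately 70% and 30% of the rows, especially on a dataset this small. A quick check, assuming the partitions object created above (n_train and n_test are hypothetical names):

```r
library(sparklyr)

# number of rows that ended up in each partition
n_train <- sdf_nrow(partitions$training)
n_test <- sdf_nrow(partitions$test)

c(training = n_train, test = n_test)
```

Keeping the seed fixed, as in the split above, is intended to make the split repeatable across runs.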

Train the model

Train the linear regression model.

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

Now use summary() to learn a bit more about the quality of our model, and the statistical significance of each of our predictors.

summary(fit)
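If you need the numbers programmatically rather than as printed output, the fitted model object exposes them as list elements. This sketch assumes the fit object from above and the element names used by current sparklyr releases; check str(fit) in your runtime if an element is missing.

```r
# named vector with the intercept and one coefficient per predictor (wt and cyl)
fit$coefficients

# training metrics stored on the fitted model's summary
fit$summary$r2
fit$summary$root_mean_squared_error
```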

Use the model

You can apply the model on the test dataset by calling ml_predict().

pred <- ml_predict(fit, partitions$test)

head(pred)
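To put a number on how well the model predicts the held-out data, you can score the predictions with an evaluator. The following is a minimal sketch that assumes the pred table from above, where ml_predict() has added a column named prediction; rmse is a hypothetical variable name.

```r
library(sparklyr)

# root mean squared error of the predictions against the observed mpg values
rmse <- ml_regression_evaluator(pred, label_col = "mpg",
                                prediction_col = "prediction",
                                metric_name = "rmse")
rmse
```

With only a handful of rows in the test partition, treat this value as a rough indication rather than a reliable estimate.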

For a list of Spark ML models available through sparklyr, see Reference - ML.

Disconnect from Spark cluster

To end your Spark session, call spark_disconnect() or select the Stop session button on the notebook ribbon.

spark_disconnect(sc)

Learn more about the R functionalities: