Migrate from SparkR to sparklyr

SparkR was developed as part of Apache Spark, and its design is familiar to users of Scala and Python, but potentially less intuitive for R practitioners. In addition, SparkR is deprecated in Spark 4.0.

In contrast, sparklyr focuses on a more R-friendly experience. It uses dplyr syntax, so tidyverse users can apply familiar verbs such as select(), filter(), and mutate() to Spark DataFrames.

sparklyr is the recommended R package for working with Apache Spark. This page explains differences between SparkR and sparklyr across Spark APIs, and provides information about code migration.

Environment setup

Installation

If you are in the Azure Databricks workspace, no installation is required. Load sparklyr with library(sparklyr). To install sparklyr locally outside of Azure Databricks, see Get Started.

Connecting to Spark

Connect to Spark with sparklyr in the Databricks workspace or locally using Databricks Connect:

Workspace:

library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect:

sc <- spark_connect(method = "databricks_connect")

For more details and an extended tutorial on Databricks Connect with sparklyr, see Getting Started.

Reading and writing data

sparklyr has a family of spark_read_*() and spark_write_*() functions to load and save data, unlike SparkR's generic read.df() and write.df() functions. There are also unique functions to create Spark DataFrames or Spark SQL temporary views from R data frames in memory.

Task | SparkR | sparklyr
Copy data to Spark | createDataFrame() | copy_to()
Create temporary view | createOrReplaceTempView() | Use invoke() with method directly
Write data to table | saveAsTable() | spark_write_table()
Write data to a specified format | write.df() | spark_write_<format>()
Read data from table | tableToDF() | tbl() or spark_read_table()
Read data from a specified format | read.df() | spark_read_<format>()

Loading data

To convert an R data frame to a Spark DataFrame, or to create a temporary view out of a DataFrame to apply SQL to it:

SparkR

mtcars_df <- createDataFrame(mtcars)

sparklyr

mtcars_tbl <- copy_to(
  sc,
  df = mtcars,
  name = "mtcars_tmp",
  overwrite = TRUE,
  memory = FALSE
)

copy_to() creates a temporary view with the specified name. Use that name to reference the data when you run SQL directly (for example, with sdf_sql()). To also cache the data in memory, set the memory parameter to TRUE.

Creating views

The following code examples show how temporary views are created:

SparkR

createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

sparklyr

spark_dataframe(mtcars_tbl) |>
  invoke("createOrReplaceTempView", "mtcars_tmp_view")

Writing data

The following code examples show how data is written:

SparkR

# Save a DataFrame to Unity Catalog
saveAsTable(
  mtcars_df,
  tableName = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
  mtcars_df,
  path = "file:/<path/to/save/delta/mtcars>",
  source = "delta",
  mode = "overwrite"
)

sparklyr

# Save tbl_spark to Unity Catalog
spark_write_table(
  mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
  mtcars_tbl,
  path = "file:/<path/to/save/delta/mtcars>",
  mode = "overwrite"
)

# Use DBI
library(DBI)
dbWriteTable(
  sc,
  value = mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  overwrite = TRUE
)

Reading data

The following code examples show how data is read:

SparkR

# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
  path = "file:/<path/to/read/csv/data.csv>",
  source = "csv",
  header = TRUE,
  inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
  path = "file:/<path/to/read/delta/mtcars>",
  source = "delta"
)

# Load data from a table using SQL - Databricks recommends using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

sparklyr

# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")

# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))

# Load csv from local filesystem as tbl_spark
spark_read_csv(
  sc,
  name = "mtcars_csv",
  path = "file:/<path/to/csv/mtcars>",
  header = TRUE,
  infer_schema = TRUE
)

# Load delta from local filesystem as tbl_spark
spark_read_delta(
  sc,
  name = "mtcars_delta",
  path = "file:/<path/to/read/delta/mtcars>"
)

# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")

Processing data

Select and filter

SparkR

# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

sparklyr

# Select specific columns
mtcars_tbl |>
  select(mpg, cyl, hp)

# Filter rows where mpg > 20
mtcars_tbl |>
  filter(mpg > 20)

Add columns

SparkR

# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

sparklyr

# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
  mutate(power_to_weight = hp / wt)

Grouping and aggregation

SparkR

# Calculate average mpg and hp by number of cylinders
mtcars_df |>
  groupBy("cyl") |>
  summarize(
    avg_mpg = avg(mtcars_df$mpg),
    avg_hp = avg(mtcars_df$hp)
  )

sparklyr

# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
  group_by(cyl) |>
  summarize(
    avg_mpg = mean(mpg),
    avg_hp = mean(hp)
  )

Joins

Suppose we have another dataset with cylinder labels that we want to join to mtcars.

SparkR

# Create another DataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
  x = mtcars_df,
  y = cylinders_df,
  mtcars_df$cyl == cylinders_df$cyl,
  joinType = "inner"
)

sparklyr

# Create another tbl_spark with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)

# Join mtcars_tbl with cylinders_tbl
mtcars_tbl |>
  inner_join(cylinders_tbl, by = join_by(cyl))

User defined functions (UDFs)

To create a custom function for categorization:

# Define the custom function
categorize_hp <- function(df) {
  # A real-world example would use case_when() with mutate()
  df$hp_category <- ifelse(df$hp > 150, "High", "Low")
  df
}
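
Because categorize_hp() takes and returns an ordinary R data frame, it can be checked locally before it is distributed. The following is a minimal sketch in base R that uses the built-in mtcars dataset; the variable name local_result is only illustrative.

```r
# Same function as above, repeated so this snippet runs on its own
categorize_hp <- function(df) {
  df$hp_category <- ifelse(df$hp > 150, "High", "Low")
  df
}

# Run the function on the local mtcars data frame (no Spark needed)
local_result <- categorize_hp(mtcars)

# The output keeps every input column and adds hp_category
names(local_result)
table(local_result$hp_category)
```

A local check like this also shows the exact output columns, which is the information the SparkR schema has to describe.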

SparkR

SparkR requires defining the output schema explicitly before applying a function:

# Define the schema for the output DataFrame
schema <- structType(
  structField("mpg", "double"),
  structField("cyl", "double"),
  structField("disp", "double"),
  structField("hp", "double"),
  structField("drat", "double"),
  structField("wt", "double"),
  structField("qsec", "double"),
  structField("vs", "double"),
  structField("am", "double"),
  structField("gear", "double"),
  structField("carb", "double"),
  structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
  mtcars_df,
  func = categorize_hp,
  schema = schema
)

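# Apply the same function to each group of a DataFrame. Note that the schema is still required.
# gapply() passes two arguments to func: the grouping key and the group's data frame.
gapply(
  mtcars_df,
  cols = "hp",
  func = function(key, df) categorize_hp(df),
  schema = schema
)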

sparklyr

# Load Arrow to avoid cryptic errors
library(arrow)

# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
  spark_apply(f = categorize_hp)

# Apply the function over data
# Use `group_by` to apply the function over groups
mtcars_tbl |>
  spark_apply(
    f = summary,
    group_by = "hp" # Grouping does not change the output of a function whose behavior is applied to rows independently.
  )

spark.lapply() vs spark_apply()

In SparkR, spark.lapply() operates on R lists rather than DataFrames. There's no direct equivalent in sparklyr, but you can achieve similar behavior with spark_apply() by working with a DataFrame that includes unique identifiers and grouping by those IDs. In some cases, row-wise operations can also provide comparable functionality. For more information about spark_apply(), see Distributing R Computations.

SparkR

# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x) {
  x * x
}

# Apply the function over list using Spark
spark.lapply(numbers, square)

sparklyr

# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)

# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)

# Apply function to each row (option 1)
# To get behavior like spark.lapply(), create a DataFrame with N rows and
# set group_by to a unique row identifier (here, the id column that
# sdf_len() generates automatically). This returns N rows.
spark_apply(sdf, f = nrow, group_by = "id")

# Apply function to each row (option 2)
# This requires a function that operates across the rows of a data.frame;
# in some cases it may be faster than option 1. Specifying group_by is
# optional for this example. rowwise() is not required here; it only
# illustrates one way to force the computation to run for every row.
row_func <- function(df) {
  df |>
    dplyr::rowwise() |>
    dplyr::mutate(x = id * 2)
}

spark_apply(sdf, f = row_func)

Machine learning

Full SparkR and sparklyr examples for machine learning are in the Spark ML Guide and sparklyr reference.

Note

If you are not using Spark MLlib, Databricks recommends using UDFs to train with the library of your choice (for example, xgboost).
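
As a rough sketch of this pattern, a model can be fit inside an ordinary R function that receives one group's rows as a data frame and returns a data frame of results. The example below uses base R's lm() as a stand-in for the library of your choice and checks the function locally. fit_group is a hypothetical helper name, and the commented spark_apply() call assumes the mtcars_tbl created earlier on this page.

```r
# Hypothetical helper: fit mpg ~ hp + wt on one group's rows and
# return the coefficients as a one-row data frame
fit_group <- function(df) {
  model <- lm(mpg ~ hp + wt, data = df)
  coefs <- coef(model)
  data.frame(
    intercept = coefs[["(Intercept)"]],
    hp = coefs[["hp"]],
    wt = coefs[["wt"]]
  )
}

# Local check: one model per cylinder count
local_fits <- do.call(rbind, lapply(split(mtcars, mtcars$cyl), fit_group))
local_fits

# On Spark, the same function could be applied to each group:
# mtcars_tbl |>
#   spark_apply(f = fit_group, group_by = "cyl")
```

Returning a small data frame of coefficients, rather than the model object, keeps the result in a shape that Spark can represent as columns.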

Linear regression

SparkR

# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

sparklyr

# Select features
training_tbl <- mtcars_tbl |>
  select(mpg, hp, wt)

# Fit a linear regression model
linear_model <- training_tbl |>
  ml_linear_regression(response = "mpg", features = c("hp", "wt"))

# View model summary
summary(linear_model)

K-means clustering

SparkR

# Apply KMeans clustering with 3 clusters using mpg and hp as features
# (the formula for spark.kmeans() has no response variable)
kmeans_model <- spark.kmeans(mtcars_df, ~ mpg + hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

sparklyr

# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
  select(mpg, hp)

# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
  ft_vector_assembler(
    input_cols = c("mpg", "hp"),
    output_col = "features"
  )

# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
  ml_kmeans(features_col = "features", k = 3)

# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)

Performance and optimization

Collecting

Both SparkR and sparklyr use collect() to convert Spark DataFrames to R data frames. Collect only small amounts of data back to R; otherwise, the Spark driver can run out of memory.
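
One way to keep collected results small is to aggregate before calling collect(). The sketch below wraps that pattern in a hypothetical helper, summarize_by_cyl(). Because dplyr verbs are generic, the same function runs on a local data frame (used here so the snippet needs no cluster) and is expected to work the same way on a tbl_spark such as mtcars_tbl.

```r
library(dplyr)

# Hypothetical helper: aggregate first, then collect only the summary rows
summarize_by_cyl <- function(data) {
  data |>
    group_by(cyl) |>
    summarize(
      avg_mpg = mean(mpg, na.rm = TRUE),
      n_cars = n()
    ) |>
    collect()
}

# Local check with the built-in data frame; with Spark, pass mtcars_tbl instead
cyl_summary <- summarize_by_cyl(mtcars)
cyl_summary
```

With a tbl_spark, the grouping and aggregation run in Spark, so only one row per cylinder count is brought back to the driver.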

To prevent out of memory errors, SparkR has built-in optimizations in Databricks Runtime that help collect data or execute user-defined functions.

To ensure optimal performance with sparklyr for collecting data and UDFs on Databricks Runtime versions below 14.3 LTS, load the arrow package:

library(arrow)

In-memory partitioning

SparkR

# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

sparklyr

# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")

# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)

# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)

# Get number of partitions
sdf_num_partitions(mtcars_tbl)

Caching

SparkR

# Cache the DataFrame in memory
cache(mtcars_df)

sparklyr

# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")