Work with DataFrames and tables in R
Important
SparkR in Databricks is deprecated in Databricks Runtime 16.0 and above. Databricks recommends using sparklyr instead.
This article describes how to use R packages such as SparkR, sparklyr, and dplyr to work with R data.frame
s, Spark DataFrames, and in-memory tables.
Note that as you work with SparkR, sparklyr, and dplyr, you may find that you can complete a particular operation with all of these packages, and you can use the package that you are most comfortable with. For example, to run a query, you can call functions such as SparkR::sql
, sparklyr::sdf_sql
, and dplyr::select
. At other times, you might be able to complete an operation with just one or two of these packages, and the operation you choose depends on your usage scenario. For example, the way you call sparklyr::sdf_quantile
differs slightly from the way you call dplyr::percentile_approx
, even though both functions calcuate quantiles.
You can use SQL as a bridge between SparkR and sparklyr. For example, you can use SparkR::sql
to query tables that you create with sparklyr. You can use sparklyr::sdf_sql
to query tables that you create with SparkR. And dplyr
code always gets translated to SQL in memory before it is run. See also API interoperability and SQL Translation.
Load SparkR, sparklyr, and dplyr
The SparkR, sparklyr, and dplyr packages are included in the Databricks Runtime that is installed on Azure Databricks clusters. Therefore, you do not need to call the usual install.package
before you can begin call these packages. However, you must still load these packages with library
first. For example, from within an R notebook in an Azure Databricks workspace, run the following code in a notebook cell to load SparkR, sparklyr, and dplyr:
library(SparkR)
library(sparklyr)
library(dplyr)
Connect sparklyr to a cluster
After you load sparklyr, you must call sparklyr::spark_connect
to connect to the cluster, specifying the databricks
connection method. For example, run the following code in a notebook cell to connect to the cluster that hosts the notebook:
sc <- spark_connect(method = "databricks")
In contrast, an Azure Databricks notebook already establishes a SparkSession
on the cluster for use with SparkR, so you do not need to call SparkR::sparkR.session
before you can begin calling SparkR.
Upload a JSON data file to your workspace
Many of the code examples in this article are based on data in a specific location in your Azure Databricks workspace, with specific column names and data types. The data for this code example originates in a JSON file named book.json
from within GitHub. To get this file and upload it to your workspace:
- Go to the books.json file on GitHub and use a text editor to copy its contents to a file named
books.json
somewhere on your local machine. - In your Azure Databricks workspace sidebar, click Catalog.
- Click Create Table.
- On the Upload File tab, drop the
books.json
file from your local machine to the Drop files to upload box. Or select click to browse, and browse to thebooks.json
file from your local machine.
By default, Azure Databricks uploads your local books.json
file to the DBFS location in your workspace with the path /FileStore/tables/books.json
.
Do not click Create Table with UI or Create Table in Notebook. The code examples in this article use the data in the uploaded books.json
file in this DBFS location.
Read the JSON data into a DataFrame
Use sparklyr::spark_read_json
to read the uploaded JSON file into a DataFrame, specifying the connection, the path to the JSON file, and a name for the internal table representation of the data. For this example, you must specify that the book.json
file contains multiple lines. Specifying the columns’ schema here is optional. Otherwise, sparklyr infers the columns’ schema by default. For example, run the following code in a notebook cell to read the uploaded JSON file’s data into a DataFrame named jsonDF
:
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
Print the first few rows of a DataFrame
You can use SparkR::head
, SparkR::show
, or sparklyr::collect
to print the first rows of a DataFrame. By default, head
prints the first six rows by default. show
and collect
print the first 10 rows. For example, run the following code in a notebook cell to print the first rows of the DataFrame named jsonDF
:
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
Run SQL queries, and write to and read from a table
You can use dplyr functions to run SQL queries on a DataFrame. For example, run the following code in a notebook cell to use dplyr::group_by
and dployr::count
to get counts by author from the DataFrame named jsonDF
. Use dplyr::arrange
and dplyr::desc
to sort the result in descending order by counts. Then print the first 10 rows by default.
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
You could then use sparklyr::spark_write_table
to write the result to a table in Azure Databricks. For example, run the following code in a notebook cell to rerun the query and then write the result to a table named json_books_agg
:
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
To verify that the table was created, you could then use sparklyr::sdf_sql
along with SparkR::showDF
to display the table’s data. For example, run the following code in a notebook cell to query the table into a DataFrame and then use sparklyr::collect
to print the first 10 rows of the DataFrame by default:
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
You could also use sparklyr::spark_read_table
to do something similar. For example, run the following code in a notebook cell to query the preceding DataFrame named jsonDF
into a DataFrame and then use sparklyr::collect
to print the first 10 rows of the DataFrame by default:
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
Add columns and compute column values in a DataFrame
You can use dplyr functions to add columns to DataFrames and to compute columns’ values.
For example, run the following code in a notebook cell to get the contents of the DataFrame named jsonDF
. Use dplyr::mutate
to add a column named today
, and fill this new column with the current timestamp. Then write these contents to a new DataFrame named withDate
and use dplyr::collect
to print the new DataFrame’s first 10 rows by default.
Note
dplyr::mutate
only accepts arguments that conform to Hive’s built-in functions (also known as UDFs) and built-in aggregate functions (also known as UDAFs). For general information, see Hive Functions. For information about the date-related functions in this section, see Date Functions.
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
Now use dplyr::mutate
to add two more columns to the contents of the withDate
DataFrame. The new month
and year
columns contain the numeric month and year from the today
column. Then write these contents to a new DataFrame named withMMyyyy
, and use dplyr::select
along with dplyr::collect
to print the author
, title
, month
and year
columns of the new DataFrame’s first ten rows by default:
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
Now use dplyr::mutate
to add two more columns to the contents of the withMMyyyy
DataFrame. The new formatted_date
columns contains the yyyy-MM-dd
portion from the today
column, while the new day
column contains the numeric day from the new formatted_date
column. Then write these contents to a new DataFrame named withUnixTimestamp
, and use dplyr::select
along with dplyr::collect
to print the title
, formatted_date
, and day
columns of the new DataFrame’s first ten rows by default:
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
Create a temporary view
You can create named temporary views in memory that are based on existing DataFrames. For example, run the following code in a notebook cell to use SparkR::createOrReplaceTempView
to get the contents of the preceding DataFrame named jsonTable
and make a temporary view out of it named timestampTable
. Then, use sparklyr::spark_read_table
to read the temporary view’s contents. Use sparklyr::collect
to print the first 10 rows of the temporary table by default:
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
Perform statistical analysis on a DataFrame
You can use sparklyr along with dplyr for statistical analyses.
For example, create a DataFrame to run statistics on. To do this, run the following code in a notebook cell to use sparklyr::sdf_copy_to
to write the contents of the iris
dataset that is built into R to a DataFrame named iris
. Use sparklyr::sdf_collect
to print the first 10 rows of the temporary table by default:
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
Now use dplyr::group_by
to group rows by the Species
column. Use dplyr::summarize
along with dplyr::percentile_approx
to calculate summary statistics by the 25th, 50th, 75th, and 100th quantiles of the Sepal_Length
column by Species
. Use sparklyr::collect
a print the results:
Note
dplyr::summarize
only accepts arguments that conform to Hive’s built-in functions (also known as UDFs) and built-in aggregate functions (also known as UDAFs). For general information, see Hive Functions. For information about percentile_approx
, see Built-in Aggregate Functions(UDAF).
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
Similar results can be calculated, for example, by using sparklyr::sdf_quantile
:
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9