訓練
在 R 中使用 DataFrame 和數據表
重要
Databricks 中的 SparkR Databricks Runtime 16.0 和更新版本中已被取代。 Databricks 建議改用 sparklyr。
本文說明如何使用 R 套件,例如 SparkR、sparklyr,以及 dplyr 來搭配 R data.frame
、Spark DataFrames和記憶體內部數據表。
請注意,當您使用 SparkR、sparklyr 和 dplyr 時,您可能會發現您可以使用所有這些套件完成特定作業,而且您可以使用最熟悉的套件。 例如,若要執行查詢,您可以呼叫 SparkR::sql
、sparklyr::sdf_sql
和 dplyr::select
等函式。 在其他情況下,您可能只需利用其中一到兩個套件來完成操作,而您選擇的操作將取決於您的使用情境。 例如,您呼叫 sparklyr::sdf_quantile
的方式與呼叫 dplyr::percentile_approx
的方式略有不同,雖然這兩個函數都計算分位數。
您可以使用 SQL 作為 SparkR 與 sparklyr 之間的網橋。 例如,您可以使用 SparkR::sql
來查詢使用 sparklyr 建立的數據表。 您可以使用 sparklyr::sdf_sql
來查詢使用 SparkR 建立的數據表。 而且 dplyr
程式代碼在執行之前,一律會轉譯為記憶體中的 SQL。 請參閱 API 互作性 和 SQL 翻譯。
SparkR、sparklyr 和 dplyr 套件包含在 Azure Databricks 上安裝的 Databricks 運行時間 叢集上。 因此,您不需要呼叫一般 install.package
,才能開始呼叫這些套件。 不過,您仍然必須使用 library
載入這些套件。 例如,從 Azure Databricks 工作區中的 R Notebook,在 Notebook 數據格中執行下列程式代碼以載入 SparkR、sparklyr 和 dplyr:
library(SparkR)
library(sparklyr)
library(dplyr)
載入 sparklyr 之後,您必須呼叫 sparklyr::spark_connect
以連線到叢集,並指定 databricks
連接方法。 例如,在筆記本數據格中執行下列程式代碼,以連線到裝載筆記本的叢集:
sc <- spark_connect(method = "databricks")
相反地,Azure Databricks Notebook 已在叢集上建立 SparkSession
以搭配 SparkR 使用,因此您不需要呼叫 SparkR::sparkR.session
,才能開始呼叫 SparkR。
本文中的許多程式代碼範例都是以 Azure Databricks 工作區中特定位置的數據為基礎,其中包含特定數據行名稱和數據類型。 此程式代碼範例的數據源自於 GitHub 內名為 book.json
的 JSON 檔案。 若要取得此檔案,並將它上傳至您的工作區:
- 移至 GitHub 上的 books.json 檔案,並使用文字編輯器將其內容複製到本機電腦上某處名為
books.json
的檔案。 - 在 Azure Databricks 工作區的側邊欄中,按一下 [目錄]。
- 點選 「建立資料表」。
- 在 [上傳檔案] 索引標籤上,將
books.json
檔案從本機電腦拖放至 [拖放檔案以上傳] 方塊。 或選取 點擊以瀏覽,然後瀏覽至本機電腦的books.json
檔案。
根據預設,Azure Databricks 會將本機 books.json
檔案上傳到工作區中的 DBFS 位置,其路徑為 /FileStore/tables/books.json
。
請勿按下 [使用 UI 建立資料表] 或 [在 Notebook 中建立資料表]。 本文中的程式代碼範例會使用此 DBFS 位置中上傳 books.json
檔案中的數據。
使用 sparklyr::spark_read_json
將上傳的 JSON 檔案讀入 DataFrame、指定連線、JSON 檔案的路徑,以及資料內部數據表表示的名稱。 在此範例中,您必須指定 book.json
檔案包含多行。 在這裡指定數據行的架構是選擇性的。 否則,sparklyr 預設會推斷數據行的架構。 例如,在筆記本數據格中執行下列程式代碼,將上傳的 JSON 檔案資料讀入名為 jsonDF
的數據框架:
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
您可以使用 SparkR::head
、SparkR::show
或 sparklyr::collect
來列印 DataFrame 的第一個數據列。 根據預設,head
預設會列印前六個數據列。
show
和 collect
顯示前 10 行。 例如,在筆記本數據格中執行下列程序代碼,以列印名為 jsonDF
之 DataFrame 的第一個數據列:
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
您可以使用 dplyr 函式在 DataFrame 上執行 SQL 查詢。 例如,在筆記本的單元格中執行下列程式碼,以使用 dplyr::group_by
和 dployr::count
,從名為 jsonDF
的 DataFrame 獲取作者計數。 使用 dplyr::arrange
和 dplyr::desc
,依計數以遞減順序排序結果。 然後預設會列印前10個數據列。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
然後,您可以使用 sparklyr::spark_write_table
將結果寫入 Azure Databricks 中的數據表。 例如,在筆記本數據格中執行下列程式代碼以重新執行查詢,然後將結果寫入名為 json_books_agg
的數據表:
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
若要確認數據表已建立,您可以使用 sparklyr::sdf_sql
以及 SparkR::showDF
來顯示資料表的數據。 例如,在筆記本的單元格中執行下列程式碼,將表格查詢到 DataFrame,然後使用 sparklyr::collect
預設列印 DataFrame 的前 10 行:
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
您也可以使用 sparklyr::spark_read_table
來執行類似動作。 例如,在筆記本的儲存格中執行下列程式碼,查詢先前名為 jsonDF
的 DataFrame,然後使用 sparklyr::collect
預設地列印出該 DataFrame 的前 10 個數據列:
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
您可以使用 dplyr 函式,將數據行新增至 DataFrame,以及計算數據行的值。
例如,在筆記本數據格中執行下列程序代碼,以取得名為 jsonDF
之 DataFrame 的內容。 使用 dplyr::mutate
新增名為 today
的數據行,並以目前的時間戳填入這個新數據行。 然後將這些內容寫入名為 withDate
的新 DataFrame,並使用 dplyr::collect
預設列印新 DataFrame 的前 10 個數據列。
備註
dplyr::mutate
只接受符合Hive內建函式(也稱為UDF)和內建聚合函數(也稱為UDAF)的自變數。 如需一般資訊,請參閱 Hive 功能。 如需本節中日期相關函式的詳細資訊,請參閱 Date Functions。
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
現在,使用 dplyr::mutate
將兩個數據行新增至 withDate
DataFrame 的內容。 新的 month
和 year
數據行包含來自 today
數據行的數值月份和年份。 然後將這些內容寫入名為 withMMyyyy
的新 DataFrame,並使用 dplyr::select
以及 dplyr::collect
,依預設列印新 DataFrame 前十個數據列的 author
、title
、month
和 year
數據行:
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
現在,使用 dplyr::mutate
將兩個數據行新增至 withMMyyyy
DataFrame 的內容。 新的 formatted_date
數據行包含 today
數據行的 yyyy-MM-dd
部分,而新的 day
資料行則包含新 formatted_date
資料行中的數值日。 然後將這些內容寫入名為 withUnixTimestamp
的新 DataFrame,並使用 dplyr::select
以及 dplyr::collect
,依預設列印新 DataFrame 前十個數據列的 title
、formatted_date
和 day
數據行:
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
您可以在記憶體中建立以現有 DataFrame 為基礎的具名暫存檢視。 例如,在筆記本數據格中執行下列程式代碼,以使用 SparkR::createOrReplaceTempView
來取得名為 jsonTable
的先前 DataFrame 內容,並將暫存檢視從中命名為 timestampTable
。 然後,使用 sparklyr::spark_read_table
來讀取暫存檢視的內容。 根據預設,請使用 sparklyr::collect
列印臨時表的前10個資料列:
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
您可以使用sparklyr和 dplyr進行統計分析。
例如,建立用於進行統計分析的DataFrame。 若要這樣做,請在筆記本數據格中執行下列程序代碼,以使用 sparklyr::sdf_copy_to
,將 R 內建 iris
數據集的內容寫入名為 iris
的數據框架。 根據預設,請使用 sparklyr::sdf_collect
列印臨時表的前10個資料列:
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
現在,使用 dplyr::group_by
,按照 Species
欄位分組行。 使用 dplyr::summarize
以及 dplyr::percentile_approx
,依 Species
計算 Sepal_Length
列的第 25、50、75 和第 100 百分位數的統計摘要。 使用 sparklyr::collect
列印結果:
備註
dplyr::summarize
只接受符合Hive內建函式(也稱為UDF)和內建聚合函數(也稱為UDAF)的自變數。 如需一般資訊,請參閱 Hive 函式。 如需 percentile_approx
的相關信息,請參閱 內建聚合函數(UDAF)。
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
例如,您可以使用 sparklyr::sdf_quantile
來計算類似的結果:
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9
更多資源
文件
-
比較 SparkR 和 sparklyr - Azure Databricks
-
瞭解如何在 Azure Databricks 中透過 SparkR、sparklyr 和 RStudio 從 R 中使用 Apache Spark。
-
了解如何在 Azure Databricks 中透過 sparklyr 從 R 使用 Apache Spark。