使用 SparkR
SparkR 是一種 R 套件,提供輕量型前端以使用 R 的 Apache Spark。SparkR 提供分散式資料架構實作,支援選取、篩選、彙總等作業。SparkR 還支援使用 MLlib 的分散式機器學習。
透過 Spark 批次工作定義或互動式 Microsoft Fabric 筆記本使用 SparkR。
R 支援僅適用於 Spark3.1 或更新版本。 不支援 Spark 2.4 中的 R。
必要條件
取得 Microsoft Fabric 訂用帳戶。 或註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左下方的體驗切換器,切換至 Fabric。
開啟或建立筆記本。 若要了解操作說明,請參閱如何使用 Microsoft Fabric 筆記本。
將語言選項設定為 SparkR (R),以變更主要語言。
將筆記本連結至 Lakehouse。 在左側選取 [新增] 以新增現有的 Lakehouse 或建立 Lakehouse。
讀取和寫入 SparkR DataFrame
從本機 R data.frame 讀取 SparkR DataFrame
建立 DataFrame 的最簡單方法是將本機 R data.frame 轉換為 Spark DataFrame。
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
從 Lakehouse 讀取和寫入 SparkR DataFrame
資料可以儲存在叢集節點的本機檔案系統。 從 Lakehouse 讀取和寫入 SparkR DataFrame 的一般方法是 read.df
和 write.df
。 這些方法採用要載入的檔案的路徑以及資料來源的類型。 SparkR 支援原生讀取 CSV、JSON、文字及 Parquet 檔案。
要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增],以新增現有的 Lakehouse 或建立 Lakehouse。
注意
若要使用 Spark 套件 (例如 read.df
或 write.df
) 存取 Lakehouse 檔案,請使用其 ADFS 路徑 或 Spark 相對路徑。 在 Lakehouse 總管中,以滑鼠右鍵按一下想要存取的檔案或資料夾,並從特色選單複製其 ADFS 路徑或 Spark 相對路徑。
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric 已預安裝 tidyverse
。 您可以在熟悉的 R 套件中存取 Lakehouse 檔案,例如使用 readr::read_csv()
及 readr::write_csv()
讀取和寫入 Lakehouse 檔案。
注意
若要使用 R 套件存取 Lakehouse 檔案,需要使用檔案 API 路徑。 在 Lakehouse 總管中,以滑鼠右鍵按一下您想要存取的檔案或資料夾,並從內容相關的功能表中複製其檔案 API 路徑。
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
也可以使用 SparkSQL 查詢在 Lakehouse 上讀取 SparkR Dataframe。
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame 作業
SparkR DataFrame 支援透過多個函數來執行結構化資料處理。 以下是一些基本範例。 SparkR API 檔案中有完整的清單。
選取資料列和資料行
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
群組和彙總
SparkR 資料框架支援許多常用函數,以在分組之後彙總資料。 例如,我們可以計算忠實資料集中等待時間的色階分佈圖,如下所示
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
資料行作業
SparkR 提供許多函數,可直接套用至資料行以進行資料處理和彙總。 下列範例顯示如何使用基本計算函數。
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
套用使用者定義的函數
SparkR 支援多種使用者定義的函數:
在具有 dapply
或 dapplyCollect
的大型資料集上執行函數
dapply
將函數套用至 SparkDataFrame
的每個分割區。 函數將套用至 SparkDataFrame
的每個分割區,並且應該只有一個參數,每個分割區對應的 data.frame 將傳遞給該參數。 函數的輸出應該是 data.frame
。 結構描述會指定產生的 SparkDataFrame
的資料列格式。 它必須符合傳回值的資料類型。
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
就像 dapply 一樣,將函數套用至 SparkDataFrame
的每個分割區,並將結果收集回來。 函數的輸出應該是 data.frame
。 但此時不需要傳遞結構描述。 請注意,如果在所有分割區上執行的函數的輸出無法拉到驅動程式,且無法放入驅動程式記憶體,則 dapplyCollect
可能會失敗。
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
在依具有 gapply
或 gapplyCollect
的輸入資料行分組的大型資料集上執行函數
gapply
將函數套用至 SparkDataFrame
的每個组。 函數會套用至 SparkDataFrame
的每個群組,而且應該只有兩個參數:分組索引鍵以及與該索引鍵對應的 R data.frame
。 群組從 SparkDataFrames
資料列中選取。 函數的輸出應該是 data.frame
。 結構描述會指定產生的 SparkDataFrame
的資料列格式。 它必須代表來自 Spark 資料類型的 R 函數輸出結構描述。 傳回 data.frame
的資料行名稱由使用者設定。
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
就像 gapply
一樣,將函數套用至 SparkDataFrame
的每個群組,並將結果收集回 R data.frame
。 函數的輸出應該是 data.frame
。 但是,不需要傳遞結構描述。 請注意,如果在所有分割區上執行的函數的輸出無法拉到驅動程式,且無法放入驅動程式記憶體,則 gapplyCollect
可能會失敗。
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
執行透過 spark.lapply 散發的本地 R 函數
spark.lapply
類似於原生 R 中的 lapply
,spark.lapply
在元素清單上執行函數,並使用 Spark 散發計算。 以類似 doParallel
或 lapply
的方式將函數套用到清單的元素。 所有計算的結果都應該符合單一機器。 如果情況並非如此,他們可以執行類似 df <- createDataFrame(list)
的操作,然後使用 dapply
。
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
從 SparkR 執行 SQL 查詢
SparkR DataFrame 也可以註冊為暫存檢視,允許您對其資料執行 SQL 查詢。 sql 函數可讓應用程式以程式設計方式執行 SQL 查詢,並將結果以 SparkR DataFrame 的形式傳回。
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
機器學習服務
SparkR 公開了大多數 MLLib 演算法。 在底層,SparkR 使用 MLlib 訓練模型。
下列範例示範如何使用 SparkR 建置 Gaussian GLM 模型。 若要執行線性迴歸,請將系列設定為 "gaussian"
。 若要執行邏輯迴歸,請將系列設定為 "binomial"
。 使用 SparkML GLM
時,SparkR 會自動執行分類特徵的單次編碼,因此不需要手動完成。 除了 String 和 Double 種類功能之外,也可以套用 MLlib 媒介功能,以便與其他 MLlib 元件相容。
若要深入了解哪些機器學習演算法受支援,可以瀏覽 SparkR 和 MLlib 檔案。
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)