共用方式為


使用 SparkR

SparkR 是一種 R 套件,提供輕量型前端以使用 R 的 Apache Spark。SparkR 提供分散式資料架構實作,支援選取、篩選、彙總等作業。SparkR 還支援使用 MLlib 的分散式機器學習。

透過 Spark 批次工作定義或互動式 Microsoft Fabric 筆記本使用 SparkR。

R 支援僅適用於 Spark3.1 或更新版本。 不支援 Spark 2.4 中的 R。

必要條件

  • 開啟或建立筆記本。 若要了解操作說明,請參閱如何使用 Microsoft Fabric 筆記本

  • 將語言選項設定為 SparkR (R),以變更主要語言。

  • 將筆記本連結至 Lakehouse。 在左側選取 [新增] 以新增現有的 Lakehouse 或建立 Lakehouse。

讀取和寫入 SparkR DataFrame

從本機 R data.frame 讀取 SparkR DataFrame

建立 DataFrame 的最簡單方法是將本機 R data.frame 轉換為 Spark DataFrame。

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

從 Lakehouse 讀取和寫入 SparkR DataFrame

資料可以儲存在叢集節點的本機檔案系統。 從 Lakehouse 讀取和寫入 SparkR DataFrame 的一般方法是 read.dfwrite.df。 這些方法採用要載入的檔案的路徑以及資料來源的類型。 SparkR 支援原生讀取 CSV、JSON、文字及 Parquet 檔案。

要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增],以新增現有的 Lakehouse 或建立 Lakehouse。

注意

若要使用 Spark 套件 (例如 read.dfwrite.df) 存取 Lakehouse 檔案,請使用其 ADFS 路徑Spark 相對路徑。 在 Lakehouse 總管中,以滑鼠右鍵按一下想要存取的檔案或資料夾,並從特色選單複製其 ADFS 路徑Spark 相對路徑

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric 已預安裝 tidyverse。 您可以在熟悉的 R 套件中存取 Lakehouse 檔案,例如使用 readr::read_csv()readr::write_csv() 讀取和寫入 Lakehouse 檔案。

注意

若要使用 R 套件存取 Lakehouse 檔案,需要使用檔案 API 路徑。 在 Lakehouse 總管中,以滑鼠右鍵按一下您想要存取的檔案或資料夾,並從內容相關的功能表中複製其檔案 API 路徑

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

也可以使用 SparkSQL 查詢在 Lakehouse 上讀取 SparkR Dataframe。

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

DataFrame 作業

SparkR DataFrame 支援透過多個函數來執行結構化資料處理。 以下是一些基本範例。 SparkR API 檔案中有完整的清單。

選取資料列和資料行

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

群組和彙總

SparkR 資料框架支援許多常用函數,以在分組之後彙總資料。 例如,我們可以計算忠實資料集中等待時間的色階分佈圖,如下所示

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

資料行作業

SparkR 提供許多函數,可直接套用至資料行以進行資料處理和彙總。 下列範例顯示如何使用基本計算函數。

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

套用使用者定義的函數

SparkR 支援多種使用者定義的函數:

在具有 dapplydapplyCollect 的大型資料集上執行函數

dapply

將函數套用至 SparkDataFrame 的每個分割區。 函數將套用至 SparkDataFrame 的每個分割區,並且應該只有一個參數,每個分割區對應的 data.frame 將傳遞給該參數。 函數的輸出應該是 data.frame。 結構描述會指定產生的 SparkDataFrame 的資料列格式。 它必須符合傳回值的資料類型

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

就像 dapply 一樣,將函數套用至 SparkDataFrame 的每個分割區,並將結果收集回來。 函數的輸出應該是 data.frame。 但此時不需要傳遞結構描述。 請注意,如果在所有分割區上執行的函數的輸出無法拉到驅動程式,且無法放入驅動程式記憶體,則 dapplyCollect 可能會失敗。

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

在依具有 gapplygapplyCollect 的輸入資料行分組的大型資料集上執行函數

gapply

將函數套用至 SparkDataFrame 的每個组。 函數會套用至 SparkDataFrame 的每個群組,而且應該只有兩個參數:分組索引鍵以及與該索引鍵對應的 R data.frame。 群組從 SparkDataFrames 資料列中選取。 函數的輸出應該是 data.frame。 結構描述會指定產生的 SparkDataFrame 的資料列格式。 它必須代表來自 Spark 資料類型的 R 函數輸出結構描述。 傳回 data.frame 的資料行名稱由使用者設定。

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

就像 gapply 一樣,將函數套用至 SparkDataFrame 的每個群組,並將結果收集回 R data.frame。 函數的輸出應該是 data.frame。 但是,不需要傳遞結構描述。 請注意,如果在所有分割區上執行的函數的輸出無法拉到驅動程式,且無法放入驅動程式記憶體,則 gapplyCollect 可能會失敗。

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

執行透過 spark.lapply 散發的本地 R 函數

spark.lapply

類似於原生 R 中的 lapplyspark.lapply 在元素清單上執行函數,並使用 Spark 散發計算。 以類似 doParallellapply 的方式將函數套用到清單的元素。 所有計算的結果都應該符合單一機器。 如果情況並非如此,他們可以執行類似 df <- createDataFrame(list) 的操作,然後使用 dapply

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

從 SparkR 執行 SQL 查詢

SparkR DataFrame 也可以註冊為暫存檢視,允許您對其資料執行 SQL 查詢。 sql 函數可讓應用程式以程式設計方式執行 SQL 查詢,並將結果以 SparkR DataFrame 的形式傳回。

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

機器學習服務

SparkR 公開了大多數 MLLib 演算法。 在底層,SparkR 使用 MLlib 訓練模型。

下列範例示範如何使用 SparkR 建置 Gaussian GLM 模型。 若要執行線性迴歸,請將系列設定為 "gaussian"。 若要執行邏輯迴歸,請將系列設定為 "binomial"。 使用 SparkML GLM 時,SparkR 會自動執行分類特徵的單次編碼,因此不需要手動完成。 除了 String 和 Double 種類功能之外,也可以套用 MLlib 媒介功能,以便與其他 MLlib 元件相容。

若要深入了解哪些機器學習演算法受支援,可以瀏覽 SparkR 和 MLlib 檔案

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)