共用方式為


使用 sparklyr

R 語言 sparklyr 資源可作為 Apache Spark 的介面。 sparklr 資源提供一個機制,可以透過熟悉的 R 介面來與 Spark 互動。 使用 sparklyr,可以透過 Spark 批次作業定義或與互動式 Microsoft Fabric 筆記本一起使用。

sparklyr 會與其他 tidyverse 套件搭配使用 , 例如 dplyr。 Microsoft Fabric 會在每個執行環境版本中發佈 sparklyr 和 tidyverse 的最新穩定版本。 您可以匯入這些資源,並開始使用 API。

必要條件

  • 開啟或建立筆記本。 若要了解操作說明,請參閱如何使用 Microsoft Fabric 筆記本

  • 將語言選項設定為 SparkR (R),以變更主要語言。

  • 將筆記本連結至 Lakehouse。 在左側選擇新增以新增現有的湖屋或建立湖屋。

將 sparklyr 連線到 Synapse Spark 叢集

透過函式連接方法 spark_connect(),建立 sparklyr 連接。 函式會建置名為 synapse的新連接方法,其會連線到現有的Spark會話。 這可大幅縮短 sparklyr 會話啟動時間。 此連接方法可在 開放原始碼sparklyr專案中取得。 透過 method = "synapse",您可以在相同的會話中使用 sparklyrSparkR ,並輕鬆地 在它們之間共享數據。 下列筆記本資料格程式代碼範例會使用spark_connect()函式:

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

使用 sparklyr 讀取資料

新的 Spark 工作階段不包含任何資料。 然後,您必須將數據載入 Spark 工作階段的記憶體,或將 Spark 指向資料的位置,讓會話可以視需要存取資料:

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

使用 sparklyr 時,您也可以使用 ABFS 路徑值,從 Lakehouse 檔案中 writeread 數據。 若要讀取和寫入 Lakehouse,請先將 Lakehouse 新增至您的會話。 在筆記本左側,選取 [ 新增 ] 以新增現有的 Lakehouse。 此外,您可以建立 Lakehouse。

若要尋找 ABFS 路徑,請以滑鼠右鍵按兩下 Lakehouse 中的 [ 檔案 ] 資料夾,然後選取 [ 複製ABFS 路徑]。 在下列程式代碼範例中貼上要取代 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files 的路徑:

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

使用 sparklyr 來操作資料

sparklyr 提供在Spark內處理數據的不同方式,其中包含:

  • dplyr 命令
  • SparkSQL
  • Spark 的功能轉換器

使用 dplyr

您可以使用熟悉的 dplyr 命令在 Spark 內準備資料。 命令會在Spark內執行,防止 R 與 Spark 之間不必要的資料傳輸。

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

資源 數據操作 dplyr 提供搭配 Spark 使用 dplyr 的更多資訊。 sparklyr 並將 dplyr R 命令轉譯為 Spark SQL。 使用 show_query() 來顯示產生的查詢:

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

使用 SQL

您也可以直接對 Spark 叢集中的數據表執行 SQL 查詢。 spark_connection() 物件會實作 Spark 的 DBI 介面,因此您可以使用 dbGetQuery() 來執行 SQL,並以 R 資料框架傳回結果:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

使用功能轉換器

上述的兩種方法都依賴 SQL 陳述式。 Spark 提供命令,讓某些資料轉換更方便,而不需要使用 SQL。 例如, ft_binarizer() 命令可簡化新數據行的建立,指出另一個數據行中的值是否超過特定臨界值:

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

Reference -FT 資源提供可透過 sparklyr取得的Spark功能轉換器完整清單。

sparklyrSparkR 之間共用資料

當您連線sparklyr到 Synapse Spark 叢集使用method = "synapse"時,sparklyrSparkR都可以在同一個會話中使用,並且能夠輕鬆共享彼此之間的數據。 您可以在 中 sparklyr建立 Spark 資料表,並從 中讀取它 SparkR

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

機器學習服務

下列範例使用 ml_linear_regression() 來配合線性回歸模型。 此模型會使用內mtcars建數據集,根據汽車的重量和汽車發動機的汽缸數(mpg)來嘗試預測汽車的油耗(wtcyl)。 此處所有案例都假設 mpg 與我們每個特徵之間存在線性關係。

產生測試和訓練資料集

使用 70%% 來訓練,30%% 來測試模型。 此比率的變更會導致不同的模型:

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

訓練模型

訓練羅吉斯迴歸模型。

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

使用 summary() 來深入瞭解模型的品質,以及每個預測值的統計意義:

summary(fit)

使用模型

通話 ml_predict() 以將模型套用至測試資料集:

pred <- ml_predict(fit, partitions$test)

head(pred)

如需可透過sparklyr取得的SparkML模型清單,請瀏覽 參考 - ML

中斷與 Spark 叢集的連線

呼叫 spark_disconnect(),或選取筆記本功能區頂端的 [停止會話 ] 按鈕,以結束 Spark 會話:

spark_disconnect(sc)

深入了解 R 功能: