使用sparklyr
sparklyr 是 Apache Spark 的 R 介面。 它提供使用熟悉 R 介面與 Spark 互動的機制。 您可以透過Spark批次作業定義或互動式 Microsoft Fabric 筆記本來使用sparklyr。
sparklyr
會與其他 tidyverse 套件一起使用,例如 dplyr。 Microsoft Fabric 會在每個運行時間版本中散發最新穩定版本的 sparklyr 和 tidyverse。 您可以匯入它們,並開始使用 API。
必要條件
取得 Microsoft Fabric 訂用 帳戶。 或者,註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左側的體驗切換器,切換至 Synapse 資料科學 體驗。
開啟或建立筆記本。 若要瞭解如何,請參閱 如何使用 Microsoft Fabric 筆記本。
將語言選項設定為 SparkR (R) 以變更主要語言。
將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
將sparklyr 連線至 Synapse Spark 叢集
在中使用 spark_connect()
下列連接方法來建立 sparklyr
連接。 我們支持稱為 synapse
的新連接方法,可讓您連線到現有的Spark會話。 這可大幅減少 sparklyr
會話開始時間。 此外,我們已將此連線方法貢獻給 開放原始碼 sparklyr 專案。 透過 method = "synapse"
,您可以在相同的會話中使用 sparklyr
和 SparkR
,並輕鬆地 在它們之間共享數據。
# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
使用sparklyr讀取數據
新的Spark會話不包含任何數據。 第一個步驟是將數據載入 Spark 工作階段的記憶體,或將 Spark 指向資料的位置,以便依需求存取數據。
# load the sparklyr package
library(sparklyr)
# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)
head(mtcars_tbl)
使用 sparklyr
,您也可以 write
使用ABFS路徑,從Lakehouse檔案取得 read
數據。 若要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
若要尋找 ABFS 路徑,請以滑鼠右鍵按兩下 Lakehouse 中的 [檔案 ] 資料夾,然後選取 [ 複製ABFS 路徑]。 貼上您的路徑以在此程式代碼中取代 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files
:
temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"
# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')
# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv)
head(mtcarsDF)
使用sparklyr操作數據
sparklyr
使用下列方法提供多個方法來處理 Spark 內的數據:
dplyr
命令- SparkSQL
- Spark 的功能轉換器
使用 dplyr
您可以使用熟悉 dplyr
的命令在 Spark 內準備數據。 命令會在 Spark 內執行,因此 R 與 Spark 之間沒有不必要的資料傳輸。
按兩下 操作數據 搭配dplyr
,以查看搭配Spark使用 dplyr的額外檔。
# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)
cargroup <- group_by(mtcars_tbl, cyl) %>%
count() %>%
arrange(desc(n))
cargroup
sparklyr
並將 dplyr
R 命令轉譯為適用於我們的 Spark SQL。 若要檢視產生的查詢,請使用 show_query()
:
# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)
使用 SQL
您也可以直接對 Spark 叢集中的數據表執行 SQL 查詢。 物件 spark_connection()
會實作 Spark 的 DBI 介面,因此您可以使用 dbGetQuery()
來執行 SQL,並以 R 數據框架傳回結果:
library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
使用功能轉換器
上述兩種方法都依賴 SQL 語句。 Spark 提供命令,讓某些資料轉換更方便,而且不需要使用 SQL。
例如, ft_binarizer()
命令可簡化新數據行的建立,指出另一個數據行的值是否高於特定臨界值。
您可以從參考 -FT 找到可用的 sparklyr
Spark 功能轉換器完整清單。
mtcars_tbl %>%
ft_binarizer("mpg", "over_20", threshold = 20) %>%
select(mpg, over_20) %>%
head(5)
在和 之間 sparklyr
共享數據 SparkR
當您使用method = "synapse"
連線sparklyr
到 synapse spark 叢集時,可以在相同的工作階段中使用 sparklyr
和 SparkR
,並輕鬆地在它們之間共用數據。 您可以在 中 sparklyr
建立 Spark 數據表,並從 讀取它 SparkR
。
# load the sparklyr package
library(sparklyr)
# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)
# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")
head(mtcars_sparklr)
機器學習
以下是我們用來 ml_linear_regression()
配合線性回歸模型的範例。 我們使用內mtcars
建數據集,並查看我們是否可以根據汽車重量()預測汽車的油耗(mpg
wt
),以及發動機包含的汽缸數目(cyl
)。 我們假設在每個案例中,和每個特徵之間的 mpg
關聯性都是線性的。
產生測試和定型數據集
使用分割,70% 用於定型,30% 用於測試模型。 玩這個比例會導致不同的模型。
# split the dataframe into test and training dataframes
partitions <- mtcars_tbl %>%
select(mpg, wt, cyl) %>%
sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
定型模型
將羅吉斯回歸模型定型。
fit <- partitions$training %>%
ml_linear_regression(mpg ~ .)
fit
現在,使用 summary()
來深入瞭解模型的品質,以及每個預測值的統計意義。
summary(fit)
使用模型
您可以呼叫 ml_predict()
,在測試數據集上套用模型。
pred <- ml_predict(fit, partitions$test)
head(pred)
如需可透過sparklyr取得的SparkML模型清單,請造訪 參考 - ML
中斷與Spark叢集的連線
您可以呼叫 spark_disconnect()
或選取 筆記本功能區頂端的 [停止會話 ] 按鈕,結束 Spark 會話。
spark_disconnect(sc)
相關內容
深入瞭解 R 功能: