使用 sparklyr

sparklyr 是 Apache Spark 的 R 接口。 它提供能使用熟悉的 R 接口与 Spark 交互的机制。 可以通过 Spark 批处理作业定义或交互式 Microsoft Fabric 笔记本使用 sparklyr。

sparklyr 通常与其他 tidyverse 包(例如 dplyr)一起使用。 Microsoft Fabric 每一次运行时发布时都会分发 sparklyr 和 tidyverse 的最新稳定版。 可以导入它们并开始使用 API。

先决条件

  • 打开或创建笔记本。 请参阅如何使用 Microsoft Fabric 笔记本,了解如何操作。

  • 通过将语言选项设置为 Spark (R) 来更改主要语言。

  • 将笔记本附加到湖屋。 选择左侧的“添加”以添加现有湖屋或创建湖屋。

将 sparklyr 连接到 Synapse Spark 群集

spark_connect() 中使用以下连接方法建立 sparklyr 连接。 我们支持名为 synapse 的新连接方法,该方法可用于连接到现有的 Spark 会话。 这样大大减少了 sparklyr 会话开启时间。 此外,我们还将此连接方法加入了开源 sparklyr 项目。 使用 method = "synapse",可以在同一会话中使用 sparklyrSparkR,还能轻松地在两者间共享数据

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

使用 sparklyr 读取数据

新的 Spark 会话不包含任何数据。 第一步是将数据加载到 Spark 会话的内存中,或使 Spark 指向数据的位置,以便它可以按需访问数据。

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

使用 sparklyr,还可以使用 ABFS 路径从 Lakehouse 文件 writeread 数据。 若要读取和写入湖屋,请先将其添加到会话。 在笔记本左侧,选择“添加”以添加现有的湖屋或创建湖屋。

若要查找 ABFS 路径,请右键单击湖屋中的“文件”文件夹,然后选择“复制 ABFS 路径”。 粘贴路径以替换此代码中的 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

使用 sparklyr 操作数据

sparklyr 提供了多种方法来处理 Spark 中的数据,使用:

  • dplyr 命令
  • SparkSQL
  • Spark 的功能转换器

使用 dplyr

可以使用熟悉的 dplyr 命令在 Spark 中准备数据。 命令在 Spark 内部运行,因此 R 和 Spark 之间没有不必要的数据传输。

单击“使用 dplyr 操作数据”,查看有关将 dplyr 与 Spark 配合使用的额外文档。

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

sparklyrdplyr 为我们将 R 命令转换为 Spark SQL。 若要查看生成的查询,请使用 show_query()

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

使用 SQL

还可以直接对 Spark 群集中的表执行 SQL 查询。 spark_connection() 对象实现 Spark 的 DBI 接口,因此可以使用 dbGetQuery() 执行 SQL 并将结果作为 R 数据帧返回:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

使用功能转换器

上述两种方法都依赖于 SQL 语句。 Spark 提供的命令使某些数据转换更加方便,而无需使用 SQL。

例如,ft_binarizer() 命令简化了新列的创建过程,该列指示另一列的值是否高于特定阈值。

可以通过 sparklyr引用 -FT 中找到可用的 Spark 功能转换器的完整列表。

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

sparklyrSparkR 之间共享数据

使用 method = "synapse"sparklyr 连接到 synapse spark 群集时,可以在同一会话中使用 sparklyrSparkR,还能轻松地在两者间共享数据。 可以在 sparklyr 中创建 spark 表,并从 SparkR 读取它。

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

机器学习

以下示例使用 ml_linear_regression() 来拟合线性回归模型。 我们使用内置的 mtcars 数据集,看看是否可以根据汽车的重量 (wt) 以及发动机包含的汽缸数量 (cyl) 来预测汽车的油耗 (mpg)。 我们假设在每种情况下,mpg 和每个特征之间的关系都是线性的。

生成测试和训练数据集

使用拆分:70% 用于训练,30% 用于测试模型。 采用此比率会产生不同的模型。

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

定型模型

训练逻辑回归模型。

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

现在,使用 summary() 详细了解模型的质量,以及每个预测器的统计意义。

summary(fit)

使用模型

可以通过调用 ml_predict() 在测试数据集上应用模型。

pred <- ml_predict(fit, partitions$test)

head(pred)

有关通过 sparklyr 提供的 Spark ML 模型的列表,请访问引用 - ML

断开与 Spark 群集的连接

可以调用 spark_disconnect() 或选择笔记本功能区顶部的“停止会话”按钮来结束 Spark 会话。

spark_disconnect(sc)

详细了解 R 功能: