使用 SparkR

SparkR 是一个 R 包,提供轻量级前端以通过 R 使用 Apache Spark。SparkR 提供分布式数据帧实现,该实现支持选择、筛选、聚合等操作。SparkR 还支持使用 MLlib 的分布式机器学习。

通过 Spark 批处理作业定义或交互式 Microsoft Fabric 笔记本使用 SparkR。

R 支持仅在 Spark3.1 或更高版本中可用。 不支持 Spark 2.4 中的 R。

先决条件

  • 打开或创建笔记本。 请参阅如何使用 Microsoft Fabric 笔记本,了解如何操作。

  • 通过将语言选项设置为 Spark (R) 来更改主要语言。

  • 将笔记本附加到湖屋。 选择左侧的“添加”以添加现有湖屋或创建湖屋。

读取和写入 SparkR 数据帧

从本地 R data.frame 读取 SparkR 数据帧

创建数据帧的最简单方法是将本地 R 数据帧转换为 Spark 数据帧。

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

从湖屋读取和写入 SparkR 数据帧

数据可以存储在群集节点的本地文件系统上。 从湖屋读取和写入 SparkR 数据帧的一般方法是 read.dfwrite.df。 这些方法需要获取要加载的文件的路径和数据源的类型。 SparkR 支持原生读取 CSV、JSON、文本和 Parquet 文件。

若要读取和写入湖屋,请先将其添加到会话。 在笔记本左侧,选择“添加”以添加现有的湖屋或创建湖屋。

注意

要使用 Spark 软件包(例如 read.dfwrite.df)访问湖屋文件,请使用其 ADFS 路径Spark 相对路径。 在湖屋资源管理器中,右键单击要访问的文件或文件夹,然后从上下文菜单中复制其 ADFS 路径Spark 的相对路径

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric 已预安装 tidyverse。 可以在熟悉的 R 包中访问湖屋文件,例如使用 readr::read_csv()readr::write_csv() 读取和写入湖屋文件。

注意

若要使用 R 包访问湖屋文件,需要使用“文件 API 路径”。 在湖屋资源管理器中,右键单击想要访问的文件或文件夹,然后从上下文菜单中复制其“文件 API 路径”。

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

还可以使用 SparkSQL 查询读取湖屋上的 SparkR 数据帧。

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

数据帧操作

SparkR 数据帧支持许多函数,可执行结构化数据处理。 下面是一些基本示例。 你可以在 SparkR API 文档中找到完整列表。

选择行和列

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

分组和聚合

SparkR 数据帧支持许多常用函数,可在分组后聚合数据。 例如,我们可以计算忠实数据集中等待时间的直方图,如下所示

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

列操作

SparkR 提供了许多函数,这些函数可直接应用于列以进行数据处理和聚合。 以下示例演示基本算术函数的用法。

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

应用用户定义的函数

SparkR 支持多种用户定义的函数:

使用 dapplydapplyCollect 在大型数据集上运行函数

dapply

将函数应用至 SparkDataFrame 的每个分区。 该函数将应用至 SparkDataFrame 的每个分区,并且应该只有一个参数,将向该参数传递与每个分区相对应的 data.frame。 函数的输出应为 data.frame。 架构指定生成的 SparkDataFrame 的行格式。 它必须与返回值的数据类型相匹配。

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

与 dapply 类似,将函数应用至 SparkDataFrame 的每个分区并收集结果。 函数的输出应为 data.frame。 但是,这次不需要传递架构。 请注意,如果在所有分区上运行的函数的输出无法拉取到驱动程序并适合驱动程序内存,则 dapplyCollect 可能会失败。

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

使用 gapplygapplyCollect 对按输入列分组的大型数据集运行函数

gapply

将一个函数应用于 SparkDataFrame 的每个组。 该函数将应用于 SparkDataFrame 的每个组,并且应该只有两个参数:分组键和与该键对应的 R data.frame。 组是从 SparkDataFrames 列中选择的。 函数的输出应为 data.frame。 架构指定生成的 SparkDataFrame 的行格式。 它必须表示 Spark 数据类型中的 R 函数输出架构。 返回的 data.frame 的列名由用户设置。

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

gapply 类似,将函数应用于 SparkDataFrame 的每个组并将结果收集回 R data.frame。 函数的输出应为 data.frame。 但是,不需要传递架构。 请注意,如果在所有分区上运行的函数的输出无法拉取到驱动程序并适合驱动程序内存,则 gapplyCollect 可能会失败。

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

运行通过 spark.lapply 分发的本地 R 函数

spark.lapply

与本机 R 中的 lapply 类似,spark.lapply 对元素列表运行函数并使用 Spark 分配计算。 以类似于 doParallellapply 的方式将函数应用于列表的元素。 所有计算的结果应该适合一台机器。 否则,它们可以执行 df <- createDataFrame(list) 等操作,然后使用 dapply

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

从 SparkR 运行 SQL 查询

SparkR DataFrame 还可以注册为临时视图,允许对其数据运行 SQL 查询。 sql 函数使应用程序能够以编程方式运行 SQL 查询并将结果作为 SparkR DataFrame 返回。

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

机器学习

SparkR 公开大多数 MLLib 算法。 实际上,SparkR 使用 MLlib 来训练模型。

以下示例演示如何使用 SparkR 来生成 Gaussian GLM 模型。 若要运行线性回归,请将系列设置为 "gaussian"。 若要运行逻辑回归,请将系列设置为 "binomial"。 当使用 SparkML GLM 时,SparkR 会自动对分类特征进行独热编码,因此不需要手动执行此操作。 除了 String 和 Double 类型的特征以外,还可以在 MLlib 矢量特征上进行拟合,以便与其他 MLlib 组件兼容。

若要详细了解支持哪些机器学习算法,请访问 SparkR 和 MLlib 文档

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)