分享方式:


SparkR 概觀

SparkR 是一種 R 套件,提供輕量前端以使用來自 R 的 Apache Spark。SparkR 也支援使用 MLlib 的分散式機器學習。

SparkR 函式參考

您可以在 spark.apache.org找到最新的 SparkR 函式參考。

您也可以在匯入 SparkR 套件之後,在 R 筆記本或 RStudio 中檢視函式說明。

內嵌 R 檔

筆記本中的SparkR

  • 針對Spark 2.0和更新版本,您不需要明確地將對象傳遞 sqlContext 至每個函式呼叫。
  • 針對 Spark 2.2 和更新版本,筆記本預設不會再匯入 SparkR,因為 SparkR 函式與其他熱門套件中類似命名的函式發生衝突。 若要使用 SparkR,您可以在筆記本中呼叫 library(SparkR) 。 SparkR 工作階段已設定,而且所有 SparkR 函式都會使用現有的作業階段與連結的叢集交談。

spark-submit 作業中的SparkR

您可以執行在 Azure Databricks 上使用 SparkR 做為 spark-submit 作業的腳本,並修改次要程式代碼。

建立 SparkR DataFrame

您可以從本機 R data.frame、數據源或使用 Spark SQL 查詢建立 DataFrame。

從本機 R data.frame

建立 DataFrame 最簡單的方式是將本機 R data.frame SparkDataFrame轉換成 。 具體而言,我們可以使用 createDataFrame 並傳入本機 R data.frame 來建立 SparkDataFrame。 和大多數其他SparkR函式一樣, createDataFrame Spark 2.0 中的語法已變更。 您可以在下列代碼段中查看此範例。 如需更多範例,請參閱 createDataFrame

library(SparkR)
df <- createDataFrame(faithful)

# Displays the content of the DataFrame to stdout
head(df)

使用數據源 API

從資料來源 read.df建立 DataFrame 的一般方法是 。 這個方法會採用檔案要載入的路徑,以及數據源的類型。 SparkR 支援原生讀取 CSV、JSON、文字和 Parquet 檔案。

library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)

SparkR 會自動從 CSV 檔案推斷架構。

使用 Spark 套件新增數據源連接器

透過 Spark 套件,您可以尋找適用於 Avro 等熱門檔案格式的數據源連接器。 例如,使用 spark-avro套件 載入 Avro 檔案。 spark-avro 套件的可用性取決於您的 叢集版本。 請參閱 Avro 檔案

首先,請採用現有的 data.frame、轉換為Spark DataFrame,並將它儲存為Avro檔案。

require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")

若要確認已儲存 Avro 檔案:

%fs ls /tmp/iris.avro

現在再次使用spark-avro套件來讀取數據。

irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)

數據源 API 也可以用來將 DataFrame 儲存成多個檔案格式。 例如,您可以使用 將上一個範例中的 DataFrame 儲存至 Parquet 檔案 write.df

write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet

從 Spark SQL 查詢

您也可以使用 Spark SQL 查詢來建立 SparkR DataFrame。

# Register earlier df as temp view
createOrReplaceTempView(irisDF2, "irisTemp")
# Create a df consisting of only the 'species' column using a Spark SQL query
species <- sql("SELECT species FROM irisTemp")

species 是 SparkDataFrame。

DataFrame 作業

Spark DataFrame 支援數個函式來執行結構化數據處理。 以下是一些基本範例。 您可以在 API 檔案中找到完整的清單。

選取數據列和數據行

# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

群組和匯總

SparkDataFrame 支援一些常用的函式,以在分組之後匯總數據。 例如,您可以計算每個等候時間出現在忠實數據集中的次數。

head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

資料行作業

SparkR 提供數個函式,可直接套用至數據行以進行數據處理和匯總。 下列範例顯示基本算術函數的使用。

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

機器學習

SparkR 會公開大部分的 MLLib 演算法。 在幕後,SparkR 會使用 MLlib 來定型模型。

下列範例示範如何使用 SparkR 建置高斯 GLM 模型。 若要執行線性回歸,請將系列設定為 "gaussian"。 若要執行羅吉斯回歸,請將系列設定為 "binomial"。 使用SparkML GLM SparkR時,會自動執行類別特徵的單熱編碼,因此不需要手動完成。 除了 String 和 Double 類型功能之外,也可以配合 MLlib 向量功能,以與其他 MLlib 元件相容。

# Create the DataFrame
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

如需教學課程,請參閱 教學課程:使用 glm 分析數據。

如需其他範例,請參閱 在 R 中使用 DataFrame 和數據表。