SparkR 概觀
SparkR 是一種 R 套件,提供輕量前端以使用來自 R 的 Apache Spark。SparkR 也支援使用 MLlib 的分散式機器學習。
SparkR 函式參考
您可以在 spark.apache.org 上找到最新的 SparkR 函式參考。
您也可以在匯入 SparkR 套件之後,在 R 筆記本或 RStudio 中檢視函式說明。
筆記本中的SparkR
- 針對Spark 2.0和更新版本,您不需要明確地將對象傳遞
sqlContext
至每個函式呼叫。 - 針對 Spark 2.2 和更新版本,筆記本預設不會再匯入 SparkR,因為 SparkR 函式與其他熱門套件中類似命名的函式發生衝突。 若要使用 SparkR,您可以在筆記本中呼叫
library(SparkR)
。 SparkR 工作階段已設定,而且所有 SparkR 函式都會使用現有的作業階段與連結的叢集交談。
spark-submit 作業中的SparkR
您可以執行在 Azure Databricks 上使用 SparkR 做為 spark-submit 作業的腳本,並修改次要程式代碼。
建立 SparkR DataFrame
您可以從本機 R data.frame
、數據源或使用 Spark SQL 查詢建立 DataFrame。
從本機 R data.frame
建立 DataFrame 最簡單的方式是將本機 R data.frame
SparkDataFrame
轉換成 。 具體而言,我們可以使用 createDataFrame
並傳入本機 R data.frame
來建立 SparkDataFrame
。 和大多數其他SparkR函式一樣, createDataFrame
Spark 2.0 中的語法已變更。 您可以在下列代碼段中查看此範例。
如需更多範例,請參閱 createDataFrame。
library(SparkR)
df <- createDataFrame(faithful)
# Displays the content of the DataFrame to stdout
head(df)
使用數據源 API
從資料來源 read.df
建立 DataFrame 的一般方法是 。
這個方法會採用檔案要載入的路徑,以及數據源的類型。
SparkR 支援原生讀取 CSV、JSON、文字和 Parquet 檔案。
library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)
SparkR 會自動從 CSV 檔案推斷架構。
使用 Spark 套件新增數據源連接器
透過 Spark 套件,您可以尋找適用於 Avro 等熱門檔案格式的數據源連接器。 例如,使用 spark-avro套件 載入 Avro 檔案。 spark-avro 套件的可用性取決於您的 叢集版本。 請參閱 Avro 檔案。
首先,請採用現有的 data.frame
、轉換為Spark DataFrame,並將它儲存為Avro檔案。
require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")
若要確認已儲存 Avro 檔案:
%fs ls /tmp/iris.avro
現在再次使用spark-avro套件來讀取數據。
irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)
數據源 API 也可以用來將 DataFrame 儲存成多個檔案格式。 例如,您可以使用 將上一個範例中的 DataFrame 儲存至 Parquet 檔案 write.df
。
write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet
從 Spark SQL 查詢
您也可以使用 Spark SQL 查詢來建立 SparkR DataFrame。
# Register earlier df as temp view
createOrReplaceTempView(irisDF2, "irisTemp")
# Create a df consisting of only the 'species' column using a Spark SQL query
species <- sql("SELECT species FROM irisTemp")
species
是 SparkDataFrame。
DataFrame 作業
Spark DataFrame 支援數個函式來執行結構化數據處理。 以下是一些基本範例。 您可以在 API 檔案中找到完整的清單。
選取數據列和數據行
# Import SparkR package if this is a new notebook
require(SparkR)
# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
群組和匯總
SparkDataFrame 支援一些常用的函式,以在分組之後匯總數據。 例如,您可以計算每個等候時間出現在忠實數據集中的次數。
head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
資料行作業
SparkR 提供數個函式,可直接套用至數據行以進行數據處理和匯總。 下列範例顯示基本算術函數的使用。
# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
機器學習
SparkR 會公開大部分的 MLLib 演算法。 在幕後,SparkR 會使用 MLlib 來定型模型。
下列範例示範如何使用 SparkR 建置高斯 GLM 模型。 若要執行線性回歸,請將系列設定為 "gaussian"
。 若要執行羅吉斯回歸,請將系列設定為 "binomial"
。 使用SparkML GLM SparkR時,會自動執行類別特徵的單熱編碼,因此不需要手動完成。
除了 String 和 Double 類型功能之外,也可以配合 MLlib 向量功能,以與其他 MLlib 元件相容。
# Create the DataFrame
df <- createDataFrame(iris)
# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model coefficients are returned in a similar format to R's native glm().
summary(model)
如需教學課程,請參閱 教學課程:使用 glm 分析數據。
如需其他範例,請參閱 在 R 中使用 DataFrame 和數據表。