SparkR 사용

SparkR은 R에서 Apache Spark를 사용하는 경량 프런트 엔드를 제공하는 R 패키지입니다. SparkR은 선택, 필터링, 집계 등의 작업을 지원하는 분산 데이터 프레임 구현을 제공하며 MLlib를 사용하는 분산 기계 학습도 지원합니다.

Spark 일괄 처리 작업 정의 또는 대화형 Microsoft Fabric Notebook을 통해 SparkR을 사용합니다.

R 지원은 Spark3.1 이상에서만 사용할 수 있습니다. Spark 2.4의 R은 지원되지 않습니다.

필수 조건

  • Microsoft Fabric 구독가져옵니다. 또는 무료 Microsoft Fabric 평가판에 등록합니다.

  • Microsoft Fabric에 로그인합니다.

  • 홈페이지 왼쪽의 환경 전환기를 사용하여 Synapse 데이터 과학 환경으로 전환합니다.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • 전자 필기장을 열거나 만듭니다. 방법을 알아보려면 Microsoft Fabric Notebook을 사용하는 방법을 참조 하세요.

  • 언어 옵션을 SparkR(R) 로 설정하여 기본 언어를 변경합니다.

  • 레이크하우스에 전자 필기장을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.

SparkR DataFrame 읽기 및 쓰기

로컬 R data.frame에서 SparkR DataFrame 읽기

DataFrame을 만드는 가장 간단한 방법은 로컬 R data.frame을 Spark DataFrame으로 변환하는 것입니다.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lakehouse에서 SparkR DataFrame 읽기 및 쓰기

데이터는 클러스터 노드의 로컬 파일 시스템에 저장할 수 있습니다. Lakehouse에서 SparkR DataFrame을 읽고 쓰는 일반적인 메서드는 다음과 write.df입니다read.df. 이러한 메서드는 파일을 로드할 경로와 데이터 원본의 형식을 사용합니다. SparkR은 기본적으로 CSV, JSON, 텍스트 및 Parquet 파일 읽기를 지원합니다.

Lakehouse를 읽고 쓰려면 먼저 세션에 추가합니다. 전자 필기장 왼쪽에서 추가를 선택하여 기존 Lakehouse를 추가하거나 Lakehouse를 만듭니다.

참고 항목

Spark 패키지(예: read.df 또는write.df)를 사용하여 Lakehouse 파일에 액세스하려면 Spark에 대한 ADFS 경로 또는 상대 경로를 사용합니다. Lakehouse 탐색기에서 액세스하려는 파일 또는 폴더를 마우스 오른쪽 단추로 클릭하고 상황에 맞는 메뉴에서 Spark의 ADFS 경로 또는 상대 경로를 복사합니다.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric이 tidyverse 사전 설치되었습니다. Lakehouse 파일을 읽고 readr::read_csv() 쓰는 등 익숙한 R 패키지에서 Lakehouse 파일에 액세스할 수 있습니다 readr::write_csv().

참고 항목

R 패키지를 사용하여 Lakehouse 파일에 액세스하려면 파일 API 경로를 사용해야 합니다. Lakehouse 탐색기에서 액세스하려는 파일 또는 폴더를 마우스 오른쪽 단추로 클릭하고 상황에 맞는 메뉴에서 해당 파일 API 경로를 복사합니다.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

SparkSQL 쿼리를 사용하여 Lakehouse에서 SparkR 데이터 프레임을 읽을 수도 있습니다.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

RODBC를 통해 SQL 테이블 읽기 및 쓰기

RODBC를 사용하여 ODBC 인터페이스를 통해 SQL 기반 데이터베이스에 연결합니다. 예를 들어 다음 예제 코드와 같이 Synapse 전용 SQL 풀에 연결할 수 있습니다. , 및 <table>.에 대한 <password><database><uid>고유한 연결 세부 정보를 대체합니다.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame 작업

SparkR DataFrames는 구조화된 데이터 처리를 수행하는 많은 함수를 지원합니다. 다음은 몇 가지 기본적인 예입니다. 전체 목록은 SparkR API 문서에서 찾을 수 있습니다.

행 및 열 선택

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

그룹화 및 집계

SparkR 데이터 프레임은 그룹화 후 데이터를 집계하는 데 일반적으로 사용되는 많은 함수를 지원합니다. 예를 들어 아래와 같이 충실한 데이터 세트에서 대기 시간의 히스토그램을 계산할 수 있습니다.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

열 작업

SparkR은 데이터 처리 및 집계를 위해 열에 직접 적용할 수 있는 많은 함수를 제공합니다. 다음 예제에서는 기본적인 산술 함수의 사용법을 보여 줍니다.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

사용자 정의 함수 적용

SparkR은 다음과 같은 여러 종류의 사용자 정의 함수를 지원합니다.

대규모 데이터 세트에서 dapply 함수를 실행하거나 dapplyCollect

dapply

의 각 파티션에 함수를 적용합니다 SparkDataFrame. 각 파티션에 적용할 함수에는 data.frame이 각 파티션 SparkDataFrame 에 해당하는 매개 변수가 하나만 있어야 합니다. 함수의 출력은 .이어야 data.frame합니다. 스키마는 결과의 행 형식을 지정합니다 SparkDataFrame. 반환된 값의 데이터 형식일치해야 합니다.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

dapply와 마찬가지로 함수를 각 파티션에 SparkDataFrame 적용하고 결과를 다시 수집합니다. 함수의 출력은 .이어야 data.frame합니다. 하지만 이번에는 스키마를 전달할 필요가 없습니다. dapplyCollect 모든 파티션에서 실행되는 함수의 출력을 드라이버로 끌어와 드라이버 메모리에 맞출 수 없는 경우 실패할 수 있습니다.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

입력 열 gapply 에 따라 큰 데이터 세트 그룹화에서 함수 실행 gapplyCollect

gapply

의 각 그룹에 함수를 적용합니다 SparkDataFrame. 함수는 각 그룹에 SparkDataFrame 적용되어야 하며, 해당 키에 해당하는 그룹화 키와 R data.frame 이라는 두 개의 매개 변수만 있어야 합니다. 그룹은 열에서 SparkDataFrames 선택됩니다. 함수의 출력은 .이어야 data.frame합니다. 스키마는 결과 행 형식을 지정합니다 SparkDataFrame. Spark 데이터 형식에서 R 함수의 출력 스키마를 나타내야 합니다. 반환 data.frame 된 열 이름은 사용자가 설정합니다.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

다음과 같이 gapply함수를 각 a 그룹에 SparkDataFrame 적용하고 결과를 R data.frame에 다시 수집합니다. 함수의 출력은 .이어야 data.frame합니다. 그러나 스키마를 전달할 필요는 없습니다. gapplyCollect 모든 파티션에서 실행되는 함수의 출력을 드라이버로 끌어와 드라이버 메모리에 맞출 수 없는 경우 실패할 수 있습니다.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

spark.lapply를 사용하여 배포된 로컬 R 함수 실행

spark.lapply

lapply 네이티브 R과 마찬가지로 요소 spark.lapply 목록에서 함수를 실행하고 Spark를 사용하여 계산을 분산합니다. 목록의 요소와 비슷하거나 lapply 유사한 doParallel 방식으로 함수를 적용합니다. 모든 계산의 결과는 단일 컴퓨터에 적합해야 합니다. 그렇지 않은 경우 다음과 같은 df <- createDataFrame(list) 작업을 수행하고 다음을 사용할 dapply수 있습니다.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

SparkR에서 SQL 쿼리 실행

SparkR DataFrame은 데이터를 통해 SQL 쿼리를 실행할 수 있는 임시 보기로 등록할 수도 있습니다. sql 함수를 사용하면 애플리케이션에서 프로그래밍 방식으로 SQL 쿼리를 실행하고 결과를 SparkR DataFrame으로 반환할 수 있습니다.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

기계 학습

SparkR은 대부분의 MLLib 알고리즘을 노출합니다. 내부적으로 SparkR은 MLlib를 사용하여 모델을 학습합니다.

다음 예제에서는 SparkR을 사용하여 Gaussian GLM 모델을 빌드하는 방법을 보여 줍니다. 선형 회귀를 실행하려면 패밀리를 "gaussian"로 설정합니다. 로지스틱 회귀를 실행하려면 패밀리를 "binomial"로 설정합니다. SparkML GLM SparkR을 사용하는 경우 수동으로 수행할 필요가 없도록 범주 기능의 원 핫 인코딩을 자동으로 수행합니다. 문자열 및 이중 형식 기능 외에도 다른 MLlib 구성 요소와의 호환성을 위해 MLlib 벡터 기능에 적합할 수도 있습니다.

지원되는 기계 학습 알고리즘에 대한 자세한 내용은 SparkR 및 MLlib 설명서를 참조하세요.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)