다음을 통해 공유


SparkR 사용

SparkR은 R에서 Apache Spark를 사용하는 경량 프런트 엔드를 제공하는 R 패키지입니다. SparkR은 선택, 필터링, 집계 등의 작업을 지원하는 분산 데이터 프레임 구현을 제공하며 MLlib를 사용하는 분산 기계 학습도 지원합니다.

Spark 일괄 처리 작업 정의를 통해 또는 대화형 Microsoft Fabric Notebook으로 SparkR을 사용할 수 있습니다.

R 지원은 Spark3.1 이상에서만 사용할 수 있습니다. Spark 2.4의 R은 지원되지 않습니다.

필수 조건

  • Microsoft Fabric 구독을 구매합니다. 또는 무료 Microsoft Fabric 평가판에 등록합니다.

  • Microsoft Fabric에 로그인합니다.

  • 홈페이지 왼쪽의 환경 전환기를 사용하여 Synapse 데이터 과학 환경으로 전환합니다.

    데이터 과학을 선택할 위치를 보여 주는 환경 전환기 메뉴의 스크린샷.

  • Notebook을 열거나 만듭니다. 방법을 알아보려면 Microsoft Fabric Notebook을 사용하는 방법을 참조하세요.

  • 언어 옵션을 SparkR(R) 로 설정하여 기본 언어를 변경합니다.

  • 레이크하우스에 Notebook을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.

SparkR DataFrame 읽기 및 쓰기

로컬 R data.frame에서 SparkR DataFrame 읽기

데이터 프레임을 만드는 가장 간단한 방법은 로컬 R data.frame을 Spark DataFrame으로 변환하는 것입니다.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

레이크하우스에서 SparkR DataFrame 읽기 및 쓰기

데이터는 클러스터 노드의 로컬 파일 시스템에 저장될 수 있습니다. 레이크하우스에서 SparkR DataFrame을 읽고 쓰는 일반적인 메서드는 read.dfwrite.df입니다. 이러한 메서드는 로드할 파일의 경로와 데이터 원본의 형식을 사용합니다. SparkR은 기본적으로 CSV, JSON, 텍스트 및 Parquet 파일 읽기를 지원합니다.

레이크하우스를 읽고 쓰려면 먼저 세션에 추가해야 합니다. Notebook 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.

참고 항목

Spark 패키지(예: read.df 또는write.df)를 사용하여 레이크하우스 파일에 액세스하려면 ADFS 경로 또는 Spark의 상대 경로를 사용할 수 있습니다. 레이크하우스 탐색기에서 액세스하려는 파일 또는 폴더를 마우스 오른쪽 버튼으로 클릭하고 상황별 메뉴에서 ADFS 경로 또는 Spark의 상대 경로를 복사합니다.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric에는 tidyverse가 사전 설치되어 있습니다. readr::read_csv()readr::write_csv()를 사용하여 레이크하우스 파일을 읽고 쓰는 등 익숙한 R 패키지에서 레이크하우스 파일에 액세스할 수 있습니다.

참고 항목

R 패키지를 사용하여 레이크하우스 파일에 액세스하려면 파일 API 경로를 사용해야 합니다. 레이크하우스 탐색기에서 액세스하려는 파일 또는 폴더를 마우스 오른쪽 버튼으로 클릭하고 상황별 메뉴에서 해당 파일 API 경로를 복사합니다.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

SparkSQL 쿼리를 사용하여 레이크하우스에서 SparkR 데이터 프레임을 읽을 수도 있습니다.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

DataFrame 작업

Spark DataFrame은 구조화된 데이터 처리를 수행하는 다양한 함수를 지원합니다. 다음은 몇 가지 기본적인 예입니다. 전체 목록은 SparkR API 문서에서 확인할 수 있습니다.

행 및 열 선택

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

그룹화 및 집계

SparkR DataFrame은 그룹화 후에 데이터를 집계하는 데 널리 사용되는 함수를 지원합니다. 예를 들어 아래와 같이 충실한 데이터 세트의 대기 시간에 대한 히스토그램을 계산할 수 있습니다.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

열 작업

SparkR은 데이터 처리 및 집계를 위해 열에 직접 적용할 수 있는 다양한 함수를 제공합니다. 다음 예시에서는 기본 산술 함수의 사용을 보여 줍니다.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

사용자 정의 함수 적용

SparkR은 여러 종류의 사용자 정의 함수를 지원합니다.

dapply 또는 dapplyCollect를 통해 대규모 데이터 세트에서 함수 실행

dapply

SparkDataFrame의 각 파티션에 함수를 적용합니다. SparkDataFrame의 각 파티션에 적용할 함수는 각 파티션에 해당하는 data.frame이 전달되는 매개 변수가 하나만 있어야 합니다. 함수의 출력은 data.frame이어야 합니다. 스키마는 결과 SparkDataFrame의 행 형식을 지정합니다. 반환된 값의 데이터 형식과 일치해야 합니다.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

dapply와 마찬가지로 SparkDataFrame의 각 파티션에 함수를 적용하고 결과를 다시 수집합니다. 함수의 출력은 data.frame이어야 합니다. 그러나 이번에는 스키마를 전달할 필요가 없습니다. 모든 파티션에서 실행되는 함수의 출력을 드라이버로 끌어올 수 없고 드라이버 메모리에 맞지 않는 경우 dapplyCollect가 실패할 수 있습니다.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

gapply 또는 gapplyCollect의 입력 열을 기준으로 그룹화된 대규모 데이터 세트에 대해 함수 실행

gapply

SparkDataFrame의 각 그룹에 함수를 적용합니다. 함수는 SparkDataFrame의 각 그룹에 적용되며, 해당 키에 해당하는 그룹화 키와 해당 키에 해당하는 R data.frame이라는 두 개의 매개변수만 가져야 합니다. 그룹은 SparkDataFrames 열에서 선택됩니다. 함수의 출력은 data.frame이어야 합니다. 스키마는 결과 SparkDataFrame의 행 형식을 지정합니다. Spark 데이터 형식에서 R 함수의 출력 스키마를 나타내야 합니다. 반환된 data.frame의 열 이름은 사용자가 설정합니다.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

gapply와 마찬가지로 SparkDataFrame의 각 파티션에 함수를 적용하고 결과를 다시 R data.frame으로 수집합니다. 함수의 출력은 data.frame이어야 합니다. 그러나 스키마를 전달할 필요는 없습니다. 모든 파티션에서 실행되는 함수의 출력을 드라이버로 끌어올 수 없고 드라이버 메모리에 맞지 않는 경우 gapplyCollect가 실패할 수 있습니다.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

spark.lapply를 통해 배포된 로컬 R 함수 실행

spark.lapply

네이티브 R의 lapply과 마찬가지로, spark.lapply는 요소 목록에 대해 함수를 실행하고 Spark를 사용하여 계산을 분산합니다. 목록의 요소에 doParallel 또는 lapply와 유사한 방식으로 함수를 적용합니다. 모든 계산의 결과는 단일 컴퓨터에 적합해야 합니다. 그렇지 않은 경우 df <- createDataFrame(list)과 같은 작업을 수행한 다음 dapply를 사용할 수 있습니다.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

SparkR에서 SQL 쿼리 실행

SparkR DataFrame은 해당 데이터에 대해 SQL 쿼리를 실행할 수 있는 임시 보기로 등록할 수도 있습니다. sql 함수를 사용하면 애플리케이션에서 프로그래밍 방식으로 SQL 쿼리를 실행하고 결과를 SparkR DataFrame으로 반환할 수 있습니다.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

기계 학습

SparkR은 대부분의 MLLib 알고리즘을 노출합니다. 내부적으로 SparkR은 MLlib를 사용하여 모델을 학습합니다.

다음 예제에서는 SparkR을 사용하여 가우스 GLM 모델을 빌드하는 방법을 보여 줍니다. 선형 회귀를 실행하려면 패밀리를 "gaussian"로 설정합니다. 로지스틱 회귀를 실행하려면 패밀리를 "binomial"로 설정합니다. SparkML GLM SparkR을 사용하는 경우 수동으로 수행할 필요가 없도록 범주 기능의 원 핫 인코딩을 자동으로 수행합니다. 문자열 및 이중 형식 기능 외에도 다른 MLlib 구성 요소와의 호환성을 위해 MLlib 벡터 기능에 맞출 수도 있습니다.

지원되는 기계 학습 알고리즘에 대한 자세한 내용은 SparkR 및 MLlib 설명서를 참조하세요.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)