자습서: Apache Spark DataFrames를 사용하여 데이터 로드 및 변환

이 자습서에서는 Azure Databricks에서 Apache Spark Python(PySpark) DataFrame API, Apache Spark Scala DataFrame API 및 SparkR SparkDataFrame API를 사용하여 데이터를 로드하고 변환하는 방법을 보여 줍니다.

이 자습서를 마치면 DataFrame이 무엇인지 이해하고 다음 작업에 익숙해집니다.

Python

Apache Spark PySpark API 참조도 참조하세요.

Scala

Apache Spark Scala API 참조도 확인하세요.

R

Apache SparkR API 참조도 참조하세요.

DataFrame이란?

DataFrame은 잠재적으로 서로 다른 형식의 열이 있는 2차원 레이블이 지정된 데이터 구조입니다. DataFrame은 스프레드시트, SQL 테이블 또는 계열 개체 사전이라고 생각하면 됩니다. Apache Spark DataFrames는 일반적인 데이터 분석 문제를 효율적으로 해결할 수 있는 다양한 함수 집합(열 선택, 필터, 조인, 집계)를 제공합니다.

Apache Spark DataFrames는 RDD(Resilient Distributed Datasets) 위에 빌드된 추상화입니다. Spark DataFrames 및 Spark SQL은 통합 계획 및 최적화 엔진을 사용하므로 Azure Databricks에서 지원되는 모든 언어(Python, SQL, Scala 및 R)에서 거의 동일한 성능을 가져올 수 있습니다.

요구 사항

다음 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.

  • 이 자습서의 예제를 사용하려면 작업 영역에 Unity 카탈로그활성화되어 있어야 합니다.

  • 이 자습서의 예제에서는 Unity 카탈로그 볼륨 을 사용하여 샘플 데이터를 저장합니다. 이러한 예제를 사용하려면 볼륨을 만들고 해당 볼륨의 카탈로그, 스키마 및 볼륨 이름을 사용하여 예제에서 사용하는 볼륨 경로를 설정합니다.

  • Unity 카탈로그에는 다음 권한이 있어야 합니다.

    • READ VOLUMEWRITE VOLUME, 또는 ALL PRIVILEGES 이 자습서에 사용되는 볼륨의 경우
    • USE SCHEMA 또는 ALL PRIVILEGES 이 자습서에 사용되는 스키마에 사용됩니다.
    • USE CATALOG 또는 ALL PRIVILEGES 이 자습서에 사용되는 카탈로그의 경우

    이러한 권한을 설정하려면 Databricks 관리자 또는 Unity 카탈로그 권한 및 보안 개체를 참조하세요.

1단계: 변수 정의 및 CSV 파일 로드

이 단계에서는 이 자습서에서 사용할 변수를 정의한 다음, health.data.ny.gov 아기 이름 데이터가 포함된 CSV 파일을 Unity 카탈로그 볼륨으로 로드합니다.

  1. 아이콘을 클릭하여 새 전자 필기장을 새 아이콘 엽니다. Azure Databricks Notebook을 탐색하는 방법을 알아보려면 Databricks Notebook 인터페이스 및 컨트롤을 참조 하세요.

  2. 다음 코드를 복사하여 새 빈 Notebook 셀에 붙여넣습니다. <schema-name><volume-name> Unity 카탈로그 볼륨의 카탈로그, 스키마 및 볼륨 이름으로 바꿉<catalog-name>니다. <table_name> 선택한 테이블 이름으로 대체합니다. 이 자습서의 뒷부분에서 이 표에 아기 이름 데이터를 로드합니다.

  3. 셀을 실행하고 새 빈 셀을 만들려면 누릅니 Shift+Enter 다.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. 다음 코드를 복사하여 새 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Databricks rows.csv dbutuils 명령을 사용하여 health.data.ny.gov 파일을 Unity 카탈로그 볼륨으로 복사합니다.

  5. 키를 눌러 Shift+Enter 셀을 실행한 다음 다음 셀로 이동합니다.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

2단계: DataFrame 만들기

이 단계에서는 테스트 데이터를 사용하여 명명된 df1 DataFrame을 만든 다음 해당 내용을 표시합니다.

  1. 다음 코드를 복사하여 새 빈 Notebook 셀에 붙여넣습니다. 이 코드는 테스트 데이터를 사용하여 데이터 프레임을 만든 다음 DataFrame의 내용과 스키마를 표시합니다.

  2. 키를 눌러 Shift+Enter 셀을 실행한 다음 다음 셀로 이동합니다.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

3단계: CSV 파일에서 DataFrame에 데이터 로드

이 단계에서는 이전에 Unity 카탈로그 볼륨에 로드한 CSV 파일에서 명명 df_csv 된 DataFrame을 만듭니다. spark.read.csv 참조하세요.

  1. 다음 코드를 복사하여 새 빈 Notebook 셀에 붙여넣습니다. 이 코드는 CSV 파일에서 DataFrame df_csv 에 아기 이름 데이터를 로드한 다음 DataFrame의 내용을 표시합니다.

  2. 키를 눌러 Shift+Enter 셀을 실행한 다음 다음 셀로 이동합니다.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

지원되는 다양한 파일 형식에서 데이터를 로드할 수 있습니다.

4단계: DataFrame 보기 및 상호 작용

다음 방법을 사용하여 아기 이름 DataFrames를 보고 상호 작용합니다.

Apache Spark DataFrame의 스키마를 표시하는 방법을 알아봅니다. Apache Spark는 스키마라는 용어를 사용하여 DataFrame에 있는 열의 이름 및 데이터 형식을 참조합니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 두 DataFrame의 스키마를 보는 메서드를 .printSchema() 사용하여 DataFrames의 스키마를 보여 줍니다. 두 DataFrame을 통합할 준비를 합니다.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

참고 항목

Azure Databricks는 {b>스키마

DataFrame에서 열 이름 바꾸기

DataFrame에서 열 이름을 바꾸는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 DataFrame의 df1_csv 열 이름을 DataFrame의 해당 열과 df1 일치하도록 바꿉니다. 이 코드는 Apache Spark withColumnRenamed() 메서드를 사용합니다.

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

DataFrame 결합

한 DataFrame의 행을 다른 데이터 프레임에 추가하는 새 DataFrame을 만드는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark union() 메서드를 사용하여 첫 번째 DataFrame df 의 내용을 CSV 파일에서 로드된 아기 이름 데이터가 포함된 DataFrame df_csv 과 결합합니다.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

DataFrame의 행 필터링

Apache Spark .filter() 또는 .where() 메서드를 사용하여 행을 필터링하여 데이터 집합에서 가장 인기 있는 아기 이름을 검색합니다. DataFrame에서 반환하거나 수정할 행의 하위 집합을 선택하려면 필터링을 사용합니다. 다음 예제와 같이 성능이나 구문에는 차이가 없습니다.

.filter() 메서드 사용

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark .filter() 메서드를 사용하여 데이터 프레임에 해당 행을 50개 이상 표시합니다.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

.where() 메서드 사용

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark .where() 메서드를 사용하여 데이터 프레임에 해당 행을 50개 이상 표시합니다.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

DataFrame에서 열 선택 및 빈도별 순서 지정

반환할 DataFrame의 열을 지정하는 메서드의 아기 이름 빈도 select() 에 대해 알아봅니다. Apache Spark orderbydesc 함수를 사용하여 결과를 정렬합니다.

Apache Spark용 pyspark.sql 모듈은 SQL 함수를 지원합니다. 이 자습서에서 사용하는 이러한 함수 중에는 Apache Spark orderBy()desc()expr() 함수가 있습니다. 필요에 따라 세션으로 가져와 이러한 함수를 사용하도록 설정합니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 함수를 desc() 가져온 다음 Apache Spark select() 메서드와 Apache Spark orderBy()desc() 함수를 사용하여 가장 일반적인 이름과 개수를 내림차순으로 표시합니다.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

하위 집합 DataFrame 만들기

기존 DataFrame에서 하위 집합 DataFrame을 만드는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark filter 메서드를 사용하여 연도, 개수 및 성별별로 데이터를 제한하는 새 DataFrame을 만듭니다. Apache Spark select() 메서드를 사용하여 열을 제한합니다. 또한 Apache Spark orderBy()desc() 함수를 사용하여 새 DataFrame을 개수별로 정렬합니다.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

5단계: DataFrame 저장

DataFrame을 저장하는 방법을 알아봅니다. DataFrame을 테이블에 저장하거나 데이터 프레임을 파일 또는 여러 파일에 쓸 수 있습니다.

테이블에 DataFrame 저장

Azure Databricks는 기본적으로 모든 테이블에 Delta Lake 형식을 사용합니다. DataFrame을 저장하려면 카탈로그 및 스키마에 대한 테이블 권한이 있어야 합니다 CREATE .

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 이 자습서의 시작 부분에 정의한 변수를 사용하여 DataFrame의 내용을 테이블에 저장합니다.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

대부분의 Apache Spark 애플리케이션은 대규모 데이터 집합 및 분산 방식으로 작동합니다. Apache Spark는 단일 파일이 아닌 파일 디렉터리를 작성합니다. Delta Lake는 Parquet 폴더와 파일을 분할합니다. 많은 데이터 시스템에서 이러한 파일 디렉터리를 읽을 수 있습니다. Azure Databricks는 대부분의 애플리케이션에 대한 파일 경로에 테이블을 사용하는 것이 좋습니다.

JSON 파일에 DataFrame 저장

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 DataFrame을 JSON 파일의 디렉터리에 저장합니다.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

JSON 파일에서 DataFrame 읽기

Apache Spark spark.read.format() 메서드를 사용하여 디렉터리에서 DataFrame으로 JSON 데이터를 읽는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 이전 예제에서 저장한 JSON 파일을 표시합니다.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

추가 작업: PySpark, Scala 및 R에서 SQL 쿼리 실행

Apache Spark DataFrames는 SQL을 PySpark, Scala 및 R과 결합하는 다음 옵션을 제공합니다. 이 자습서에서 만든 것과 동일한 Notebook에서 다음 코드를 실행할 수 있습니다.

열을 SQL 쿼리로 지정

Apache Spark selectExpr() 메서드를 사용하는 방법을 알아봅니다. SQL 식을 수락하고 업데이트된 select() DataFrame을 반환하는 메서드의 변형입니다. 이 메서드를 사용하면 다음과 같은 upperSQL 식을 사용할 수 있습니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark selectExpr() 메서드와 SQL upper 식을 사용하여 문자열 열을 대문자로 변환하고 열 이름을 바꿉니다.

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

열에 SQL 구문을 사용하는 데 사용 expr()

Apache Spark expr() 함수를 가져오고 사용하여 열을 지정하는 모든 위치에서 SQL 구문을 사용하는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 함수를 expr() 가져온 다음 Apache Spark expr() 함수와 SQL lower 식을 사용하여 문자열 열을 소문자(및 열 이름 바꾸기)로 변환합니다.

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

spark.sql() 함수를 사용하여 임의 SQL 쿼리 실행

Apache Spark spark.sql() 함수를 사용하여 임의의 SQL 쿼리를 실행하는 방법을 알아봅니다.

다음 코드를 복사하여 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark spark.sql() 함수를 사용하여 SQL 구문을 사용하여 SQL 테이블을 쿼리합니다.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame 자습서 Notebook

다음 Notebook에는 이 자습서의 예제 쿼리가 포함되어 있습니다.

Python

DataFrames 자습서 Notebook

전자 필기장 가져오기

Scala

DataFrames 자습서 Notebook

전자 필기장 가져오기

R

DataFrames 자습서 Notebook

전자 필기장 가져오기

추가 리소스