Spark를 사용하여 데이터 파일 작업

완료됨

Spark를 사용하는 이점 중 하나는 다양한 프로그래밍 언어로 코드를 작성하고 실행하여 이미 가지고 있는 프로그래밍 기술을 사용하고 지정된 작업에 가장 적합한 언어를 사용할 수 있다는 것입니다. 새 Azure Databricks Spark Notebook의 기본 언어는 데이터 조작 및 시각화에 대한 강력한 지원으로 인해 데이터 과학자 및 분석가가 일반적으로 사용하는 Spark 최적화 버전의 Python인 PySpark입니다. 또한 Scala(대화형으로 사용할 수 있는 Java 파생 언어) 및 SQL(관계형 데이터 구조를 사용하기 위해 Spark SQL 라이브러리에 포함된 일반적으로 사용되는 SQL 언어의 변형)과 같은 언어를 사용할 수 있습니다. 소프트웨어 엔지니어는 Java와 같은 프레임워크를 사용하여 Spark에서 실행되는 컴파일된 솔루션을 만들 수도 있습니다.

데이터 프레임을 사용하여 데이터 검색

기본적으로 Spark는 RDD(복원력 있는 분산 데이터 세트)라는 데이터 구조를 사용합니다. 하지만 RDD에서 직접 작동하는 코드를 작성할 수 있지만 Spark에서 정형 데이터를 사용하기 위해 가장 일반적으로 사용되는 데이터 구조는 Spark SQL 라이브러리의 일부로 제공되는 데이터 프레임입니다. Spark의 데이터 프레임은 유비쿼터스 Pandas Python 라이브러리의 데이터 프레임과 비슷하지만 Spark의 분산 처리 환경에서 작동하도록 최적화되었습니다.

참고

Spark SQL은 데이터 프레임 API 외에도 Java 및 Scala에서 지원되는 강력한 형식의 데이터 세트 API를 제공합니다. 본 모듈에서는 Dataframe API에 중점을 둡니다.

데이터 프레임에 데이터 로드

가상의 예제를 통해 데이터 프레임을 사용하여 데이터 작업을 수행할 수 있는 방법을 알아보겠습니다. DBFS(Databricks File System) 스토리지의 data 폴더에 products.csv로 명명된 쉼표로 구분된 텍스트 파일에 다음 데이터가 있다고 가정합니다.

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark Notebook에서 다음 PySpark 코드를 사용하여 데이터를 데이터 프레임에 로드하고 처음 10개 행을 표시할 수 있습니다.

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

처음 부분의 %pyspark 줄을 매직이라고 하며 이 셀에 사용되는 언어가 PySpark임을 Spark에 알립니다. 제품 데이터 예제에 해당하는 Scala 코드는 다음과 같습니다.

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

매직 %spark는 Scala를 지정하는 데 사용됩니다.

Notebook 인터페이스의 각 셀에 사용할 언어를 선택할 수도 있습니다.

이전에 표시된 두 예제 모두 다음과 같은 출력을 생성합니다.

ProductID ProductName 범주 ListPrice
771 Mountain-100 Silver, 38 산악용 자전거 3399.9900
772 Mountain-100 Silver, 42 산악용 자전거 3399.9900
773 Mountain-100 Silver, 44 산악용 자전거 3399.9900
... ... ... ...

데이터 프레임 스키마 지정

이전 예제에서 CSV 파일의 첫 번째 행에는 열 이름이 포함되어 있으며 Spark는 포함된 데이터에서 각 열의 데이터 형식을 유추할 수 있었습니다. 데이터에 대한 명시적 스키마를 지정할 수도 있습니다. 이 스키마는 다음 CSV 예제와 같이 열 이름이 데이터 파일에 포함되지 않은 경우에 유용합니다.

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

다음 PySpark 예제에서는 이 형식의 product-data.csv 파일에서 로드할 데이터 프레임에 대한 스키마를 지정하는 방법을 보여 줍니다.

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

이번에도 결과는 다음과 유사합니다.

ProductID ProductName 범주 ListPrice
771 Mountain-100 Silver, 38 산악용 자전거 3399.9900
772 Mountain-100 Silver, 42 산악용 자전거 3399.9900
773 Mountain-100 Silver, 44 산악용 자전거 3399.9900
... ... ... ...

데이터 프레임 필터링 및 그룹화

Dataframe 클래스의 메서드를 사용하여 포함된 데이터를 필터링, 정렬, 그룹화 또는 조작할 수 있습니다. 예를 들어 다음 코드 예제에서는 select 메서드를 사용하여 이전 예제의 제품 데이터를 포함하는 df 데이터 프레임에서 ProductNameListPrice 열을 검색합니다.

pricelist_df = df.select("ProductID", "ListPrice")

이 코드 예제의 결과는 다음과 같습니다.

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

대부분의 데이터 조작 메서드와 공통적으로 select는 새 데이터 프레임 개체를 반환합니다.

데이터 프레임에서 열의 하위 집합을 선택하는 것은 일반적인 작업이며 다음과 같은 짧은 구문을 사용하여 수행할 수도 있습니다.

pricelist_df = df["ProductID", "ListPrice"]

메서드를 함께 “연결”하여 변환된 데이터 프레임을 생성하는 일련의 조작을 수행할 수 있습니다. 예를 들어 이 예제 코드는 selectwhere 메서드를 연결하여 Mountain Bikes 또는 Road Bikes 카테고리의 ProductNameListPrice 열을 포함하는 새 데이터 프레임을 만듭니다.

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

이 코드 예제의 결과는 다음과 같습니다.

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

데이터를 그룹화하고 집계하려면 groupBy 메서드 및 집계 함수를 사용할 수 있습니다. 예를 들어 다음 PySpark 코드는 각 카테고리의 제품 수를 계산합니다.

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

이 코드 예제의 결과는 다음과 같습니다.

범주 개수
헤드세트 3
바퀴 14
산악용 자전거 32
... ...

Spark에서 SQL 식 사용

Dataframe API는 데이터 분석가가 SQL 식을 사용하여 데이터를 쿼리하고 조작할 수 있도록 하는 Spark SQL 라이브러리의 일부입니다.

Spark 카탈로그에서 데이터베이스 개체 만들기

Spark 카탈로그는 뷰 및 테이블과 같은 관계형 데이터 개체에 대한 메타스토어입니다. Spark 런타임은 카탈로그를 사용하여 Spark 지원 언어로 작성된 코드를 일부 데이터 분석가 또는 개발자에게 더 자연스러운 SQL 식과 원활하게 통합할 수 있습니다.

Spark 카탈로그에서 쿼리에 사용할 수 있도록 데이터 프레임의 데이터를 만드는 가장 간단한 방법 중 하나는 다음 코드 예제와 같이 임시 보기를 만드는 것입니다.

df.createOrReplaceTempView("products")

보기는 일시적이므로 현재 세션이 끝날 때 자동으로 삭제됩니다. 카탈로그에 유지되는 테이블을 만들어 Spark SQL을 사용하여 쿼리할 수 있는 데이터베이스를 정의할 수도 있습니다.

참고

이 모듈에서는 Spark 카탈로그 테이블을 자세히 살펴보지는 않지만 몇 가지 주요 사항을 살펴볼 필요가 있습니다.

  • spark.catalog.createTable 메서드를 사용하여 빈 테이블을 만들 수 있습니다. 테이블은 카탈로그와 연결된 스토리지 위치에 기본 데이터를 저장하는 메타데이터 구조입니다. 테이블을 삭제하면 기본 데이터도 삭제됩니다.
  • saveAsTable 메서드를 사용하여 데이터 프레임을 테이블로 저장할 수 있습니다.
  • spark.catalog.createExternalTable메서드를 사용하여 외부 테이블을 만들 수 있습니다. 외부 테이블은 카탈로그에서 메타데이터를 정의하지만 외부 스토리지 위치(보통 데이터 레이크의 폴더)에서 기본 데이터를 가져옵니다. 외부 테이블을 삭제해도 기본 데이터는 삭제되지 않습니다.

Spark SQL API를 사용하여 데이터 쿼리

모든 언어로 작성된 코드에서 Spark SQL API를 사용하여 카탈로그의 데이터를 쿼리할 수 있습니다. 예를 들어 다음 PySpark 코드는 SQL 쿼리를 사용하여 제품 보기의 데이터를 데이터 프레임으로 반환합니다.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

코드 예제의 결과는 다음 표와 유사합니다.

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

SQL 코드 사용

이전 예제에서는 Spark SQL API를 사용하여 Spark 코드에 SQL 식을 포함하는 방법을 보여 줍니다. Notebook에서는 다음과 같이 %sql 매직을 사용하여 카탈로그의 개체를 쿼리하는 SQL 코드를 실행할 수도 있습니다.

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 코드 예제에서는 아래와 같이 Notebook에 테이블로 자동으로 표시되는 결과 집합을 반환합니다.

범주 ProductCount
빕 쇼츠 3
자전거 랙 1
자전거 스탠드 1
... ...