Spark를 사용하여 데이터 파일 작업

완료됨

Notebook을 설정하고 클러스터에 연결한 후 Spark를 사용하여 데이터 파일을 읽고 처리할 수 있습니다. Spark는 CSV, JSON, Parquet, ORC, Avro 및 Delta와 같은 다양한 형식을 지원하며 Databricks는 작업 영역, Azure Data Lake 또는 Blob Storage 또는 다른 외부 시스템에 저장된 파일에 액세스하는 기본 제공 커넥터를 제공합니다.

워크플로는 일반적으로 다음 세 단계를 수행합니다.

  1. 올바른 형식과 경로로 spark.read를 사용하여 Spark DataFrame으로 파일을 습니다. CSV 또는 JSON과 같은 원시 텍스트 형식을 읽을 때 Spark는 스키마(열 이름 및 데이터 형식)를 유추할 수 있지만 때로는 느리거나 불안정할 수 있습니다. 프로덕션 환경에서 더 나은 방법은 데이터가 일관되고 효율적으로 로드되도록 스키마를 명시적으로 정의하는 것입니다.

  2. SQL 또는 DataFrame 작업(예: 행 필터링, 열 선택, 값 집계)을 사용하여 DataFrame을 탐색하고 변환합니다.

  3. 선택한 형식으로 결과를 스토리지에 다시 씁니다.

Spark의 파일 작업은 크고 작은 데이터 세트 간에 일관성을 유지하도록 설계되었습니다. Spark가 클러스터 전체에 작업을 분산하므로 작은 CSV 파일을 테스트하는 데 사용되는 동일한 코드도 훨씬 더 큰 데이터 세트에 작동합니다. 이렇게 하면 빠른 탐색에서 더 복잡한 데이터 처리로 쉽게 확장할 수 있습니다.

데이터 프레임에 데이터 로드

가상의 예제를 통해 데이터 프레임을 사용하여 데이터 작업을 수행할 수 있는 방법을 알아보겠습니다. DBFS(Databricks File System) 스토리지의 데이터 폴더에 products.csv 이름이 지정된 쉼표로 구분된 텍스트 파일에 다음 데이터가 있다고 가정합니다.

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark Notebook에서 다음 PySpark 코드를 사용하여 데이터를 데이터 프레임에 로드하고 처음 10개 행을 표시할 수 있습니다.

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

처음 부분의 %pyspark 줄을 매직이라고 하며 이 셀에 사용되는 언어가 PySpark임을 Spark에 알립니다. 제품 데이터 예제에 해당하는 Scala 코드는 다음과 같습니다.

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

매직 %spark는 Scala를 지정하는 데 사용됩니다.

Notebook 인터페이스의 각 셀에 사용할 언어를 선택할 수도 있습니다.

이전에 표시된 두 예제 모두 다음과 같은 출력을 생성합니다.

ProductID ProductName 범주 정가표
771 마운틴 - 100 실버, 38 산악용 자전거 3399.9900
772 마운틴-100 실버, 42 산악용 자전거 3399.9900
773 마운틴 - 100 실버, 44 산악용 자전거 3399.9900
... ... ... ...

데이터 프레임 스키마 지정

이전 예제에서 CSV 파일의 첫 번째 행에는 열 이름이 포함되어 있으며 Spark는 포함된 데이터에서 각 열의 데이터 형식을 유추할 수 있었습니다. 데이터에 대한 명시적 스키마를 지정할 수도 있습니다. 이 스키마는 다음 CSV 예제와 같이 열 이름이 데이터 파일에 포함되지 않은 경우에 유용합니다.

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

다음 PySpark 예제에서는 이 형식의 product-data.csv 파일에서 로드할 데이터 프레임에 대한 스키마를 지정하는 방법을 보여 줍니다.

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

이번에도 결과는 다음과 유사합니다.

ProductID ProductName 범주 정가표
771 마운틴 - 100 실버, 38 산악용 자전거 3399.9900
772 마운틴-100 실버, 42 산악용 자전거 3399.9900
773 마운틴 - 100 실버, 44 산악용 자전거 3399.9900
... ... ... ...

데이터 프레임 필터링 및 그룹화

Dataframe 클래스의 메서드를 사용하여 포함된 데이터를 필터링, 정렬, 그룹화 또는 조작할 수 있습니다. 예를 들어 다음 코드 예제에서는 이 메서드를 사용하여 select 이전 예제의 제품 데이터가 포함된 df 데이터 프레임에서 ProductNameListPrice 열을 검색합니다.

pricelist_df = df.select("ProductID", "ListPrice")

이 코드 예제의 결과는 다음과 같습니다.

ProductID 정가표
771 3399.9900
772 3399.9900
773 3399.9900
... ...

대부분의 데이터 조작 메서드 select 와 공통적으로 새 데이터 프레임 개체를 반환합니다.

데이터 프레임에서 열의 하위 집합을 선택하는 것은 일반적인 작업이며 다음과 같은 짧은 구문을 사용하여 수행할 수도 있습니다.

pricelist_df = df["ProductID", "ListPrice"]

메서드를 함께 “연결”하여 변환된 데이터 프레임을 생성하는 일련의 조작을 수행할 수 있습니다. 예를 들어 이 예제 코드는 다음 코드와 select 메서드를 연결 where 하여 Mountain Bikes 또는 Road Bikes 범주가 있는 제품의 ProductNameListPrice 열을 포함하는 새 데이터 프레임을 만듭니다.

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

이 코드 예제의 결과는 다음과 같습니다.

ProductName 정가표
마운틴 - 100 실버, 38 3399.9900
Road-750 블랙, 52 539.9900
... ...

데이터를 그룹화하고 집계하려면 메서드 및 집계 함수를 groupby 사용할 수 있습니다. 예를 들어 다음 PySpark 코드는 각 카테고리의 제품 수를 계산합니다.

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

이 코드 예제의 결과는 다음과 같습니다.

범주 개수
헤드세트 3
바퀴 14
산악용 자전거 32
... ...

참고

Spark DataFrame은 선언적이고 변경할 수 없습니다. 각 변환(예: select또는filtergroupBy)은 실행 방법이 아니라 원하는 것을 나타내는 새 DataFrame을 만듭니다. 이렇게 하면 코드를 재사용 가능하고 최적화 가능하며 부작용이 없습니다. 그러나 이러한 변환은 실제로 작업을 트리거할 때까지(예: display, , collectwrite) 실행되지 않으며, 이때 Spark는 전체 최적화된 계획을 실행합니다.

Spark에서 SQL 식 사용

Dataframe API는 데이터 분석가가 SQL 식을 사용하여 데이터를 쿼리하고 조작할 수 있도록 하는 Spark SQL 라이브러리의 일부입니다.

Spark 카탈로그에서 데이터베이스 개체 만들기

Spark 카탈로그는 뷰 및 테이블과 같은 관계형 데이터 개체에 대한 메타스토어입니다. Spark 런타임은 카탈로그를 사용하여 Spark 지원 언어로 작성된 코드를 일부 데이터 분석가 또는 개발자에게 더 자연스러운 SQL 식과 원활하게 통합할 수 있습니다.

Spark 카탈로그에서 쿼리에 사용할 수 있도록 데이터 프레임의 데이터를 만드는 가장 간단한 방법 중 하나는 다음 코드 예제와 같이 임시 보기를 만드는 것입니다.

df.createOrReplaceTempView("products")

보기는 일시적이므로 현재 세션이 끝날 때 자동으로 삭제됩니다. 카탈로그에 유지되는 테이블을 만들어 Spark SQL을 사용하여 쿼리할 수 있는 데이터베이스를 정의할 수도 있습니다.

참고

이 모듈에서는 Spark 카탈로그 테이블을 자세히 살펴보지는 않지만 몇 가지 주요 사항을 살펴볼 필요가 있습니다.

  • spark.catalog.createTable 메서드를 사용하여 빈 테이블을 만들 수 있습니다. 테이블은 카탈로그와 연결된 스토리지 위치에 기본 데이터를 저장하는 메타데이터 구조입니다. 테이블을 삭제하면 기본 데이터도 삭제됩니다.
  • saveAsTable 메서드를 사용하여 데이터 프레임을 테이블로 저장할 수 있습니다.
  • 메서드를 사용하여 외부 테이블을 만들 수 있습니다. 외부 테이블은 카탈로그에서 메타데이터를 정의하지만 외부 스토리지 위치(보통 데이터 레이크의 폴더)에서 기본 데이터를 가져옵니다. 외부 테이블을 삭제해도 기본 데이터는 삭제되지 않습니다.

Spark SQL API를 사용하여 데이터 쿼리

모든 언어로 작성된 코드에서 Spark SQL API를 사용하여 카탈로그의 데이터를 쿼리할 수 있습니다. 예를 들어 다음 PySpark 코드는 SQL 쿼리를 사용하여 제품 보기에서 데이터를 데이터 프레임으로 반환 합니다 .

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

코드 예제의 결과는 다음 표와 유사합니다.

ProductName 정가표
마운틴 - 100 실버, 38 3399.9900
Road-750 블랙, 52 539.9900
... ...

SQL 코드 사용

이전 예제에서는 Spark SQL API를 사용하여 Spark 코드에 SQL 식을 포함하는 방법을 보여 줍니다. Notebook에서는 다음과 같이 %sql 매직을 사용하여 카탈로그의 개체를 쿼리하는 SQL 코드를 실행할 수도 있습니다.

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 코드 예제에서는 아래와 같이 Notebook에 테이블로 자동으로 표시되는 결과 집합을 반환합니다.

범주 제품수량
빕 쇼츠 3
자전거 랙 1
자전거 스탠드 1
... ...