Delta Lake 테이블 만들기

완료됨

Delta Lake는 데이터 레이크의 파일에 대한 관계형 스토리지 추상화를 제공하는 테이블을 기반으로 합니다.

데이터 프레임에서 Delta Lake 테이블 만들기

Delta Lake 테이블을 만드는 가장 쉬운 방법 중 하나는 데이터 프레임을 델타 형식으로 저장하고 테이블에 대한 데이터 파일 및 관련 메타데이터 정보를 저장할 경로를 지정하는 것입니다.

예를 들어 다음 PySpark 코드는 기존 파일의 데이터가 포함된 데이터 프레임을 로드한 다음 해당 데이터 프레임을 델타 형식으로 새 폴더 위치에 저장합니다.

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

델타 테이블을 저장한 후, 지정한 경로 위치에는 데이터에 대한 parquet 파일(데이터 프레임에 로드한 원본 파일의 형식에 관계없음)과 테이블의 트랜잭션 로그가 포함된 _delta_log 폴더가 포함됩니다.

참고

트랜잭션 로그는 테이블에 대한 모든 데이터 수정 내용을 기록합니다. 각 수정 내용을 기록하면 트랜잭션 일관성을 적용할 수 있고 테이블의 버전 관리 정보를 유지할 수 있습니다.

다음과 같이 덮어쓰기 모드를 사용하여 기존 Delta Lake 테이블을 데이터 프레임의 내용으로 바꿀 수 있습니다.

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

추가 모드를 사용하여 데이터 프레임의 행을 기존 테이블에 추가할 수도 있습니다.

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

조건부 업데이트 만들기

데이터 프레임에서 데이터를 수정한 다음 덮어쓰기를 통해 Delta Lake 테이블을 바꿀 수 있지만 데이터베이스에서 보다 일반적인 패턴은 기존 테이블의 행을 개별 트랜잭션 작업으로 삽입, 업데이트 또는 삭제하는 것입니다. 이와 같이 Delta Lake 테이블을 수정하려면 업데이트, 삭제 및 병합 작업을 지원하는 Delta Lake API에서 DeltaTable 개체를 사용할 수 있습니다. 예를 들어 다음 코드를 사용하여 category 열 값이 "Accessories"인 모든 행의 price 열을 업데이트할 수 있습니다.

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

데이터 수정 내용은 트랜잭션 로그에 기록되고 필요에 따라 테이블 폴더에 새 parquet 파일이 만들어집니다.

Data Lake API 사용에 대한 자세한 내용은 Delta Lake API 설명서를 참조하세요.

테이블의 이전 버전 쿼리

Delta Lake 테이블은 트랜잭션 로그를 통한 버전 관리가 지원됩니다. 트랜잭션 로그는 각 트랜잭션에 대한 타임스탬프 및 버전 번호를 기록하여 테이블 수정 내용을 기록합니다. 이 로그된 버전 데이터를 사용하여 이전 버전의 테이블을 볼 수 있습니다(이 기능을 시간 이동이라고 함).

델타 테이블 위치의 데이터를 데이터 프레임으로 읽고 필요한 버전을 versionAsOf 옵션으로 지정하여 Delta Lake 테이블의 특정 버전에서 데이터를 검색할 수 있습니다.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

또는 timestampAsOf 옵션을 사용하여 타임스탬프를 지정할 수 있습니다.

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)