Spark에서 델타 테이블 작업

완료됨

델타 테이블(또는 델타 형식 파일)을 사용하여 여러 가지 방법으로 데이터를 검색하고 수정할 수 있습니다.

Spark SQL 사용

Spark의 델타 테이블에서 데이터를 다루는 가장 일반적인 방법은 Spark SQL을 사용하는 것입니다. spark.sql 라이브러리를 사용하여 다른 언어(예: PySpark 또는 Scala)에 SQL 문을 포함할 수 있습니다. 예를 들어 다음 코드는 제품 테이블에 행을 삽입합니다.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

또는 Notebook에서 %%sql 매직을 사용하여 SQL 문을 실행할 수 있습니다.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Delta API 사용

카탈로그 테이블이 아닌 델타 파일로 작업하려는 경우 Delta Lake API를 사용하는 것이 더 간단할 수 있습니다. 델타 형식의 파일이 포함된 폴더 위치에서 DeltaTable의 인스턴스를 만든 다음 API를 사용하여 테이블의 데이터를 수정할 수 있습니다.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

시간 이동을 사용하여 테이블 버전 관리 작업

델타 테이블에 대한 수정 사항은 테이블의 트랜잭션 로그에 기록됩니다. 기록된 트랜잭션을 사용하여 테이블에 대한 변경 기록을 보고 이전 버전의 데이터를 검색할 수 있습니다(시간 이동이라고 함).

테이블의 기록을 보려면 다음과 같이 DESCRIBE SQL 명령을 사용할 수 있습니다.

%%sql

DESCRIBE HISTORY products

이 문의 결과는 여기에 표시된 것처럼 테이블에 적용된 트랜잭션을 보여 줍니다(일부 열은 생략됨).

버전 timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

외부 테이블의 기록을 보려면 테이블 이름 대신 폴더 위치를 지정할 수 있습니다.

%%sql

DESCRIBE HISTORY 'Files/mytable'

델타 파일 위치를 데이터 프레임으로 읽고 필요한 버전을 versionAsOf 옵션으로 지정하여 특정 버전의 데이터에서 데이터를 검색할 수 있습니다.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

또는 timestampAsOf 옵션을 사용하여 타임스탬프를 지정할 수 있습니다.

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)