델타 테이블 만들기

완료됨

델타 테이블을 만들면 델타 형식으로 저장된 구조화된 데이터 세트가 설정됩니다. 테이블은 관리형 또는 외부형으로 설정할 수 있습니다(Databricks가 스토리지 및 메타데이터를 관리합니다. 외부형 테이블의 경우, 예를 들어 Azure Data Lake Storage Gen2에서 스토리지 경로를 직접 제공해야 합니다). 테이블을 만들면 스키마, 스토리지 위치가 설정되고 ACID 트랜잭션과 처음부터 시간 이동이 가능합니다.

다음 Python 예제에서는 두 개의 행이 있는 DataFrame을 만들고 이를 관리되는 델타 테이블 이라고 main.default.people씁니다. SQL 예제에서는 두 개의 열이 있는 테이블을 명시적으로 만들고 두 개의 행을 삽입합니다.

from pyspark.sql import Row

data = [Row(id=1, name="a"), Row(id=2, name="b")]
df = spark.createDataFrame(data)

# Managed Delta table
df.write.format("delta").mode("overwrite").saveAsTable("main.default.people")
CREATE TABLE main.default.people_sql (id INT, name STRING) USING DELTA;
INSERT INTO main.default.people_sql VALUES (1, 'a'), (2, 'b');

외부 델타 테이블은 기본 데이터 파일이 Databricks에서 자동으로 관리되지 않고 사용자가 지정한 스토리지 위치(예: Azure Data Lake Storage Gen2)에 있는 테이블입니다. 이 설정에서 Databricks는 테이블의 메타데이터만 추적하지만 실제 스토리지 경로는 사용자의 제어 하에 유지됩니다. 즉, 기본 데이터 파일에 영향을 주지 않고 Databricks에서 테이블 정의를 삭제할 수 있으며, 여러 플랫폼에서 데이터를 공유하거나 Databricks와 독립적으로 유지해야 하는 경우 편리할 수 있습니다.

그러나 관리되는 테이블은 이러한 측면을 자동으로 처리하는 반면, 디렉터리 구성, 정리 처리 및 보존 정책 적용을 포함하여 해당 스토리지 위치를 관리할 책임이 있습니다.

다음 Python 예제에서 DataFrame은 지정된 Azure Data Lake Storage Gen2 경로에 직접 기록되어 델타 파일 폴더를 생성하지만 Databricks 내에 등록된 테이블은 생성하지 않습니다.

external_path = "abfss://raw@myaccount.dfs.core.windows.net/delta/people_ext"

data_ext = [(10, "x"), (11, "y")]
df_ext = spark.createDataFrame(data_ext, ["id", "name"])

# Write Delta files to external storage path
df_ext.write.format("delta").mode("overwrite").save(external_path)

그런 다음, 다음 SQL 문은 해당 위치를 테이블로 등록하므로 다른 델타 테이블처럼 쿼리할 수 있습니다.

CREATE TABLE main.default.people_ext
USING DELTA
LOCATION 'abfss://raw@myaccount.dfs.core.windows.net/delta/people_ext';

비고

일반적으로 Databricks의 외부 테이블 대신 관리되는 델타 테이블을 사용하는 것이 좋습니다. Databricks는 메타데이터와 기본 스토리지 위치를 모두 처리하여 스키마 적용, 권한 및 수명 주기 관리가 일관되게 적용되도록 하기 때문에 관리 테이블은 작업을 간소화합니다.

읽기 및 쓰기

읽기 및 쓰기는 델타 테이블을 사용하는 핵심 I/O 작업입니다. 스냅샷(일괄 처리 쿼리) 또는 연속(스트리밍)으로 읽을 수 있습니다. 쓰기는 다른 모드(append, overwrite등)일 수 있습니다. 델타는 모든 읽기가 일관되고 쓰기가 원자성인지 확인합니다.

다음 Python 코드 조각은 먼저 전체 사용자 테이블을 읽고 표시합니다. 그런 다음 새 행 (4, "d")을 추가합니다. SQL 예제에서는 테이블을 SELECT 쿼리하고 을 사용하여 INSERT동일한 행을 추가합니다.

# Read
df = spark.read.table("main.default.people")
df.show()

# Write append
new_rows = spark.createDataFrame([(4, "d")], ["id", "name"])
new_rows.write.format("delta").mode("append").saveAsTable("main.default.people")
-- Read
SELECT * FROM main.default.people_sql;

-- Write append
INSERT INTO main.default.people_sql VALUES (4, 'd');

행 업데이트

업데이트하면 조건에 따라 기존 행을 수정할 수 있습니다. Delta Lake는 SQL 스타일을 UPDATE 지원하므로 전체 데이터 세트를 다시 작성할 필요가 없습니다. 이는 수정, 늦게 도착하는 데이터 또는 분석의 차원 조정에 특히 중요합니다.

다음 Python 예제에서는 id=2 행을 업데이트하고 이름을 "b_updated"로 변경합니다. SQL 코드는 선언적으로 동일한 업데이트를 적용합니다.

from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "main.default.people")
dt.update(condition="id = 2", set={"name": "b_updated"})
UPDATE main.default.people_sql SET name = 'b_updated' WHERE id = 2;

여러 행 삭제하기

삭제는 조건과 일치하는 레코드를 제거합니다. 삭제 시 파일을 수동으로 다시 작성해야 하는 원시 Parquet 또는 CSV 파일과 달리 Delta Lake는 선언적 삭제를 트랜잭션 작업으로 지원합니다. 이는 규정 준수 사용 사례 (예: 개인 데이터) 또는 잘못된 데이터 정리에 매우 중요합니다.

다음 Python 예제에서는 people 테이블에서 행 id=1 을 삭제합니다. SQL 코드는 동일한 삭제를 수행합니다.

from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "main.default.people")
dt.delete("id = 1")
DELETE FROM main.default.people_sql WHERE id = 1;

Delta 테이블에 upsert(MERGE)

Upsert(업데이트 + 삽입)는 테이블이 한 작업에서 새 레코드기존 레코드의 변경 내용을 모두 반영하도록 합니다. Delta Lake에서는 MERGE를 사용하여 이 작업을 수행합니다. 일치 항목(조인 조건 기준)을 확인하고 일치하는 경우 업데이트를 적용하거나 일치하지 않는 경우 삽입합니다. 이는 느리게 변경되는 데이터 또는 증분 수집을 처리하는 데 필수적입니다.

다음 Python 코드는 업데이트된 DataFrame을 대상 테이블 people에 병합합니다. 있는 경우 id=2 행이 업데이트되고, 행이 없으면 id=3 삽입됩니다. SQL 코드는 .를 사용하여 동일한 작업을 수행합니다 MERGE INTO.

from delta.tables import DeltaTable
from pyspark.sql import Row

target = DeltaTable.forName(spark, "main.default.people")

updates_df = spark.createDataFrame([Row(id=2, name="b2"), Row(id=3, name="c")])

(target.alias("t")
 .merge(updates_df.alias("s"), "t.id = s.id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())
MERGE INTO main.default.people_sql AS t
USING (SELECT 2 AS id, 'b2' AS name UNION ALL SELECT 3, 'c') AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

트랜잭션 로그

트랜잭션 로그(DeltaLog라고도 함)는 테이블에 대한 모든 변경 내용(삽입, 업데이트, 삭제, 메타데이터 변경, 스키마 업데이트 등)을 기록하는 Delta Lake의 핵심 구성 요소입니다. 테이블의 데이터 파일과 함께 디렉터리에 있으며_delta_log, 각 원자성 커밋은 "파일 추가", "파일 제거", "메타데이터 업데이트", "커밋 정보"와 같은 작업을 캡처하는 JSON 파일(예000000.json000001.json: ...)로 저장됩니다. Delta Lake는 테이블의 현재 상태를 계산하기 위해 처음부터 모든 JSON 커밋을 재생할 필요가 없도록 검사점(Parquet 형식 파일)을 정기적으로 만듭니다. 최신 검사점에서 시작한 다음 최신 커밋만 적용할 수 있습니다.

델타 로그 조직을 보여 주는 다이어그램

트랜잭션 로그는 단일 원본이므로 Delta Lake의 ACID 보증을 지원합니다. 원자성의 경우 전체 커밋이 로그에 나타나지 않는 한 변경 내용이 유효한 것으로 간주되지 않습니다. 격리일관성을 위해 여러 사용자 또는 작업이 동시에 읽기/쓰기가 가능한 경우 로그는 낙관적 동시성 제어를 통해 조정하는 데 도움이 됩니다. Spark는 트랜잭션 로그 버전을 확인하고 커밋은 충돌하는 변경을 방지하기 위해 정렬됩니다. 로그의 기록은 또한 시간 이동 (이전 버전의 테이블을 쿼리할 수 있습니다)과 감사 가능성과 같은 기능을 사용할 수 있습니다. 이는 변경된 작업과 시기를 정확히 추적할 수 있기 때문입니다.

Partitioning

웹 애플리케이션 로그를 저장하기 위해 델타 테이블을 빌드하고 있다고 상상해 보십시오. 시스템은 매일 수십억 개의 로그 레코드를 생성합니다. 분석가는 일반적으로 "어제의 모든 오류 표시" 또는 "지난 주의 일별 사용량 집계"와 같은 쿼리를 실행합니다.

분할되지 않은 하나의 거대한 테이블에 모든 로그를 저장하는 경우 하루만 신경 쓰더라도 모든 쿼리는 잠재적으로 테라바이트 단위의 데이터를 검색해야 합니다. 대신 테이블을 날짜별로 분할하기로 결정합니다. 이제 매일의 데이터는 자체 파티션 디렉터리(예 .../logs/event_date=2025-09-10/: )에 저장되며, 어제의 데이터만 쿼리할 때 Databricks는 다른 모든 날짜를 건너뛸 수 있습니다. 이렇게 하면 검색된 데이터의 양이 줄어들고 쿼리 속도가 크게 향상됩니다.

다음은 Python 및 SQL에서 분할하는 방법의 예입니다.

df.write \
  .format("delta") \
  .mode("overwrite") \
  .partitionBy("event_date") \
  .saveAsTable("main.default.weblogs")
-- Create a weblogs table partitioned by event_date
CREATE TABLE main.default.weblogs (
  user_id INT,
  url STRING,
  status INT,
  event_time DATE,
  event_date DATE
)
USING DELTA PARTITIONED BY (event_date);

분할은 하나 이상의 열을 기반으로 테이블의 데이터를 별도의 하위 집합("파티션")으로 물리적으로 분할하는 것을 의미합니다. 지역, 제품 범주 또는 다른 하위 카디널리티 필드별로 큰 델타 테이블을 분할할 수 있습니다. 쿼리가 이러한 분할 열을 필터링하는 경우 Databricks는 모든 데이터를 검사하는 대신 관련 없는 파티션을 건너뛸 수 있습니다. 이렇게 하면 읽기 속도가 크게 향상되고 리소스 사용량이 줄어들며 쿼리 실행이 더 효율적일 수 있습니다. 파티션을 더 쉽게 삭제하거나 다시 구성하고(예: 파티션으로 이전 데이터 제거) 쓰기 및 기타 작업 중에 정리/필터링의 성능을 향상시킬 수 있으므로 분할은 데이터 관리에도 도움이 됩니다.

그러나 분할에는 비용과 고려 사항도 있습니다. 각 파티션은 델타 로그 및 카탈로그에 메타데이터 오버헤드를 추가하여 작은 파티션이 많을 때 작업을 느리게 할 수 있습니다. 이 때문에 Azure Databricks는 매우 큰 테이블(일반적으로 ~1TB 이상)에 대해서만 분할하고 각 파티션에 파티션에 가치가 있는 약 1GB 이상의 데이터가 있는지 확인하는 것이 좋습니다. 파티션이 너무 세분화된 경우(높은 카디널리티, 많은 작은 파티션), 쿼리 계획, 메타데이터 관리 및 쓰기 성능이 저하될 수 있습니다. 또한 분할 결정은 데이터를 일반적으로 쿼리하는 방법(즉, 해당 파티션 열로 필터링하는 경우가 많음)과 일치해야 합니다. 그렇지 않으면 거의 이점이 없을 수 있습니다. 또한 Databricks는 액체 클러스터링 및 수집 시간 클러스터링과 같은 최신 기능을 제공하므로 대부분의 경우 수동 분할의 필요성을 줄입니다.

최적화 및 유지 관리

Delta Lake는 파일 압축 및 사용되지 않는 파일 제거와 OPTIMIZE 같은 VACUUM 델타 테이블의 성능을 최적화하는 몇 가지 유틸리티를 제공합니다.

OPTIMIZE 는 많은 작은 Parquet 파일을 더 적은 수의 큰 파일로 압축하여 쿼리 성능을 향상시킵니다. 스트리밍 또는 작은 일괄 처리에서 데이터를 수집하는 경우 델타 테이블은 수천 개의 작은 파일을 누적할 수 있습니다. 따라서 Spark는 각 파일을 별도로 열고 검색해야 하므로 쿼리 속도가 느려집니다. 실행 OPTIMIZE 하면 이러한 작은 파일을 더 크고 효율적인 파일로 다시 작성하여 오버헤드를 줄이고 읽기 및 조인 속도를 높입니다.

VACUUM 은 업데이트, 삭제 또는 압축으로 인해 더 이상 필요하지 않은 이전 또는 참조되지 않은 데이터 파일을 안전하게 정리하는 데 사용됩니다. 기본적으로 Delta Lake는 시간 이동 및 동시 판독기와 같은 기능을 지원하기 위해 일정 기간(종종 7일)동안 파일을 유지합니다. 해당 보존 기간 후에 실행 VACUUM 하여 이러한 오래된 파일을 삭제하고 스토리지를 확보할 수 있습니다. 진공을 사용하지 않으면 스토리지 비용이 불필요하게 증가할 수 있으며 사용되지 않는 파일이 있으면 작업이 느려질 수 있습니다.

# Optimize the Delta table
spark.sql("OPTIMIZE '/FileStore/tables/my_delta_table'")

# Clean up old files
spark.sql("VACUUM '/FileStore/tables/my_delta_table' RETAIN 168 HOURS")

또한 Databricks는 자동 최적화자동 압축 기능(작업 영역/클러스터 설정)을 제공하므로 쓰기 중에 파일을 자동으로 압축하여 이러한 명령을 수동으로 실행할 필요가 줄어듭니다.