자습서: Delta Lake

이 자습서에서는 다음을 포함하여 Azure Databricks에 대한 일반적인 Delta Lake 작업을 소개합니다.

Azure Databricks 클러스터에 연결된 Notebook 내에서 이 문서의 Python, R, Scala 및 SQL 코드 예제를 실행할 수 있습니다. Databricks SQL에서 SQL 웨어하우스와 연결된 쿼리 내에서 이 문서의 SQL 코드를 실행할 수도 있습니다.

참고 항목

다음 코드 예제 중 일부는 스키마(데이터베이스라고도 함) 및 테이블 또는 뷰(예: default.people10m)로 구성된 두 수준 네임스페이스 표기법을 사용합니다. Unity 카탈로그에서 이러한 예제를 사용하려면 두 수준 네임스페이스를 카탈로그, 스키마 및 테이블 또는 뷰(예: main.default.people10m)로 구성된 Unity 카탈로그 세 수준 네임스페이스 표기법으로 바꿉니다.

테이블 만들기

Azure Databricks에서 만든 모든 테이블은 기본적으로 Delta Lake를 사용합니다.

참고 항목

Delta Lake는 모든 읽기, 쓰기 및 테이블 만들기 명령 Azure Databricks의 기본값입니다.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

이전 작업에서는 데이터를 통해 유추된 스키마를 사용하여 관리되는 테이블을 새로 만듭니다. 델타 테이블을 만들 때 사용할 수 있는 옵션에 대한 자세한 내용은 CREATE TABLE을 참조하세요.

관리되는 테이블의 경우 Azure Databricks는 데이터의 위치를 결정합니다. 위치를 가져오기 위해 DESCRIBE DETAIL 문을 사용할 수 있습니다. 예를 들면 다음과 같습니다.

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

경우에 따라 데이터를 삽입하기 전에 스키마를 지정하여 테이블을 만들 수 있습니다. 다음 SQL 명령을 사용하여 이 작업을 완료할 수 있습니다.

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Databricks Runtime 13.3 LTS 이상에서는 원본 델타 테이블에 대한 스키마 및 테이블 속성을 복제하는 빈 델타 테이블을 새로 만들 수 있습니다 CREATE TABLE LIKE . 이는 다음 코드 예제와 같이 개발 환경에서 프로덕션으로 테이블을 승격할 때 특히 유용할 수 있습니다.

CREATE TABLE prod.people10m LIKE dev.people10m

Delta Lake에서 DeltaTableBuilder API를 사용하여 테이블을 만들 수도 있습니다. 이 API를 사용하면 DataFrameWriter API에 비해 열 주석, 테이블 속성 및 생성된 열과 같은 추가 정보를 더 쉽게 지정할 수 있습니다.

Important

이 기능은 공개 미리 보기 상태입니다.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

테이블에 Upsert

기존 델타 테이블에 업데이트 및 삽입 집합을 병합하려면 MERGE INTO 문을 사용합니다. 예를 들어 다음 문은 원본 테이블에서 데이터를 가져와 대상 델타 테이블에 병합합니다. 두 테이블에 일치하는 행이 있는 경우 Delta Lake는 지정된 식을 사용하여 데이터 열을 업데이트합니다. 일치하는 행이 없으면 Delta Lake에서 새 행을 추가합니다. 이 작업을 upsert라고 합니다.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

*을 지정하면 대상 테이블의 모든 열이 업데이트되거나 삽입됩니다. 이 경우 원본 테이블에 대상 테이블의 열과 동일한 열이 있다고 가정합니다. 그렇지 않으면 쿼리가 분석 오류를 throw합니다.

INSERT 작업을 수행할 때(예: 기존 데이터 세트에 일치하는 행이 없는 경우) 테이블의 모든 열에 대한 값을 지정해야 합니다. 그러나 모든 값을 업데이트할 필요는 없습니다.

결과를 보려면 테이블을 쿼리합니다.

SELECT * FROM people_10m WHERE id >= 9999998

테이블 읽기

다음 예제와 같이 테이블 이름 또는 테이블 경로로 델타 테이블의 데이터에 액세스합니다.

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

테이블에 쓰기

Delta Lake는 테이블에 데이터를 쓰는 데 표준 구문을 사용합니다.

기존 Delta 테이블에 새 데이터를 원자성으로 추가하려면 다음 예제와 같이 append 모드를 사용합니다.

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

테이블의 모든 데이터를 원자성으로 바꾸려면 다음 예제와 같이 overwrite 모드를 사용합니다.

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

테이블 업데이트

델타 테이블에서 조건자와 일치하는 데이터를 업데이트할 수 있습니다. 예를 들어, people10m 테이블 또는 /tmp/delta/people-10m 경로에서 gender 열의 약어를 M 또는 F에서 Male 또는 Female로 변경하려면 다음을 실행하면 됩니다.

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

테이블에서 삭제

델타 테이블에서 조건자와 일치하는 데이터를 제거할 수 있습니다. 예를 들어, people10m 테이블 또는 /tmp/delta/people-10m 경로에서 birthDate 열의 값이 1955 이전인 사람들에 해당하는 모든 행을 삭제하려면 다음을 실행하면 됩니다.

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Important

delete는 최신 버전의 델타 테이블에서 데이터를 제거하지만, 실제 스토리지에서는 오래된 버전이 명시적으로 vacuum되기 전까지 제거하지 않습니다. 자세한 내용은 vacuum을 참조하세요.

테이블 기록 표시

테이블의 기록을 보려면 테이블에 대한 각 쓰기에 대해 테이블 버전, 작업, 사용자 등을 비롯한 출처 정보를 제공하는 DESCRIBE HISTORY 문을 사용합니다.

DESCRIBE HISTORY people_10m

이전 버전의 테이블 쿼리(시간 이동)

Delta Lake 시간 이동을 사용하면 델타 테이블의 이전 스냅샷을 쿼리할 수 있습니다.

이전 버전의 테이블을 쿼리하려면 SELECT 문에 버전 또는 타임스탬프를 지정합니다. 예를 들어 위의 기록에서 버전 0을 쿼리하려면 다음을 사용합니다.

SELECT * FROM people_10m VERSION AS OF 0

또는

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

타임스탬프의 경우 날짜 또는 타임스탬프 문자열만 허용됩니다(예: "2019-01-01""2019-01-01'T'00:00:00.000Z").

DataFrameReader 옵션을 사용하면 특정 버전의 테이블(예: Python)에 고정된 델타 테이블에서 데이터 프레임을 만들 수 있습니다.

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

또는 다음을 수행합니다.

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

자세한 내용은 Delta Lake 테이블 기록 작업을 참조하세요.

테이블 최적화

테이블을 여러 번 변경한 후에는 작은 파일이 많이 있을 수 있습니다. 읽기 쿼리의 속도를 높이기 위해 OPTIMIZE를 사용하여 작은 파일을 더 큰 파일로 축소할 수 있습니다.

OPTIMIZE people_10m

열별 Z 순서

읽기 성능을 더 향상시키려면 동일한 파일 집합에서 Z 순서로 관련 정보를 공동 배치할 수 있습니다. Delta Lake의 데이터 건너뛰기 알고리즘에서는 이 공동 위치를 자동으로 사용하여 읽어야 하는 데이터의 양을 크게 줄입니다. Z 순서 데이터에 대해 ZORDER BY 절에서 열의 순서를 지정합니다. 예를 들어 gender로 공동 배치하려면 다음을 실행합니다.

OPTIMIZE people_10m
ZORDER BY (gender)

OPTIMIZE를 실행할 때 사용할 수 있는 전체 옵션 세트는 Delta Lake에서 최적화하여 데이터 파일 압축을 참조하세요.

VACUUM으로 스냅샷 정리

Delta Lake는 읽기에 대한 스냅샷 격리를 제공합니다. 즉, 다른 사용자 또는 작업이 테이블을 쿼리하는 동안에도 안전하게 OPTIMIZE를 실행할 수 있습니다. 그러나 결국에는 이전 스냅샷을 정리해야 합니다. 이 작업은 VACUUM 명령을 실행하여 수행할 수 있습니다.

VACUUM people_10m

VACUUM을 효과적으로 사용하는 방법에 대한 자세한 내용은 진공 상태에서 사용되지 않는 데이터 파일 제거를 참조하세요.