자습서: Delta Lake
이 자습서에서는 다음을 포함하여 Azure Databricks에 대한 일반적인 Delta Lake 작업을 소개합니다.
- 테이블을 만듭니다.
- 테이블에 Upsert합니다.
- 테이블에서 읽습니다.
- 테이블 기록을 표시합니다.
- 이전 버전의 테이블을 쿼리합니다.
- 테이블을 최적화합니다.
- Z 순서 인덱스를 추가합니다.
- 참조되지 않은 파일을 진공합니다.
Azure Databricks 클러스터에 연결된 Notebook 내에서 이 문서의 Python, R, Scala 및 SQL 코드 예제를 실행할 수 있습니다. Databricks SQL에서 SQL 웨어하우스와 연결된 쿼리 내에서 이 문서의 SQL 코드를 실행할 수도 있습니다.
참고 항목
다음 코드 예제 중 일부는 스키마(데이터베이스라고도 함) 및 테이블 또는 뷰(예: default.people10m
)로 구성된 두 수준 네임스페이스 표기법을 사용합니다. Unity 카탈로그에서 이러한 예제를 사용하려면 두 수준 네임스페이스를 카탈로그, 스키마 및 테이블 또는 뷰(예: main.default.people10m
)로 구성된 Unity 카탈로그 세 수준 네임스페이스 표기법으로 바꿉니다.
테이블 만들기
Azure Databricks에서 만든 모든 테이블은 기본적으로 Delta Lake를 사용합니다.
참고 항목
Delta Lake는 모든 읽기, 쓰기 및 테이블 만들기 명령 Azure Databricks의 기본값입니다.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
이전 작업에서는 데이터를 통해 유추된 스키마를 사용하여 관리되는 테이블을 새로 만듭니다. 델타 테이블을 만들 때 사용할 수 있는 옵션에 대한 자세한 내용은 CREATE TABLE을 참조하세요.
관리되는 테이블의 경우 Azure Databricks는 데이터의 위치를 결정합니다. 위치를 가져오기 위해 DESCRIBE DETAIL 문을 사용할 수 있습니다. 예를 들면 다음과 같습니다.
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
경우에 따라 데이터를 삽입하기 전에 스키마를 지정하여 테이블을 만들 수 있습니다. 다음 SQL 명령을 사용하여 이 작업을 완료할 수 있습니다.
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
Databricks Runtime 13.3 LTS 이상에서는 원본 델타 테이블에 대한 스키마 및 테이블 속성을 복제하는 빈 델타 테이블을 새로 만들 수 있습니다 CREATE TABLE LIKE
. 이는 다음 코드 예제와 같이 개발 환경에서 프로덕션으로 테이블을 승격할 때 특히 유용할 수 있습니다.
CREATE TABLE prod.people10m LIKE dev.people10m
Delta Lake에서 DeltaTableBuilder
API를 사용하여 테이블을 만들 수도 있습니다. 이 API를 사용하면 DataFrameWriter API에 비해 열 주석, 테이블 속성 및 생성된 열과 같은 추가 정보를 더 쉽게 지정할 수 있습니다.
Important
이 기능은 공개 미리 보기 상태입니다.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
테이블에 Upsert
기존 델타 테이블에 업데이트 및 삽입 집합을 병합하려면 MERGE INTO 문을 사용합니다. 예를 들어 다음 문은 원본 테이블에서 데이터를 가져와 대상 델타 테이블에 병합합니다. 두 테이블에 일치하는 행이 있는 경우 Delta Lake는 지정된 식을 사용하여 데이터 열을 업데이트합니다. 일치하는 행이 없으면 Delta Lake에서 새 행을 추가합니다. 이 작업을 upsert라고 합니다.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
*
을 지정하면 대상 테이블의 모든 열이 업데이트되거나 삽입됩니다. 이 경우 원본 테이블에 대상 테이블의 열과 동일한 열이 있다고 가정합니다. 그렇지 않으면 쿼리가 분석 오류를 throw합니다.
INSERT
작업을 수행할 때(예: 기존 데이터 세트에 일치하는 행이 없는 경우) 테이블의 모든 열에 대한 값을 지정해야 합니다. 그러나 모든 값을 업데이트할 필요는 없습니다.
결과를 보려면 테이블을 쿼리합니다.
SELECT * FROM people_10m WHERE id >= 9999998
테이블 읽기
다음 예제와 같이 테이블 이름 또는 테이블 경로로 델타 테이블의 데이터에 액세스합니다.
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
테이블에 쓰기
Delta Lake는 테이블에 데이터를 쓰는 데 표준 구문을 사용합니다.
기존 Delta 테이블에 새 데이터를 원자성으로 추가하려면 다음 예제와 같이 append
모드를 사용합니다.
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
테이블의 모든 데이터를 원자성으로 바꾸려면 다음 예제와 같이 overwrite
모드를 사용합니다.
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
테이블 업데이트
델타 테이블에서 조건자와 일치하는 데이터를 업데이트할 수 있습니다. 예를 들어, people10m
테이블 또는 /tmp/delta/people-10m
경로에서 gender
열의 약어를 M
또는 F
에서 Male
또는 Female
로 변경하려면 다음을 실행하면 됩니다.
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
테이블에서 삭제
델타 테이블에서 조건자와 일치하는 데이터를 제거할 수 있습니다. 예를 들어, people10m
테이블 또는 /tmp/delta/people-10m
경로에서 birthDate
열의 값이 1955
이전인 사람들에 해당하는 모든 행을 삭제하려면 다음을 실행하면 됩니다.
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Important
delete
는 최신 버전의 델타 테이블에서 데이터를 제거하지만, 실제 스토리지에서는 오래된 버전이 명시적으로 vacuum되기 전까지 제거하지 않습니다. 자세한 내용은 vacuum을 참조하세요.
테이블 기록 표시
테이블의 기록을 보려면 테이블에 대한 각 쓰기에 대해 테이블 버전, 작업, 사용자 등을 비롯한 출처 정보를 제공하는 DESCRIBE HISTORY 문을 사용합니다.
DESCRIBE HISTORY people_10m
이전 버전의 테이블 쿼리(시간 이동)
Delta Lake 시간 이동을 사용하면 델타 테이블의 이전 스냅샷을 쿼리할 수 있습니다.
이전 버전의 테이블을 쿼리하려면 SELECT
문에 버전 또는 타임스탬프를 지정합니다. 예를 들어 위의 기록에서 버전 0을 쿼리하려면 다음을 사용합니다.
SELECT * FROM people_10m VERSION AS OF 0
또는
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
타임스탬프의 경우 날짜 또는 타임스탬프 문자열만 허용됩니다(예: "2019-01-01"
및 "2019-01-01'T'00:00:00.000Z"
).
DataFrameReader 옵션을 사용하면 특정 버전의 테이블(예: Python)에 고정된 델타 테이블에서 데이터 프레임을 만들 수 있습니다.
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
또는 다음을 수행합니다.
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
자세한 내용은 Delta Lake 테이블 기록 작업을 참조하세요.
테이블 최적화
테이블을 여러 번 변경한 후에는 작은 파일이 많이 있을 수 있습니다. 읽기 쿼리의 속도를 높이기 위해 OPTIMIZE
를 사용하여 작은 파일을 더 큰 파일로 축소할 수 있습니다.
OPTIMIZE people_10m
열별 Z 순서
읽기 성능을 더 향상시키려면 동일한 파일 집합에서 Z 순서로 관련 정보를 공동 배치할 수 있습니다. Delta Lake의 데이터 건너뛰기 알고리즘에서는 이 공동 위치를 자동으로 사용하여 읽어야 하는 데이터의 양을 크게 줄입니다. Z 순서 데이터에 대해 ZORDER BY
절에서 열의 순서를 지정합니다. 예를 들어 gender
로 공동 배치하려면 다음을 실행합니다.
OPTIMIZE people_10m
ZORDER BY (gender)
OPTIMIZE
를 실행할 때 사용할 수 있는 전체 옵션 세트는 Delta Lake에서 최적화하여 데이터 파일 압축을 참조하세요.
VACUUM
으로 스냅샷 정리
Delta Lake는 읽기에 대한 스냅샷 격리를 제공합니다. 즉, 다른 사용자 또는 작업이 테이블을 쿼리하는 동안에도 안전하게 OPTIMIZE
를 실행할 수 있습니다. 그러나 결국에는 이전 스냅샷을 정리해야 합니다. 이 작업은 VACUUM
명령을 실행하여 수행할 수 있습니다.
VACUUM people_10m
VACUUM
을 효과적으로 사용하는 방법에 대한 자세한 내용은 진공 상태에서 사용되지 않는 데이터 파일 제거를 참조하세요.