병합을 사용하여 Delta Lake 테이블에 upsert
MERGE
SQL 작업을 사용하여 원본 테이블, 뷰 또는 DataFrame에서 대상 델타 테이블에 데이터를 upsert할 수 있습니다. Delta Lake는 삽입, 업데이트 및 삭제를 MERGE
지원하며 고급 사용 사례를 용이하게 하기 위해 SQL 표준을 초과하는 확장 구문을 지원합니다.
people10mupdates
테이블 또는 /tmp/delta/people-10m-updates
원본 경로에 대상 테이블 people10m
또는 대상 경로 /tmp/delta/people-10m
에 대한 새 데이터가 포함되어 있다고 가정하겠습니다. 새 레코드 중 일부는 대상 데이터에 이미 있을 수 있습니다. 새 데이터를 병합하려면 이 사람의 id
가 이미 있는 행은 업데이트하고, 일치하는 id
가 없는 경우에는 새 행을 삽입해야 합니다. 다음 쿼리를 실행할 수 있습니다.
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Important
원본 테이블의 한 행만 대상 테이블의 지정된 행과 일치할 수 있습니다. Databricks Runtime 16.0 이상 MERGE
에서 and ON
절에 WHEN MATCHED
지정된 조건을 평가하여 중복 일치 항목을 확인합니다. Databricks Runtime 15.4 LTS 이하 MERGE
에서 작업은 절에 ON
지정된 조건만 고려합니다.
Scala 및 Python의 구문 세부 사항은 Delta Lake API 문서를 참조하세요. Spark SQL 구문 세부 정보는 MERGE INTO를 참조하세요.
병합을 사용하여 일치하지 않는 모든 행 수정
Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 원본 테이블에 해당 레코드가 없는 대상 테이블의 레코드나 DELETE
절 UPDATE
을 사용할 WHEN NOT MATCHED BY SOURCE
수 있습니다. Databricks는 대상 테이블을 완전히 다시 작성하지 않도록 선택적 조건부 절을 추가하는 것이 좋습니다.
다음 코드 예제에서는 삭제에 이것을 사용하고, 대상 테이블을 원본 테이블의 내용으로 덮어쓰고, 대상 테이블에서 일치하지 않는 레코드를 삭제하는 기본 구문을 보여 줍니다. 원본 업데이트 및 삭제가 시간 제한인 테이블에 대한 확장성 있는 패턴은 델타 테이블을 원본과 증분 동기화를 참조하세요.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
다음 예제에서는 절에 WHEN NOT MATCHED BY SOURCE
조건을 추가하고 일치하지 않는 대상 행에서 업데이트할 값을 지정합니다.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
작업 의미 체계 병합
다음은 프로그래밍 방식 작업 의미 체계에 merge
대한 자세한 설명입니다.
whenMatched
및whenNotMatched
절은 원하는 개수만큼 가질 수 있습니다.whenMatched
절은 일치 조건에 따라 원본 행이 대상 테이블 행과 일치할 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.whenMatched
절에는 최대 1개의update
작업과 1개의delete
작업이 포함될 수 있습니다. 이update
작업은merge
일치하는 대상 행의 지정된 열(작업과 유사update
)만 업데이트합니다.delete
동작은 일치된 행을 삭제합니다.각
whenMatched
절에 선택적 조건이 포함될 수 있습니다. 이 절 조건이 존재하는 경우, 절 조건이 true인 경우에만 일치하는 원본-대상 행 쌍에 대해update
또는delete
동작이 실행됩니다.여러
whenMatched
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenMatched
절에 조건이 있어야 합니다.whenMatched
조건 중 병합 조건과 일치하는 원본-대상 행 쌍에 대해 true인 조건이 없는 경우, 대상 행이 변경되지 않습니다.대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열로 업데이트하려면
whenMatched(...).updateAll()
를 사용합니다. 다음 코드와 동일합니다.whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
대상 델타 데이블의 모든 열에 대해 위 코드와 동일합니다. 따라서 이 동작은 원본 테이블이 대상 테이블과 동일한 열을 갖는다고 가정하며, 동일한 열을 갖지 않는 경우 쿼리가 분석 오류를 throw합니다.
참고 항목
이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 개선을 참조하세요.
whenNotMatched
절은 일치 조건에 따라 원본 행이 대상 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.whenNotMatched
절은 하나의insert
동작만 가질 수 있습니다. 지정한 열과 해당 식에 따라 새 행이 생성됩니다. 대상 테이블의 모든 열을 지정할 필요는 없습니다. 지정하지 않은 대상 열의 경우NULL
이 삽입됩니다.각
whenNotMatched
절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대한 조건이 true일 때만 원본 행이 삽입됩니다. 그렇지 않으면 원본 열이 무시됩니다.여러
whenNotMatched
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenNotMatched
절에 조건이 있어야 합니다.대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열과 함께 삽입하려면
whenNotMatched(...).insertAll()
를 사용합니다. 다음 코드와 동일합니다.whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
대상 델타 데이블의 모든 열에 대해 위 코드와 동일합니다. 따라서 이 동작은 원본 테이블이 대상 테이블과 동일한 열을 갖는다고 가정하며, 동일한 열을 갖지 않는 경우 쿼리가 분석 오류를 throw합니다.
참고 항목
이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 개선을 참조하세요.
whenNotMatchedBySource
절은 대상 행이 병합 조건에 따라 원본 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.whenNotMatchedBySource
절은 지정delete
하고update
작업을 수행할 수 있습니다.- 각
whenNotMatchedBySource
절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대해 해당 조건이 true인 경우에만 대상 행이 수정됩니다. 그렇지 않으면 대상 행이 변경되지 않은 상태로 유지됩니다. - 여러
whenNotMatchedBySource
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenNotMatchedBySource
절에 조건이 있어야 합니다. - 정의
whenNotMatchedBySource
에 따라 절에는 열 값을 끌어올 원본 행이 없으므로 원본 열을 참조할 수 없습니다. 수정할 각 열에 대해 리터럴을 지정하거나 대상 열에 대해 작업을 수행할 수 있습니다(예:SET target.deleted_count = target.deleted_count + 1
.).
Important
- 원본 데이터 세트의 여러 행이 일치하고 병합이 대상 델타 테이블의 동일한 행을 업데이트하려고 시도하면
merge
작업이 실패할 수 있습니다. 병합의 SQL 의미 체계에 따르면 일치하는 대상 행을 업데이트하는 데 사용해야 하는 원본 행이 명확하지 않으므로 이러한 업데이트 작업이 모호합니다. 원본 테이블을 전처리하여 여러 일치 항목이 발생할 가능성을 제거할 수 있습니다. - 뷰가
CREATE VIEW viewName AS SELECT * FROM deltaTable
로 정의된 경우에만 SQL VIEW에 SQLMERGE
작업을 적용할 수 있습니다.
델타 테이블에 쓸 때 데이터 중복 제거
일반적인 ETL 사용 사례는 로그를 테이블에 추가하여 델타 테이블로 수집하는 것입니다. 그러나 원본이 중복된 로그 레코드를 생성하기 때문에 이를 처리하기 위해 다운스트림 중복 제거 단계가 필요하게 되는 경우가 많습니다. merge
를 사용할 때는 중복된 레코드가 삽입되는 것을 방지할 수 있습니다.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
참고 항목
새 로그를 포함하는 데이터 세트는 자체적으로 중복 제거되어야 합니다. 새 로그를 포함하는 데이터 세트는 병합의 SQL 의미 체계에 따라 새 데이터를 테이블의 기존 데이터와 매칭하고 중복 제거하지만, 새 데이터 세트 내에 중복 데이터가 있는 경우에는 삽입됩니다. 따라서 테이블에 병합하기 전에 새 데이터를 중복 제거해야 합니다.
며칠 동안만 중복 레코드를 가져올 수 있다는 것을 알고 있는 경우 테이블을 날짜별로 분할한 다음 일치시킬 대상 테이블의 날짜 범위를 지정하여 쿼리를 추가로 최적화할 수 있습니다.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
이 명령은 테이블 전체가 아니라 마지막 7일 분량의 로그에서 중복 항목을 찾기 때문에 앞의 명령보다 효율적입니다. 또한 구조적 스트리밍으로 이 삽입 전용 병합을 사용하면 로그의 연속 중복 제거를 수행할 수 있습니다.
- 스트리밍 쿼리에서는
foreachBatch
에서 병합 작업을 사용하여 스트리밍 데이터를 중복 제거를 사용한 상태로 델타 테이블에 연속 쓰기할 수 있습니다.foreachBatch
에 대한 자세한 내용은 아래의 스트리밍 예제를 참조하세요. - 다른 스트리밍 쿼리에서는 이 델타 테이블에서 중복 제거된 데이터를 연속으로 읽을 수 있습니다. 이것은 삽입 전용 병합은 델타 테이블에 새 데이터만 추가하기 때문에 가능합니다.
Delta Lake를 사용하여 SCD(느린 변경 데이터) 및 CDC(변경 데이터 캡처)
Delta Live Tables는 SCD Type 1 및 Type 2를 추적하고 적용하기 위한 기본 지원을 제공합니다. 델타 라이브 테이블과 함께 사용하면 APPLY CHANGES INTO
CDC 피드를 처리할 때 잘못된 레코드가 올바르게 처리되는지 확인합니다. APPLY CHANGES API: Delta Live Tables을 사용하여 변경 데이터 캡처 간소화를 참조하세요.
델타 테이블을 원본과 증분 방식으로 동기화
Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 임의의 조건을 만들어 테이블의 일부를 원자성으로 삭제하고 바꿀 수 있습니다 WHEN NOT MATCHED BY SOURCE
. 이는 초기 데이터 입력 후 며칠 동안 레코드가 변경되거나 삭제될 수 있지만 최종 상태로 정착되는 원본 테이블이 있는 경우에 특히 유용할 수 있습니다.
다음 쿼리에서는 이 패턴을 사용하여 원본에서 5일간의 레코드를 선택하고, 대상에서 일치하는 레코드를 업데이트하고, 원본에서 대상으로 새 레코드를 삽입하고, 대상에서 지난 5일 동안 일치하지 않는 모든 레코드를 삭제하는 방법을 보여 줍니다.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
원본 및 대상 테이블에 동일한 부울 필터를 제공하면 삭제를 포함하여 원본에서 대상 테이블로 변경 내용을 동적으로 전파할 수 있습니다.
참고 항목
이 패턴은 조건부 절 없이 사용할 수 있지만 이로 인해 비용이 많이 들 수 있는 대상 테이블을 완전히 다시 작성하게 됩니다.