테이블을 수정하는 각 작업은 새 테이블 버전을 만듭니다. 기록 정보를 사용하여 작업 감사, 테이블 롤백 또는 시간 여행을 사용하여 특정 시점에 테이블을 쿼리합니다.
참고 항목
Databricks는 데이터 보관을 위한 장기 백업 솔루션으로 테이블 기록을 사용하지 않는 것이 좋습니다. 데이터 및 로그 보존 구성을 모두 더 큰 값으로 설정하지 않은 경우 지난 7일 동안의 시간 이동 작업에만 사용합니다.
테이블 기록 검색
history 명령을 실행함으로써 테이블에 쓰기 작업이 발생할 때마다 해당 작업, 사용자 및 타임스탬프를 포함한 정보를 검색합니다. 연산은 역순으로 반환됩니다.
테이블 기록 보존은 기본적으로 30일인 테이블 설정 logRetentionDuration에 의해 결정됩니다.
참고 항목
시간 이동 및 테이블 기록은 서로 다른 보존 임계값에 의해 제어됩니다. 시간 여행이란?을 참조하세요.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL 구문 세부 정보는 DESCRIBE HISTORY참조하세요.
Scala/Java/Python 구문에 대한 자세한 내용은 Delta Lake API 설명서를 참조하세요.
카탈로그 탐색기는 이 자세한 테이블 정보 및 기록을 시각적으로 볼 수 있습니다. 테이블 스키마 및 샘플 데이터 외에도 기록 탭을 클릭하여 DESCRIBE HISTORY를 통해 표시하는 테이블 기록을 볼 수 있습니다.
기록 스키마
history 작업의 출력은 다음과 같은 열을 갖습니다.
| 칼럼 | 유형 | 설명 |
|---|---|---|
| 버전 | 길다 | 작업에 의해 생성된 테이블 버전. |
| 시간표시 | 시간표시 | 이 버전이 커밋된 시점. |
| 사용자 ID | 문자열 | 작업을 실행한 사용자의 ID. |
| 사용자 이름 | 문자열 | 작업을 실행한 사용자의 이름. |
| 수술 | 문자열 | 작업의 이름입니다. |
| 운영 매개변수 | 지도 | 작업의 매개 변수(예: 조건자). |
| 작업(job) | 구조체 (struct) | 작업을 실행한 작업의 세부 정보. |
| Notebook | 구조체 (struct) | 작업이 실행된 Notebook의 세부 정보. |
| 클러스터ID | 문자열 | 작업이 실행된 클러스터의 ID. |
| 버전읽기 | 길다 | 쓰기 작업을 수행하기 위해 읽은 테이블의 버전. |
| 격리 수준 | 문자열 | 이 작업에 사용된 격리 수준. |
| isBlindAppend | 부울 값 | 이 작업에 데이터가 추가되었는지 여부. |
| operationMetrics | 지도 | 작업의 메트릭(예: 수정된 행 및 파일의 수). |
| 사용자 메타데이터 | 문자열 | 지정된 경우, 사용자 정의 커밋 메타데이터 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
참고 항목
- 다음 메서드를 사용하여 테이블에 쓰는 경우 다른 열 중 일부를 사용할 수 없습니다.
- 추후 추가되는 열은 항상 마지막 열 뒤에 추가됩니다.
작업 매개 변수 이해 partitionBy
이 partitionBy 필드는 테이블의 파티션 스키마를 정의하거나 변경하는 CREATE 및 OVERWRITE 작업에만 의미가 있습니다.
기존 테이블(APPEND, INSERT, UPDATE, DELETE, MERGE)에 대한 추가 작업의 경우, 이 필드는 사용된 쓰기 방법([] vs .save())에 따라 빈 배열 .saveAsTable() 또는 파티션 열을 표시할 수 있습니다. 이 불일치는 예상된 동작이며 쓰기의 유효성을 검사하는 데 사용하면 안 됩니다.
중요합니다
기록에서 partitionBy에 의존하여 추가 작업의 유효성을 검사하지 마세요. 값은 구현 세부 정보에 따라 다르지만 데이터가 파티션에 기록되는 방식에는 영향을 주지 않습니다.
Example
date 열로 분할된 테이블을 고려합니다.
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
기록 내 CREATE 작업은 다음을 보여줍니다.
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
이 테이블에 데이터를 추가하는 경우:
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
APPEND 작업에는 다음이 표시됩니다.
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
빈 partitionBy 값이 필요합니다. 데이터는 여전히 테이블의 기존 파티션 스키마를 기반으로 올바른 파티션에 기록됩니다.
.save() 경로에 이 필드의 파티션 열이 표시 될 수 있지만 이러한 차이는 구현 세부 정보이며 쓰기 동작에 영향을 주지 않습니다.
작업 메트릭
history 작업은 operationMetrics 열 맵으로 작업 메트릭 모음을 반환합니다.
다음 표에는 작업별 맵 키 정의가 나와 있습니다.
| 연산 | 메트릭 이름 | 설명 |
|---|---|---|
| 기록, CREATE TABLE을 SELECT로, TABLE를 SELECT로 교체, COPY INTO | ||
| 파일수 | 쓰기된 파일의 수. | |
| 출력 바이트 수 (numOutputBytes) | 쓰기된 콘텐츠의 크기(바이트). | |
| 출력행수 | 쓰기된 행의 수. | |
| 스트리밍 UPDATE | ||
| 추가된 파일 수 | 추가된 파일의 수. | |
| 제거된파일수 | 제거된 파일의 수. | |
| 출력행수 | 쓰기된 행의 수. | |
| 출력 바이트 수 (numOutputBytes) | 쓰기의 크기(바이트). | |
| 삭제 | ||
| 추가된 파일 수 | 추가된 파일의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음. | |
| 제거된파일수 | 제거된 파일의 수. | |
| 삭제된 행 수 | 제거된 행의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음. | |
| numCopiedRows | 파일을 삭제하는 과정에서 복사된 행의 수. | |
| 실행시간(ms) | 작업 전체를 실행하는 데 걸린 시간. | |
| scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
| rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
| 잘라내다 | ||
| 제거된파일수 | 제거된 파일의 수. | |
| 실행시간(ms) | 작업 전체를 실행하는 데 걸린 시간. | |
| 병합 | ||
| numSourceRows | 원본 DataFrame의 행 수. | |
| numTargetRowsInserted | 대상 테이블에 삽입된 행 수. | |
| 목표 행 갱신 수 | 대상 테이블에서 업데이트된 행 수. | |
| 대상 행 삭제 수 | 대상 테이블에서 삭제된 행 수. | |
| 복사된 대상 행 수 | 복사된 대상 행의 수. | |
| 출력행수 | 출력으로 쓰기된 행의 총 수. | |
| 대상 파일 추가 수 | 싱크(대상)에 추가된 파일의 수. | |
| 대상 파일 수 제거됨 | 싱크(대상)에서 제거된 파일의 수. | |
| 실행시간(ms) | 작업 전체를 실행하는 데 걸린 시간. | |
| scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
| rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
| UPDATE | ||
| 추가된 파일 수 | 추가된 파일의 수. | |
| 제거된파일수 | 제거된 파일의 수. | |
| numUpdatedRows | 업데이트된 행의 수. | |
| numCopiedRows | 파일을 업데이트하는 과정에서 방금 복사된 행의 수. | |
| 실행시간(ms) | 작업 전체를 실행하는 데 걸린 시간. | |
| scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
| rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
| FSCK (파일 시스템 검사 유틸리티) | 제거된파일수 | 제거된 파일의 수. |
| 변환 | 변환된 파일 수 | 변환된 Parquet 파일의 수. |
| OPTIMIZE | ||
| 추가된 파일 수 | 추가된 파일의 수. | |
| 제거된파일수 | 최적화된 파일의 수. | |
| 추가된 바이트 수 | 테이블이 최적화된 후 추가된 바이트 수. | |
| 제거된 바이트 수 | 제거된 바이트 수. | |
| 최소파일크기 | 테이블이 최적화된 후 가장 작은 파일의 크기. | |
| p25파일 크기 | 테이블이 최적화된 후 25번째 백분위수 파일의 크기. | |
| p50FileSize | 테이블이 최적화된 후의 파일 크기의 중앙값. | |
| p75FileSize | 테이블이 최적화된 후 75번째 백분위수 파일의 크기. | |
| 최대 파일 크기 | 테이블이 최적화된 후 가장 큰 파일의 크기. | |
| 클론 | ||
| 소스 테이블 크기 (sourceTableSize) | 복제된 버전에서 원본 테이블의 크기(바이트)입니다. | |
| sourceNumOfFiles | 복제된 버전에서 원본 테이블의 파일 수입니다. | |
| 제거된파일수 | 이전 테이블을 대체한 경우 대상 테이블에서 제거된 파일 수입니다. | |
| 삭제된 파일 크기 | 이전 테이블을 교체한 경우 대상 테이블에서 제거된 파일의 총 크기(바이트)입니다. | |
| 복사된파일수 | 새 위치로 복사된 파일의 수. 얕은 복제의 경우 0. | |
| 복사된파일크기 | 새 위치로 복사된 파일의 총 크기(바이트). 얕은 복제의 경우 0. | |
| RESTORE | ||
| 복원 후 테이블 크기 | 복원 후 테이블 크기(바이트). | |
| 복원 후 파일 수 | 복원 후 테이블의 파일 수. | |
| 제거된파일수 | 복원 작업으로 제거된 파일의 수. | |
| 복구된 파일 수 | 복원의 결과로 추가된 파일의 수. | |
| 삭제된 파일 크기 | 복원에 의해 제거된 파일의 크기(바이트). | |
| 복원된 파일 크기 | 복원에 의해 추가된 파일의 크기(바이트). | |
| VACUUM | ||
| 삭제된 파일 수 | 삭제된 파일의 수. | |
| 진공처리된디렉토리수 | vacuum된 디렉터리의 수. | |
| numFilesToDelete | 삭제할 파일의 수. |
시간 여행이란?
시간 이동은 타임스탬프 또는 테이블 버전(트랜잭션 로그에 기록된 대로)을 기반으로 이전 테이블 버전 쿼리를 지원합니다. 다음과 같은 애플리케이션에 시간 이동 기능을 사용할 수 있습니다.
- 분석, 보고서 또는 출력(예: 기계 학습 모델의 출력)을 다시 생성합니다. 특히 규제 산업에서 디버깅 또는 감사에 유용할 수 있습니다.
- 복잡한 임시 쿼리를 작성합니다.
- 데이터의 실수를 수정합니다.
- 데이터가 빠르게 변경되는 테이블의 쿼리 집합에 스냅샷 격리를 제공합니다.
중요합니다
Databricks Runtime 18.0 이상에서는 테이블 속성(기본값 7일)보다 deletedFileRetentionDuration 오래된 버전을 요청하는 경우 시간 이동 쿼리가 차단됩니다. Unity 카탈로그 관리 테이블의 경우 Databricks Runtime 12.2 이상에 적용됩니다.
시간 이동 구문
테이블 이름 사양 다음에 절을 추가하여 시간 이동이 있는 테이블을 쿼리합니다.
-
timestamp_expression는 다음 중 하나일 수 있습니다.-
'2018-10-18T22:15:12.013Z', 즉, 타임스탬프로 캐스팅할 수 있는 문자열 cast('2018-10-18 13:36:32 CEST' as timestamp)-
'2018-10-18', 즉, 날짜 문자열 current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- 타임스탬프이거나 타임스탬프로 캐스팅될 수 있는 기타 식
-
-
version은DESCRIBE HISTORY table_spec출력에서 가져올 수 있는 긴 값입니다.
timestamp_expression 및 version는 모두 하위 쿼리가 될 수 없습니다.
날짜 또는 타임스탬프 문자열만 허용됩니다. 예를 들어 "2019-01-01" 및 "2019-01-01T00:00:00.000Z"를 지정합니다. 다음 코드에서 예제 구문 참조:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
파이썬
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
@ 구문을 사용하여 테이블 이름의 일부로 타임스탬프 또는 버전을 지정할 수도 있습니다. 타임스탬프는 yyyyMMddHHmmssSSS 형식이어야 합니다. 버전 앞에 @를 추가하여 v 뒤에 버전을 지정할 수 있습니다. 다음 코드에서 예제 구문 참조:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
파이썬
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
트랜잭션 로그 검사점이란?
테이블 버전은 테이블 데이터와 함께 저장되는 트랜잭션 로그 디렉터리 내에서 JSON 파일로 기록됩니다. 검사점 쿼리를 최적화하기 위해 테이블 버전은 Parquet 검사점 파일로 집계되어 테이블 기록의 모든 JSON 버전을 읽을 필요가 없습니다. Azure Databricks는 데이터 크기 및 워크로드에 대한 검사점 빈도를 최적화합니다. 사용자는 검사점과 직접 상호 작용할 필요가 없습니다. 검사점 빈도는 예고 없이 변경될 수 있습니다.
시간 이동 쿼리에 대한 데이터 보존 구성
이전 테이블 버전을 쿼리하려면 해당 버전의 로그와 데이터 파일을 모두 유지해야 합니다.
테이블에 대해 실행하면 VACUUM 데이터 파일이 삭제됩니다. 로그 파일 제거는 체크포인트 테이블 버전 이후 자동으로 관리됩니다.
대부분의 테이블은 정기적으로 VACUUM이(가) 실행되기 때문에 특정 시점 쿼리는 기본적으로 7일인 VACUUM의 보존 임계값을 준수해야 합니다.
테이블에 대한 데이터 보존 임계값을 늘리려면 다음 테이블 속성을 구성해야 합니다.
-
delta.logRetentionDuration = "interval <interval>": 테이블에 대한 기록이 유지되는 기간을 제어합니다. 기본값은interval 30 days입니다. -
delta.deletedFileRetentionDuration = "interval <interval>": 현재 테이블 버전에서 더 이상 참조되지 않는 데이터 파일을 제거하는 데 사용되는 임계값VACUUM을 결정합니다. 기본값은interval 7 days입니다.
테이블을 만드는 동안 테이블 속성을 지정하거나 문을 사용하여 ALTER TABLE 설정할 수 있습니다.
테이블 속성 참조를 참조하세요.
참고 항목
Databricks Runtime 18.0 및 그 이상에서는 logRetentionDuration가 deletedFileRetentionDuration보다 크거나 같아야 합니다. Unity 카탈로그 관리 테이블의 경우 Databricks Runtime 12.2 이상에 적용됩니다.
30일간의 기록 데이터에 액세스하려면 기본 설정delta.deletedFileRetentionDuration = "interval 30 days"과 일치하도록 설정합니다 delta.logRetentionDuration .
데이터 보존 임계값을 늘리면 더 많은 데이터 파일이 유지 관리됨에 따라 스토리지 비용이 증가할 수 있습니다.
테이블을 이전 상태로 복원
명령을 사용하여 RESTORE 테이블을 이전 상태로 복원할 수 있습니다. 테이블은 내부적으로 이전 버전으로 복원할 수 있도록 기록 버전을 유지 관리합니다.
이전 상태에 해당하는 버전 또는 이전 상태가 만들어진 시점의 타임스탬프가 RESTORE 명령에 의해 옵션으로 지원됩니다.
중요합니다
- 이미 복원된 테이블을 복원할 수 있습니다.
- 복제된 테이블을 복원할 수 있습니다.
- 복원할 테이블에 대한
MODIFY권한이 필요합니다. - 데이터 파일이 수동으로 또는 에 의해
vacuum삭제된 이전 버전으로 테이블을 복원할 수 없습니다.spark.sql.files.ignoreMissingFiles가true로 설정된 경우에는 이 버전으로 부분적으로 복원하는 것이 가능합니다. - 이전 상태로 복원하기 위한 타임스탬프 형식은
yyyy-MM-dd HH:mm:ss입니다. 날짜(yyyy-MM-dd) 문자열만 제공하는 것도 지원됩니다.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
구문 세부 정보는 RESTORE참조하세요.
중요합니다
복원은 데이터 변경 작업으로 간주됩니다. 명령에 의해 추가된 RESTORE 로그 항목에는 dataChange 가 true로 설정된 것이 포함됩니다. 테이블에 대한 업데이트를 처리하는 구조적 스트리밍 작업과 같은 다운스트림 애플리케이션이 있는 경우 복원 작업에서 추가한 데이터 변경 로그 항목은 새 데이터 업데이트로 간주되며 이를 처리하면 데이터가 중복될 수 있습니다.
예시:
| 테이블 버전 | 연산 | 로그 업데이트 | 데이터 변경 로그 업데이트의 레코드 |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (이름 = Viktor, 나이 = 29, (이름 = George, 나이 = 55) |
| 1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
| 2 | OPTIMIZE | 파일추가(/path/to/file-3, 데이터변경 = false), 파일제거(/path/to/file-1), 파일제거(/path/to/file-2) | (최적화 압축으로 레코드가 없는 경우 테이블의 데이터가 변경되지 않음) |
| 3 | RESTORE(버전=1) | 파일 제거(/path/to/file-3), 파일 추가(/path/to/file-1, dataChange = true), 파일 추가(/path/to/file-2, dataChange = true) | (이름 = Viktor, 나이 = 29), (이름 = George, 나이 = 55), (이름 = George, 나이 = 39) |
앞의 예제 RESTORE 에서 이 명령은 테이블 버전 0과 1을 읽을 때 이미 표시된 업데이트를 생성합니다. 스트리밍 쿼리가 이 테이블을 읽는 경우 이러한 파일은 새로 추가된 데이터로 간주되어 다시 처리됩니다.
메트릭 복원
RESTORE는 작업이 완료되면 다음과 같은 메트릭을 단일 행 DataFrame으로 보고합니다.
table_size_after_restore: 복원 후 테이블의 크기.num_of_files_after_restore: 복원 후 테이블의 파일 수.num_removed_files: 테이블에서 제거된(논리적으로 삭제된) 파일의 수.num_restored_files: 롤백으로 인해 복원된 파일의 수.removed_files_size: 테이블에서 제거된 파일의 총 크기(바이트).restored_files_size: 복원된 파일의 총 크기(바이트).
시간 이동 사용 예
실수로 인한 사용자(
111)에 대한 테이블 삭제를 수정합니다.INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111실수로 인한 테이블에 대한 잘못된 업데이트를 수정합니다.
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *지난 주에 추가된 신규 고객 수를 쿼리합니다.
SELECT ( SELECT count(distinct userId) FROM my_table ) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7) ) AS new_customers
Spark 세션에서 마지막 커밋의 버전을 찾으려면 어떻게 해야 하나요?
모든 스레드와 모든 테이블에서 현재 SparkSession에 의해 써진 마지막 커밋의 버전 번호를 가져오려면 SQL 구성 spark.databricks.delta.lastCommitVersionInSession을 쿼리합니다.
참고 항목
Apache Iceberg 테이블의 경우 spark.databricks.iceberg.lastCommitVersionInSession를 spark.databricks.delta.lastCommitVersionInSession 대신 사용합니다.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
파이썬
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
스칼라
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSession에서 만들어진 커밋이 없는 경우 키를 쿼리하면 빈 값이 반환됩니다.
참고 항목
여러 스레드에서 동일한 SparkSession 항목을 공유하는 경우 여러 스레드에서 변수를 공유하는 것과 비슷합니다. 구성 값이 동시에 업데이트되면 경합 조건에 도달할 수 있습니다.