다음을 통해 공유


Azure Databricks에서 Delta Lake 변경 데이터 피드 사용

변경 데이터 피드를 통해 Azure Databricks는 델타 테이블 버전 간의 행 수준 변경 내용을 추적할 수 있습니다. 델타 테이블에서 사용하도록 설정하면 런타임은 테이블에 기록된 모든 데이터에 대해 변경 이벤트를 기록합니다. 여기에는 지정된 행이 삽입, 삭제 또는 업데이트되었는지 여부를 나타내는 메타데이터와 함께 행 데이터가 포함됩니다.

Important

변경 데이터 피드는 테이블 기록과 함께 작동하여 변경 정보를 제공합니다. 델타 테이블을 복제하면 별도의 기록이 생성되므로 복제된 테이블의 변경 데이터 피드가 원래 테이블의 변경 데이터 피드와 일치하지 않습니다.

변경 데이터 증분 처리

Databricks는 구조적 스트리밍과 함께 변경 데이터 피드를 사용하여 델타 테이블의 변경 내용을 증분 처리할 것을 권장합니다. 테이블의 변경 데이터 피드에 대한 버전을 자동으로 추적하려면 Azure Databricks용 구조적 스트리밍을 사용해야 합니다.

참고 항목

Delta Live Tables는 변경 데이터를 쉽게 전파하고 결과를 SCD(느린 변경 차원) 유형 1 또는 형식 2 테이블로 저장하는 기능을 제공합니다. 적용 변경 API: 델타 라이브 테이블을 사용하여 변경 데이터 캡처 간소화를 참조 하세요.

테이블에서 변경 데이터 피드를 읽으려면 해당 테이블에서 변경 데이터 피드를 사용하도록 설정해야 합니다. 변경 데이터 피드 사용을 참조하세요.

다음 구문 예제와 같이 변경 데이터 피드를 읽도록 테이블에 대해 스트림을 구성할 때 옵션을 readChangeFeed true 설정합니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

기본적으로 스트림은 스트림이 처음 시작될 때 테이블의 최신 스냅샷을 INSERT 반환하고 나중에 변경 데이터로 변경합니다.

데이터 커밋을 Delta Lake 트랜잭션의 일부로 변경하면 새 데이터가 테이블에 커밋되는 동시에 사용할 수 있게 됩니다.

필요에 따라 시작 버전을 지정할 수 있습니다. 시작 버전을 지정해야 하나요?를 참조하세요.

변경 데이터 피드는 시작 버전을 지정해야 하는 일괄 처리 실행도 지원합니다. 일괄 처리 쿼리의 변경 내용 읽기를 참조 하세요.

변경 데이터를 읽을 때 속도 제한(maxFilesPerTrigger, maxBytesPerTrigger)과 같은 옵션과 더불어 excludeRegex도 지원됩니다.

속도 제한은 시작 스냅샷 버전 이외의 버전에 대해 원자 단위일 수 있습니다. 즉, 전체 커밋 버전은 속도가 제한되거나 전체 커밋이 반환됩니다.

시작 버전을 지정해야 하나요?

특정 버전 이전에 발생한 변경 내용을 무시하려면 필요에 따라 시작 버전을 지정할 수 있습니다. 타임스탬프 또는 델타 트랜잭션 로그에 기록된 버전 ID 번호를 사용하여 버전을 지정할 수 있습니다.

참고 항목

일괄 처리 읽기에는 시작 버전이 필요하며, 많은 일괄 처리 패턴은 선택적 종료 버전을 설정하면 도움이 될 수 있습니다.

변경 데이터 피드와 관련된 구조적 스트리밍 워크로드를 구성하는 경우 시작 버전을 지정하는 것이 처리에 미치는 영향을 이해하는 것이 중요합니다.

많은 스트리밍 워크로드, 특히 새 데이터 처리 파이프라인은 기본 동작의 이점을 활용합니다. 기본 동작을 사용하면 스트림이 먼저 테이블의 모든 기존 레코드를 변경 데이터 피드의 작업으로 INSERT 기록할 때 첫 번째 일괄 처리가 처리됩니다.

대상 테이블에 특정 시점까지 적절한 변경 내용이 있는 모든 레코드가 이미 포함되어 있는 경우 원본 테이블 상태를 이벤트로 INSERT 처리하지 않도록 시작 버전을 지정합니다.

다음 예제에서는 검사점이 손상된 스트리밍 실패에서 복구하는 구문입니다. 이 예제에서는 다음 조건을 가정합니다.

  1. 변경 데이터 피드는 테이블을 만들 때 원본 테이블에서 사용하도록 설정되었습니다.
  2. 대상 다운스트림 테이블은 버전 75까지의 모든 변경 내용을 처리했습니다.
  3. 원본 테이블의 버전 기록은 버전 70 이상에서 사용할 수 있습니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

이 예제에서는 새 검사점 위치도 지정해야 합니다.

Important

시작 버전을 지정하면 시작 버전이 테이블 기록에 더 이상 없는 경우 스트림이 새 검사점에서 시작되지 않습니다. Delta Lake는 기록 버전을 자동으로 정리하므로 지정된 모든 시작 버전이 결국 삭제됩니다.

변경 데이터 피드를 사용하여 테이블의 전체 기록을 재생할 수 있나요?

일괄 처리 쿼리의 변경 내용 읽기

일괄 처리 쿼리 구문을 사용하여 특정 버전에서 시작하는 모든 변경 내용을 읽거나 지정된 버전 범위 내의 변경 내용을 읽을 수 있습니다.

버전을 정수로 지정하고 타임스탬프를 yyyy-MM-dd[ HH:mm:ss[.SSS]] 형식의 문자열로 지정합니다.

시작 및 끝 버전은 쿼리에 포함됩니다. 특정 시작 버전에서 테이블의 최신 버전으로 변경 내용을 읽으려면 시작 버전만 지정합니다.

변경 이벤트를 기록한 버전보다 낮은 버전 또는 오래된 타임스탬프를 제공하는 경우, 즉, 변경 데이터 피드를 사용하도록 설정했을 때 변경 데이터 피드가 사용 설정되지 않았음을 나타내는 오류가 throw됩니다.

다음 구문 예제에서는 일괄 처리 읽기와 함께 버전 시작 및 종료 옵션을 사용하는 방법을 보여 줍니다.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

참고 항목

기본적으로 사용자가 테이블의 마지막 커밋을 초과하는 버전 또는 타임스탬프를 전달하는 경우 timestampGreaterThanLatestCommit 오류가 throw됩니다. Databricks Runtime 11.3 LTS 이상에서는 사용자가 다음 구성을 다음으로 설정하는 경우 변경 데이터 피드가 범위를 벗어난 버전 사례를 처리할 true수 있습니다.

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

테이블의 마지막 커밋보다 큰 시작 버전을 제공하거나 테이블의 마지막 커밋보다 최신의 시작 타임스탬프를 제공하는 경우 이전 구성을 사용하도록 설정하면 빈 읽기 결과가 반환됩니다.

테이블의 마지막 커밋보다 큰 최종 버전을 제공하거나 테이블의 마지막 커밋보다 최신의 종료 타임스탬프를 제공하는 경우 이전 구성을 일괄 읽기 모드로 사용하도록 설정하면 시작 버전과 마지막 커밋 간의 모든 변경 내용이 반환됩니다.

변경 데이터 피드의 스키마는 무엇인가요?

테이블에 대한 변경 데이터 피드에서 읽을 때 최신 테이블 버전의 스키마가 사용됩니다.

참고 항목

대부분의 스키마 변경 및 진화 작업은 완전히 지원됩니다. 열 매핑을 사용하도록 설정된 테이블은 모든 사용 사례를 지원하지 않으며 다른 동작을 보여 줍니다. 열 매핑이 사용하도록 설정된 테이블에 대한 변경 데이터 피드 제한을 참조하세요.

델타 테이블의 스키마에 있는 데이터 열 외에도 변경 데이터 피드에는 변경 이벤트의 유형을 식별하는 메타데이터 열이 포함되어 있습니다.

열 이름 Type
_change_type 문자열 insert, update_preimage , update_postimagedelete (1)
_commit_version Long 변경 내용이 포함된 델타 로그 또는 테이블 버전입니다.
_commit_timestamp 타임스탬프 커밋을 만들 때 연결된 타임스탬프입니다.

(1) preimage 는 업데이트 전 값이며 업데이트 postimage 후의 값입니다.

참고 항목

스키마에 추가된 열과 이름이 같은 열이 포함된 경우 테이블에서 변경 데이터 피드를 사용하도록 설정할 수 없습니다. 변경 데이터 피드를 사용하도록 설정하기 전에 이 충돌을 해결하기 위해 테이블의 열 이름을 바꿉니다.

변경 데이터 피드 사용

사용하도록 설정된 테이블에 대한 변경 데이터 피드만 읽을 수 있습니다. 다음 방법 중 하나를 사용하여 변경 데이터 피드 옵션을 명시적으로 사용하도록 설정해야 합니다.

  • 새 테이블: CREATE TABLE 명령에서 테이블 속성 delta.enableChangeDataFeed = true를 설정합니다.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 기존 테이블: ALTER TABLE 명령에서 테이블 속성 delta.enableChangeDataFeed = true를 설정합니다.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 모든 새 테이블:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

변경 데이터 피드를 사용하도록 설정한 후에 수행한 변경 내용만 기록됩니다. 테이블의 이전 변경 내용은 캡처되지 않습니다.

데이터 스토리지 변경

변경 데이터 피드를 사용하도록 설정하면 테이블에 대한 스토리지 비용이 약간 증가합니다. 변경 데이터 레코드는 쿼리가 실행될 때 생성되며 일반적으로 다시 작성된 파일의 총 크기보다 훨씬 작습니다.

Azure Databricks는 테이블 디렉터리 아래 _change_data 폴더의 UPDATE, DELETEMERGE 작업에 대한 변경 데이터를 기록합니다. 삽입 전용 작업 및 전체 파티션 삭제와 같은 일부 작업은 Azure Databricks가 트랜잭션 로그에서 _change_data 직접 변경 데이터 피드를 효율적으로 계산할 수 있기 때문에 디렉터리에 데이터를 생성하지 않습니다.

폴더의 데이터 파일에 대한 모든 읽기는 _change_data 지원되는 Delta Lake API를 거쳐야 합니다.

_change_data 폴더의 파일은 테이블의 보존 정책을 따릅니다. 명령이 실행되면 변경 데이터 피드 데이터가 삭제됩니다 VACUUM .

변경 데이터 피드를 사용하여 테이블의 전체 기록을 재생할 수 있나요?

변경 데이터 피드는 테이블에 대한 모든 변경 내용의 영구 레코드로 사용되지 않습니다. 변경 데이터 피드는 활성화된 후에 발생하는 변경 내용만 기록합니다.

변경 데이터 피드 및 Delta Lake를 사용하면 항상 원본 테이블의 전체 스냅샷을 다시 생성할 수 있습니다. 즉, 변경 데이터 피드를 사용하도록 설정된 테이블에 대해 새 스트리밍 읽기를 시작하고 해당 테이블의 현재 버전과 이후에 발생하는 모든 변경 내용을 캡처할 수 있습니다.

변경 데이터 피드의 레코드는 일시적이며 지정된 보존 기간에만 액세스할 수 있어야 합니다. 델타 트랜잭션 로그는 정기적으로 테이블 버전과 해당 변경 데이터 피드 버전을 제거합니다. 트랜잭션 로그에서 버전이 제거되면 해당 버전의 변경 데이터 피드를 더 이상 읽을 수 없습니다.

사용 사례에서 테이블에 대한 모든 변경 내용의 영구 기록을 유지해야 하는 경우 증분 논리를 사용하여 변경 데이터 피드에서 새 테이블로 레코드를 작성해야 합니다. 다음 코드 예제에서는 구조적 스트리밍의 증분 처리를 활용하지만 사용 가능한 데이터를 일괄 처리 워크로드로 처리하는 using trigger.AvailableNow을 보여 줍니다. 주 처리 파이프라인을 사용하여 이 워크로드를 비동기적으로 예약하여 감사 목적 또는 전체 재생성을 위해 변경 데이터 피드의 백업을 만들 수 있습니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

열 매핑을 사용하도록 설정된 테이블의 데이터 피드 제한 사항 변경

델타 테이블에서 열 매핑을 사용하도록 설정하면 기존 데이터에 대한 데이터 파일을 다시 작성하지 않고도 테이블의 열을 삭제하거나 이름을 바꿀 수 있습니다. 열 매핑을 사용하도록 설정하면 열 이름 바꾸기 또는 삭제, 데이터 형식 변경 또는 Null 허용 여부 변경과 같은 비가치 스키마 변경을 수행한 후 변경 데이터 피드에 제한이 있습니다.

Important

  • 일괄 처리 의미 체계를 사용하여 비가산적 스키마 변경이 발생하는 트랜잭션 또는 범위에 대한 변경 데이터 피드를 읽을 수 없습니다.
  • Databricks Runtime 12.2 LTS 이하에서는 비가산적 스키마 변경이 발생한 열 매핑이 설정된 테이블은 변경 데이터 피드의 스트리밍 읽기를 지원하지 않습니다. 열 매핑 및 스키마 변경 내용이 포함된 스트리밍을 참조하세요.
  • Databricks Runtime 11.3 LTS 이하에서는 열 이름 바꾸기 또는 삭제가 발생한 열 매핑이 활성화된 테이블에 대한 변경 데이터 피드를 읽을 수 없습니다.

Databricks Runtime 12.2 LTS 이상에서는 비가치 스키마 변경이 발생한 열 매핑이 활성화된 테이블에 대한 변경 데이터 피드에서 일괄 읽기를 수행할 수 있습니다. 읽기 작업은 최신 버전의 테이블 스키마를 사용하는 대신 쿼리에 지정된 테이블의 최종 버전 스키마를 사용합니다. 지정된 버전 범위가 비가산적 스키마 변경에 걸쳐 있으면 쿼리가 실패합니다.