대량의 데이터를 처리할 때 전체 데이터 세트를 다시 처리하는 대신 새 레코드와 변경된 레코드만 처리할 수 있는 파이프라인이 필요합니다. 이를 증분 ETL이라고합니다. Databricks SQL에서는 절차 코드를 작성하거나 수동 새로 고침을 예약하지 않고도 스트리밍 테이블 및 구체화된 뷰를 사용하여 증분 ETL 파이프라인을 빌드할 수 있습니다.
이 자습서에서는 일반적인 패턴인 시간에 따른 제품 변경 내용 추적을 안내합니다. 원본 테이블을 만들고, 변경 이벤트를 캡처하고, 각 제품의 전체 기록을 유지하는 차원 테이블을 빌드하고, 위에 집계 보고 계층을 추가합니다.
이 자습서의 주요 기능은 다음과 같습니다 AUTO CDC. 기존 웨어하우스에서는 삽입, 업데이트 및 삭제 이벤트를 대상 테이블에 조정하는 복잡한 MERGE INTO 문을 작성합니다. 이 방법은 특히 이벤트가 순서를 벗어나는 경우 오류가 발생하기 쉽습니다.
AUTO CDC 에서는 이 작업을 처리합니다. 비즈니스 키, 시퀀싱 열 및 SCD 유형 1(최신 값만 해당) 또는 SCD 형식 2(전체 기록)를 선언하고 Azure Databricks 올바른 병합 논리를 자동으로 적용할지 여부를 선언합니다. CDC 개요에 대해서는 AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.
이 자습서를 마치면 다음을 수행할 수 있습니다.
- 변경 데이터 피드를 사용하여 변경 내용을 추적하는 원본 테이블을 만들었습니다.
- CDC 이벤트 스트림을 이해하기 위해 원시 변경 데이터를 검사했습니다.
- 해당 이벤트에서 SCD Type 2 차원 테이블을 빌드하는 데 사용됩니다
AUTO CDC. - 파이프라인을 통해 증분 방식으로 삭제 이벤트를 처리했습니다.
- 집계 보고서를 증분 방식으로 유지하는 구체화된 뷰를 만들었습니다.
- 파이프라인을 통해 변경 내용이 자동으로 전파되도록 구성
SCHEDULE REFRESH EVERY 1 DAY되었습니다.
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Unity Catalog가 설정된 Azure Databricks 작업 영역입니다.
- SQL 웨어하우스(서버리스 또는 프로).
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에액세스할 수 있는 권한이 있습니다.
- 계정에 대해 서버리스 컴퓨팅이 사용하도록 설정되었습니다. 지역 가용성이 제한된 기능을참조하세요.
1단계: 카탈로그 및 스키마 설정
Databricks SQL 편집기를 열고 작업 카탈로그 및 스키마를 설정합니다. 선택한 카탈로그 및 스키마에 대한 USE 권한이 있어야 합니다.
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
2단계: 원본 테이블 만들기 및 데이터 로드
테이블을 만들고 초기 레코드를 로드합니다.
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');새 제품, 웨어하우스 이동 및 범주 재할당을 포함하여 업스트림 변경 내용을 시뮬레이트합니다.
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
3단계: 변경 데이터 피드 쿼리
다운스트림 파이프라인을 빌드하기 전에 처리할 내용을 AUTO CDC 이해할 수 있도록 원시 변경 이벤트를 살펴보는 데 도움이 됩니다. 이 함수는 table_changes() CDF 로그를 읽고 메타데이터 열과 함께 캡처된 모든 작업을 반환합니다.
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
예를 들어, Spoon에는 insert (시애틀), update_preimage (시애틀), update_postimage (로스앤젤레스)라는 세 가지 이벤트가 있습니다.
단일 논리적 변경(예: 숟가락을 다른 웨어하우스로 이동)은 여러 이벤트(사전 이미지 및 사후 이미지)를 생성합니다. 기존 웨어하우스에서는 이러한 모든 이벤트를 대상 테이블로 조정하고, 별도의 논리로 삽입, 업데이트 및 삭제를 처리하고, 이벤트가 올바른 순서로 적용되도록 하는 문을 작성 MERGE 합니다. 이것이 바로 다음 단계에서 제거되는 AUTO CDC 복잡성입니다.
4단계: SCD 타입 2 차원을 다음을 사용하여 빌드하십시오 AUTO CDC
중요합니다
AUTO CDC
베타에 있습니다. Databricks Runtime 17.3 이상이 필요합니다.
스트리밍 테이블은 데이터를 증분 방식으로 처리합니다. 새로 고칠 때마다 마지막 실행 이후 새 행만 읽으므로 전체 데이터 세트를 다시 처리할 필요가 없습니다. 따라서 대용량 또는 자주 변경되는 원본에 적합합니다.
AUTO CDC 는 스트리밍 테이블 위에 변경 데이터 캡처 처리를 추가합니다. 삽입, 업데이트 및 삭제를 수동으로 처리하는 MERGE INTO 문을 작성하는 대신 비즈니스 키 및 시퀀싱 열을 선언하고 Azure Databricks 올바른 논리를 적용할 수 있습니다.
AUTO CDC 또한 순서가 잘못된 이벤트를 자동으로 처리합니다. 이는 분산 시스템에서 도착하는 이벤트 또는 겹치는 타임스탬프가 있는 일괄 처리 로드를 처리하는 데 사용할 MERGE INTO 때 일반적인 문제입니다.
다음 문은 각 제품의 전체 버전 기록을 유지하는 SCD Type 2 테이블을 만듭니다. 각 버전이 가져오 __START_AT 고 __END_AT 타임스탬프가 표시됩니다. A NULL in __END_AT 은 현재 버전을 표시합니다.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: 매일 일정에 따라 테이블을 새로 고칩니다. -
FLOW AUTO CDC: 이를 CDC 흐름으로 선언합니다. Azure Databricks 삽입, 업데이트 및 삭제 의미 체계를 자동으로 적용합니다. -
KEYS (product_id): 비즈니스 키입니다. 키가 같은 이벤트는 버전이 지정된 행으로 병합됩니다. -
APPLY AS DELETE WHEN _change_type = 'delete': 삭제 이벤트가 도착하면 현재 버전을 닫습니다. 이렇게 하면 삭제 이벤트를 식별하는 조건을 정의할 수 있습니다. -
SEQUENCE BY _commit_timestamp: 이벤트 순서를 설정합니다. 잘못된 도착을 올바르게 처리합니다. -
STORED AS SCD TYPE 2: 전체 기록을 유지합니다.AUTO CDC는 SCD Type 1과 SCD Type 2를 모두 지원합니다.
차원 테이블을 쿼리합니다.
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- 숟가락 : 두 가지 버전. 시애틀(닫힘,
__END_AT집합) 및 로스앤젤레스(현재,__END_AT = NULL). - 포크: 두 가지 버전. 칼붙이 범주(닫힘) 및 식사 범주(현재).
- Napkin 및 Coaster: 각각 하나의 버전(새로 삽입됨,
__END_AT = NULL). - 다른 모든 제품: 각각 하나의 버전(
__END_AT = NULL).
5단계: 파이프라인을 통해 삭제 처리
이제 원본 테이블에서 두 개의 중단된 제품을 삭제하여 시뮬레이션합니다.
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
이러한 삭제 이벤트는 CDF 로그에 기록되지만 스트리밍 테이블에는 아직 표시되지 않습니다. 스트리밍 테이블을 새로 고쳐 새 이벤트를 처리합니다.
REFRESH STREAMING TABLE products_history;
차원 테이블을 쿼리하여 삭제가 적용되었는지 확인합니다.
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
그릇과 유리는 이제 __END_AT 설정으로 단종되어 중단된 것으로 표시됩니다. 다른 모든 현재 제품은 변경되지 않은 상태로 유지됩니다. 스트리밍 테이블은 이전 새로 고침에서 삽입 및 업데이트를 다시 처리하지 않고 새 삭제 이벤트만 처리했습니다.
6단계: 집계 구체화된 뷰 만들기
원본 변경 내용과 함께 최신 상태로 유지되는 차원 테이블이 있으므로 맨 위에 보고 계층을 추가할 수 있습니다.
구체화된 뷰는 미리 계산된 쿼리 결과를 실제 테이블로 저장합니다. 쿼리를 읽을 때마다 쿼리를 다시 실행하는 일반 보기와 달리 구체화된 뷰는 결과를 유지하며 각 새로 고침 시 업스트림 변경의 영향을 받는 행만 다시 계산합니다. 따라서 쿼리 성능이 중요한 대시보드 및 보고서에 적합합니다.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY 는 이 보기가 매일 일정에 따라 새로 고침됩니다. 스트리밍 테이블의 동일한 일정과 결합하면 이제 원본 테이블의 변경 내용이 차원을 통해 각 새로 고침 주기의 집계로 계단식으로 바뀌는 3단계 파이프라인이 있습니다. 실행할 수동 새로 고침이 없습니다.
SELECT * FROM products_by_category ORDER BY active_products DESC;
7단계: 엔드 투 엔드 캐스케이드 확인
전체 파이프라인을 계단식으로 확인하려면 원본 테이블을 변경합니다.
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
칼은 덴버에서 시애틀로 이동합니다. 이 단일 DML 변경은 전체 파이프라인 연계를 트리거하여 세 단계가 함께 작동하는 방법을 보여 줍니다.
-
products는 CDF를 통해 변경 이벤트를 기록합니다. -
products_history는 이벤트를 처리하고 Knife에 대한 새 버전을 추가합니다. -
products_by_category영향을 받는 커틀러리 행만 다시 계산합니다.
확인:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
정리
이 자습서에서 만든 리소스를 정리하려면 다음 SQL을 사용합니다.
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;