데이터 오케스트레이션 및 자동 로더에 대해 SDP(Lakeflow Spark 선언적 파이프라인)를 사용하여 CDC(변경 데이터 캡처)를 사용하여 ETL(추출, 변환 및 로드) 파이프라인을 만들고 배포하는 방법을 알아봅니다. ETL 파이프라인은 원본 시스템에서 데이터를 읽고, 데이터 품질 검사 및 중복 제거 기록과 같은 요구 사항에 따라 해당 데이터를 변환하고, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 데이터를 쓰는 단계를 구현합니다.
이 자습서에서는 MySQL 데이터베이스의 customers 테이블 데이터를 사용하여 다음을 수행합니다.
- Debezium 또는 다른 도구를 사용하여 트랜잭션 데이터베이스에서 변경 내용을 추출하고 클라우드 개체 스토리지(S3, ADLS 또는 GCS)에 저장합니다. 이 자습서에서는 외부 CDC 시스템 설정을 건너뛰고 대신 가짜 데이터를 생성하여 자습서를 간소화합니다.
- 자동 로더를 사용하여 클라우드 개체 스토리지에서 메시지를 증분 방식으로 로드하고 원시 메시지를
customers_cdc테이블에 저장합니다. 자동 로더는 스키마를 유추하고 스키마 진화를 처리합니다. - 기대치를
customers_cdc_clean사용하여 데이터 품질을 확인하는 테이블을 만듭니다. 예를 들어id은 upsert 작업을 실행하는 데 사용되므로 절대null가 되어서는 안 됩니다. - 정리된 CDC 데이터에 대해
AUTO CDC ... INTO작업을 수행하여 변경 내용을 최종customers테이블에 삽입 또는 갱신합니다. - 파이프라인이 모든 변경 사항을 추적하기 위해 유형 2의 SCD2(느린 변경 차원) 테이블을 만드는 방법을 보여 줍니다.
목표는 거의 실시간으로 원시 데이터를 수집하고 데이터 품질을 보장하면서 분석가 팀을 위한 테이블을 작성하는 것입니다.
이 자습서에서는 medallion Lakehouse 아키텍처를 사용하여 브론즈 계층을 통해 원시 데이터를 수집하고, 은색 계층으로 데이터를 정리하고 유효성을 검사하며, 골드 레이어를 사용하여 차원 모델링 및 집계를 적용합니다. 자세한 내용은 메달리온 레이크하우스 아키텍처가 무엇인지에 대해 참조하세요.
구현된 흐름은 다음과 같습니다.
파이프라인, 자동 로더 및 CDC에 대한 자세한 내용은 Lakeflow Spark 선언적 파이프라인, 자동 로더란? 및 CDC(변경 데이터 캡처)란?
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Azure Databricks 작업 영역에 로그인합니다.
- 작업 영역에 Unity 카탈로그 를 사용하도록 설정합니다.
- 계정에 대해 서버리스 컴퓨팅 을 사용하도록 설정해야 합니다. 서버리스 Lakeflow Spark 선언적 파이프라인은 모든 작업 영역 지역에서 사용할 수 없습니다. 사용 가능한 지역에 대한 지역 가용성이 제한된 기능을 참조하세요. 계정에 대해 서버리스 컴퓨팅을 사용하도록 설정하지 않은 경우 단계는 작업 영역에 대한 기본 컴퓨팅과 함께 작동해야 합니다.
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에액세스할 수 있는 권한이 있습니다.
-
카탈로그에 새 스키마를 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES또는USE CATALOG및CREATE SCHEMA입니다. -
기존 스키마에 새 볼륨을 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES또는USE SCHEMA및CREATE VOLUME입니다.
ETL 파이프라인에서 데이터 캡처 변경
CDC(변경 데이터 캡처)는 트랜잭션 데이터베이스(예: MySQL 또는 PostgreSQL) 또는 데이터 웨어하우스에 대한 레코드의 변경 내용을 캡처하는 프로세스입니다. CDC는 일반적으로 외부 시스템에서 테이블을 다시 구체화하는 스트림으로 데이터 삭제, 추가 및 업데이트와 같은 작업을 캡처합니다. CDC를 사용하면 대량 로드 업데이트가 필요하지 않고도 증분 로드가 가능합니다.
비고
이 튜토리얼을 더 간단하게 하기 위해 외부 CDC 시스템 설정을 생략합니다. CDC 데이터를 실행하고 클라우드 개체 스토리지(S3, ADLS 또는 GCS)에 JSON 파일로 저장하고 있다고 가정합니다. 이 자습서에서는 라이브러리를 Faker 사용하여 자습서에 사용된 데이터를 생성합니다.
CDC 캡처
다양한 CDC 도구를 사용할 수 있습니다. 선도적인 오픈 소스 솔루션 중 하나는 Debezium이지만, Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate 및 AWS DMS와 같은 데이터 원본을 간소화하는 다른 구현이 있습니다.
이 자습서에서는 Debezium 또는 DMS와 같은 외부 시스템의 CDC 데이터를 사용합니다. Debezium은 변경된 모든 행을 캡처합니다. 일반적으로 Kafka 토픽에 데이터 변경 기록을 보내거나 파일로 저장합니다.
테이블(JSON 형식)에서 customers CDC 정보를 수집하고 올바른지 확인한 다음 Lakehouse에서 고객 테이블을 구체화해야 합니다.
Debezium의 CDC 입력
변경될 때마다 업데이트되는 행의 모든 필드(id, , firstname, lastnameemailaddress)가 포함된 JSON 메시지가 표시됩니다. 메시지에는 추가 메타데이터도 포함됩니다.
-
operation: 작업 코드(일반적으로 (DELETE,APPEND,UPDATE)) -
operation_date: 각 작업 작업에 대한 레코드의 날짜 및 타임스탬프입니다.
Debezium과 같은 도구는 변경 전의 행 값과 같은 고급 출력을 생성할 수 있지만 이 자습서에서는 간단히 생략합니다.
1단계: 파이프라인 만들기
CDC 데이터 원본을 쿼리하고 작업 영역에서 테이블을 생성하는 새 ETL 파이프라인을 만듭니다.
작업 영역에서 왼쪽 위 모서리에 있는
과더하기 아이콘 새로운 기능 을 클릭하세요.ETL 파이프라인을 클릭합니다.
파이프라인
Pipelines with CDC tutorial의 제목을 원하는 이름으로 변경합니다.제목 아래에서 쓰기 권한이 있는 카탈로그 및 스키마를 선택합니다.
이 카탈로그 및 스키마는 코드에서 카탈로그 또는 스키마를 지정하지 않는 경우 기본적으로 사용됩니다. 코드는 전체 경로를 지정하여 모든 카탈로그 또는 스키마에 쓸 수 있습니다. 이 자습서에서는 여기에서 지정한 기본값을 사용합니다.
고급 옵션에서 빈 파일로 시작을 선택합니다.
코드에 대한 폴더를 선택합니다. 찾아보기를 선택하여 작업 영역의 폴더 목록을 찾아볼 수 있습니다. 쓰기 권한이 있는 폴더를 선택할 수 있습니다.
버전 제어를 사용하려면 Git 폴더를 선택합니다. 새 폴더를 만들어야 하는 경우
을 선택합니다.
자습서에 사용하려는 언어에 따라 파일 언어에 대해 Python 또는 SQL 을 선택합니다.
[선택]을 클릭하여 이러한 설정을 사용하여 파이프라인을 만들고 Lakeflow 파이프라인 편집기를 엽니다.
이제 기본 카탈로그 및 스키마가 있는 빈 파이프라인이 있습니다. 다음으로, 자습서에서 가져올 샘플 데이터를 설정합니다.
2단계: 이 자습서에서 가져올 샘플 데이터 만들기
기존 원본에서 사용자 고유의 데이터를 가져오는 경우에는 이 단계가 필요하지 않습니다. 이 자습서에서는 자습서의 예로 가짜 데이터를 생성합니다. Python 데이터 생성 스크립트를 실행하는 Notebook을 만듭니다. 이 코드는 샘플 데이터를 생성하기 위해 한 번만 실행하면 되므로 파이프라인 업데이트의 일부로 실행되지 않는 파이프라인의 explorations 폴더 내에 만듭니다.
비고
이 코드는 Faker 를 사용하여 샘플 CDC 데이터를 생성합니다. Faker는 자동으로 설치할 수 있으므로 자습서에서 사용합니다 %pip install faker. Notebook에 faker 라이브러리의 종속성을 설정할 수도 있습니다.
Notebook에 종속성 추가를 참조하세요.
Lakeflow 파이프라인 편집기 내의 자산 브라우저 사이드바에서 편집기 왼쪽에 있는
을 클릭합니다.추가한 다음 탐색을 선택합니다.
이름(예:
Setup dataPython 선택)을 지정합니다. 새explorations폴더인 기본 대상 폴더를 그대로 둘 수 있습니다.만들기를 클릭합니다. 그러면 새 폴더에 Notebook이 만들어집니다.
첫 번째 셀에 다음 코드를 입력합니다. 이전 절차에서 선택한 기본 카탈로그 및 스키마와 일치하도록 정의를
<my_catalog><my_schema>변경해야 합니다.%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")자습서에서 사용되는 데이터 세트를 생성하려면 Shift + Enter 를 입력하여 코드를 실행합니다.
Optional. 이 자습서에 사용된 데이터를 미리 보려면 다음 셀에 다음 코드를 입력하고 코드를 실행합니다. 이전 코드의 경로와 일치하도록 카탈로그 및 스키마를 업데이트합니다.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
이렇게 하면 자습서의 나머지 부분에 사용할 수 있는 큰 데이터 집합(가짜 CDC 데이터 포함)이 생성됩니다. 다음 단계에서는 자동 로더를 사용하여 데이터를 수집합니다.
3단계: 자동 로더를 사용하여 증분 데이터 수집
다음 단계는 (가짜) 클라우드 스토리지의 원시 데이터를 브론즈 계층으로 수집하는 것입니다.
이는 여러 가지 이유로 어려울 수 있으며, 해야 할 것은 다음과 같습니다.
- 대규모로 작동하여 수백만 개의 작은 파일을 수집할 수 있습니다.
- 유추 스키마 및 JSON 타입.
- 잘못된 JSON 스키마를 사용하여 잘못된 레코드를 처리합니다.
- 스키마 진화(예: 고객 테이블의 새 열)를 처리합니다.
자동 로더는 스키마 유추 및 스키마 진화를 포함하여 이 수집을 간소화하는 동시에 수백만 개의 들어오는 파일로 확장합니다. 자동 로더는 Python에서 사용 cloudFiles 및 SQL SELECT * FROM STREAM read_files(...) 에서 사용할 수 있으며 다양한 형식(JSON, CSV, Apache Avro 등)과 함께 사용할 수 있습니다.
테이블을 스트리밍 테이블로 정의하면 새 들어오는 데이터만 사용할 수 있습니다. 스트리밍 테이블로 정의하지 않으면 사용 가능한 모든 데이터를 검색하고 수집합니다. 자세한 내용은 스트리밍 테이블을 참조하세요.
자동 로더를 사용하여 들어오는 CDC 데이터를 수집하려면 다음 코드를 복사하여 파이프라인(호출
my_transformation.py)으로 만든 코드 파일에 붙여넣습니다. 파이프라인을 만들 때 선택한 언어에 따라 Python 또는 SQL을 사용할 수 있습니다. 파이프라인의 기본값으로 설정한<catalog>및<schema>로 교체해야 합니다.파이썬
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )을 클릭합니다.파일을 실행하거나 파이프라인을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다. 파이프라인에 원본 파일이 하나만 있으면 기능적으로 동일합니다.
업데이트가 완료되면 편집기가 파이프라인에 대한 정보로 업데이트됩니다.
- 코드 오른쪽의 사이드바에 있는 DAG(파이프라인 그래프)에는 단일 테이블
customers_cdc_bronze이 표시됩니다. - 업데이트 요약은 파이프라인 자산 브라우저의 맨 위에 표시됩니다.
- 생성된 테이블의 세부 정보가 아래쪽 창에 표시되며 이를 선택하여 테이블에서 데이터를 찾아볼 수 있습니다.
클라우드 스토리지에서 가져온 원시 브론즈 계층 데이터입니다. 다음 단계에서는 데이터를 정리하여 실버 계층 테이블을 만듭니다.
4단계: 데이터 품질을 추적하기 위한 정리 및 기대
브론즈 계층이 정의되면 데이터 품질을 제어하는 기대치를 추가하여 실버 레이어를 만듭니다. 다음 조건을 확인합니다.
- ID는 절대로
null가 될 수 없습니다. - CDC 작업 유형이 유효해야 합니다.
- JSON은 자동 로더에서 올바르게 읽어야 합니다.
이러한 조건을 충족하지 않는 행은 삭제됩니다.
자세한 내용은 파이프라인 기대치를 사용하여 데이터 품질 관리를 참조하세요.
파이프라인 자산 브라우저 사이드바에서
을 클릭합니다.추가한 다음 변환합니다.
이름을 입력하고 소스 코드 파일에 대한 언어(Python 또는 SQL)를 선택합니다. 파이프라인 내에서 언어를 혼합하고 일치시킬 수 있으므로 이 단계에서 둘 중 하나를 선택할 수 있습니다.
정리된 테이블이 있는 실버 계층을 만들고 제약 조건을 적용하려면 다음 코드를 복사하여 새 파일에 붙여넣습니다(파일 언어에 따라 Python 또는 SQL 선택).
파이썬
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;을 클릭합니다.파일을 실행하거나 파이프라인을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
이제 두 개의 원본 파일이 있으므로 이러한 파일은 동일한 작업을 수행하지 않지만 이 경우 출력은 동일합니다.
- 파이프라인 실행 은 3단계의 코드를 포함하여 전체 파이프라인을 실행합니다. 입력 데이터가 업데이트되는 경우 해당 원본에서 브론즈 계층으로 변경된 내용을 가져옵니다. 파이프라인에 대한 원본의 일부가 아니라 탐색 폴더에 있으므로 데이터 설정 단계에서 코드를 실행하지 않습니다.
- 실행 파일 은 현재 원본 파일만 실행합니다. 이 경우 입력 데이터가 업데이트되지 않으면 캐시된 브론즈 테이블에서 실버 데이터가 생성됩니다. 파이프라인 코드를 만들거나 편집할 때 더 빠른 반복을 위해 이 파일만 실행하는 것이 유용합니다.
업데이트가 완료되면 파이프라인 그래프에 두 개의 테이블(브론즈 계층에 따라 실버 계층 포함)이 표시되고 아래쪽 패널에 두 테이블에 대한 세부 정보가 표시되는 것을 볼 수 있습니다. 이제 파이프라인 자산 브라우저의 맨 위에 여러 실행 시간이 표시되지만 가장 최근 실행에 대한 세부 정보만 표시됩니다.
다음으로, 테이블의 최종 골드 레이어 버전을 만듭니다 customers .
5단계: AUTO CDC 흐름을 사용하여 고객 테이블 구체화
이 시점까지 테이블은 각 단계에서 CDC 데이터를 전달했습니다. 이제 원본 테이블의 복제본이자 최신 뷰를 포함하는 customers 테이블을 만드십시오. 이 테이블은 생성한 CDC 작업 목록이 아닙니다.
수동으로 구현하는 것은 쉽지 않습니다. 최신 행을 유지하려면 데이터 중복 제거와 같은 항목을 고려해야 합니다.
그러나 Lakeflow Spark 선언적 파이프라인은 AUTO CDC 작업을 사용하여 이러한 문제를 해결합니다.
파이프라인 자산 브라우저 사이드바에서
을 클릭합니다.추가 및 변환.
이름을 입력하고 새 소스 코드 파일에 대한 언어(Python 또는 SQL)를 선택합니다. 이 단계에 대해 두 언어 중 하나를 다시 선택할 수 있지만 아래의 올바른 코드를 사용합니다.
Lakeflow Spark 선언적 파이프라인에서 사용하여
AUTO CDCCDC 데이터를 처리하려면 다음 코드를 복사하여 새 파일에 붙여넣습니다.파이썬
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;을 클릭합니다.파일을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
업데이트가 완료되면 파이프라인 그래프에 3개의 테이블이 표시되어 브론즈에서 은,금으로 진행하는 것을 볼 수 있습니다.
6단계: 느린 변경 차원 유형 2(SCD2)를 사용하여 업데이트 기록 추적
다음과 같은 결과로 발생하는 모든 변경 내용을 추적하는 테이블을 만들어야 하는 APPENDUPDATEDELETE경우가 많습니다.
- 기록: 테이블에 대한 모든 변경 내용의 기록을 유지하려고 합니다.
- 추적 가능성: 어떤 작업이 발생했는지 확인하려고 합니다.
Lakeflow SDP를 사용하는 SCD2
Delta는 CDF(변경 데이터 흐름)를 지원하며 SQL 및 table_change Python에서 테이블 수정을 쿼리할 수 있습니다. 그러나 CDF의 주요 사용 사례는 처음부터 테이블 변경 내용의 전체 보기를 만들지 않고 파이프라인의 변경 내용을 캡처하는 것입니다.
순서가 다른 이벤트가 있는 경우 구현하는 것이 특히 복잡해집니다. 변경 내용을 타임스탬프별로 순서를 지정하고 과거에 발생한 수정 내용을 수신해야 하는 경우 SCD 테이블에 새 항목을 추가하고 이전 항목을 업데이트해야 합니다.
Lakeflow SDP는 이러한 복잡성을 제거하고 처음부터 모든 수정 사항을 포함하는 별도의 테이블을 만들 수 있습니다. 이 테이블은 필요한 경우 특정 파티션 또는 ZORDER 열과 함께 대규모로 사용할 수 있습니다. 기본적으로 순서가 다른 필드는 _sequence_by에 따라 자동으로 처리됩니다.
SCD2 테이블을 만들려면 SQL에서 STORED AS SCD TYPE 2 옵션을 사용하거나 Python에서 stored_as_scd_type="2" 옵션을 사용합니다.
비고
이 옵션을 사용하여 기능이 추적하는 열을 제한할 수도 있습니다. TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
파이프라인 자산 브라우저 사이드바에서
을 클릭합니다.추가 및 변환.
이름을 입력하고 새 소스 코드 파일에 대한 언어(Python 또는 SQL)를 선택합니다.
다음 코드를 복사하여 새 파일에 붙여넣습니다.
파이썬
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;을 클릭합니다.파일을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
업데이트가 완료되면 파이프라인 그래프에는 실버 계층 테이블에 종속된 새 customers_history 테이블도 포함되며, 아래쪽 패널에는 4개 테이블 모두에 대한 세부 정보가 표시됩니다.
7단계: 정보를 가장 많이 변경한 사용자를 추적하는 구체화된 뷰 만들기
테이블에 customers_history 는 사용자가 해당 정보에 대해 변경한 모든 기록 변경 내용이 포함되어 있습니다. 골드 레이어에서 정보를 가장 많이 변경한 사용자를 추적하는 간단한 구체화된 뷰를 만듭니다. 실제 시나리오에서 사기 탐지 분석 또는 사용자 권장 사항에 사용할 수 있습니다. 또한 SCD2를 사용하여 변경 내용을 적용하면 중복 항목이 이미 제거되었으므로 사용자 ID당 행 수를 직접 계산할 수 있습니다.
파이프라인 자산 브라우저 사이드바에서
을 클릭합니다.추가 및 변환.
이름을 입력하고 새 소스 코드 파일에 대한 언어(Python 또는 SQL)를 선택합니다.
다음 코드를 복사하여 새 소스 파일에 붙여넣습니다.
파이썬
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id을 클릭합니다.파일을 실행하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
업데이트가 완료되면 파이프라인 그래프에 테이블에 의존하는 customers_history 새 테이블이 있으며 아래쪽 패널에서 볼 수 있습니다. 이제 파이프라인이 완료되었습니다. 전체 실행 파이프라인을 수행하여 테스트할 수 있습니다. 남은 단계는 파이프라인이 정기적으로 업데이트되도록 예약하는 것입니다.
8단계: ETL 파이프라인을 실행하는 작업 만들기
다음으로 Databricks 작업을 사용하여 파이프라인에서 데이터 수집, 처리 및 분석 단계를 자동화하는 워크플로를 만듭니다.
- 편집기의 맨 위에서 일정 단추를 선택합니다.
- 일정 대화 상자 가 나타나면 일정 추가를 선택합니다.
- 그러면 새 일정 대화 상자가 열리고, 여기서 일정에 따라 파이프라인을 실행하는 작업을 만들 수 있습니다.
- 필요에 따라 작업에 이름을 지정합니다.
- 기본적으로 일정은 하루에 한 번 실행되도록 설정됩니다. 이 기본값을 적용하거나 고유한 일정을 설정할 수 있습니다. 고급을 선택하면 작업이 실행되는 특정 시간을 설정하는 옵션이 제공됩니다. 추가 옵션을 선택하면 작업이 실행되면 알림을 만들 수 있습니다.
- 만들기를 선택하여 변경 내용을 적용하고 작업을 만듭니다.
이제 작업이 매일 실행되어 파이프라인을 최신 상태로 유지합니다. 일정을 다시 선택하여 일정 목록을 볼 수 있습니다. 일정 추가, 편집 또는 제거를 포함하여 해당 대화 상자에서 파이프라인에 대한 일정을 관리할 수 있습니다.
일정(또는 작업)의 이름을 클릭하면 작업 및 파이프라인 목록의 작업 페이지로 이동합니다. 여기에서 실행 기록을 포함하여 작업 실행에 대한 세부 정보를 보거나 지금 실행 단추를 사용하여 작업을 즉시 실행할 수 있습니다.
작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.