연습 - 매핑 데이터 흐름을 사용하여 형식 1 느린 변경 차원 디자인 및 구현

완료됨

이 연습에서는 Azure Synapse 전용 SQL 풀을 원본 및 대상으로 사용하여 유형 1 SCD의 데이터 흐름을 만듭니다. 그런 다음 이 데이터 흐름을 Synapse 파이프라인에 추가하고 ETL(추출, 변환, 로드) 프로세스의 일부로 실행할 수 있습니다.

원본 및 차원 테이블 설정

이 연습에서는 Azure SQL, Azure 스토리지 등과 같은 다양한 시스템 유형에서 만들 수 있는 원본 데이터에서 차원 테이블을 Azure Synapse에 로드하려고 합니다. 이 예제에서는 간단하게 하기 위해 Azure Synapse 데이터베이스에서 원본 데이터를 만듭니다.

  1. Synapse Studio에서 데이터 허브로 이동합니다.

    Data hub.

  2. 작업 영역(1)을 선택하고 데이터베이스를 확장한 다음 SQLPool01(2)을 마우스 오른쪽 단추로 클릭합니다. 새 SQL 스크립트(3), 빈 스크립트(4)를 차례로 선택합니다.

    The data hub is displayed with the context menus to create a new SQL script.

  3. 다음 스크립트를 빈 스크립트 창에 붙여넣은 다음 실행을 선택하거나 F5 키를 눌러 쿼리를 실행합니다.

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    The script and Run button are both highlighted.

매핑 데이터 흐름 만들기

매핑 데이터 흐름은 코드 없는 환경을 통해 시각적으로 데이터 변환 방식을 지정할 수 있는 파이프라인 작업입니다. 다음으로, 형식 1 SCD를 만드는 매핑 데이터 흐름을 만듭니다.

  1. 개발 허브로 이동합니다.

    Develop hub.

  2. +, 데이터 흐름을 차례로 선택합니다.

    The plus button and data flow menu item are highlighted.

  3. 새 데이터 흐름의 속성 창에서 이름 필드(1)UpdateCustomerDimension을 입력하고 속성 단추(2)를 선택하여 속성 창을 숨깁니다.

    The data flow properties pane is displayed.

  4. 캔버스에서 원본 추가를 선택합니다.

    The Add Source button is highlighted on the data flow canvas.

  5. Source settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: SourceDB 입력
    • 원본 유형: Dataset 선택
    • 옵션: Allow schema drift를 선택하고 다른 옵션은 선택 안 함
    • 샘플링: Disable 선택
    • 데이터 세트: + 새로 만들기를 선택하여 새 데이터 세트 만들기

    The New button is highlighted next to Dataset.

  6. 새 통합 데이터 세트 대화 상자에서 Azure Synapse Analytics, 계속을 차례로 선택합니다.

    Azure SQL Database and the Continue button are highlighted.

  7. 데이터 세트 속성에서 다음을 구성합니다.

    • 이름: CustomerSource 입력
    • 연결된 서비스: Synapse 작업 영역 연결된 서비스 선택
    • 테이블 이름: 드롭다운 옆의 새로 고침 단추 선택

    The form is configured as described and the refresh button is highlighted.

  8. 필드에 SQL 풀 이름을 입력한 다음 확인을 선택합니다.

    The SQLPool01 parameter is highlighted.

  9. 테이블 이름에서 dbo.CustomerSource를 선택하고 스키마 가져오기에서 From connection/store를 선택한 다음 확인을 선택하여 데이터 세트를 만듭니다.

    The form is completed as described.

  10. 추가한 CustomerSource 데이터 세트 옆에 있는 열기를 선택합니다.

    The open button is highlighted next to the new dataset.

  11. DBName 옆의 필드에 SQL 풀 이름을 입력합니다.

  12. 데이터 흐름 편집기에서 SourceDB 작업 아래의 원본 추가 상자를 선택합니다. CustomerSource에서와 동일한 단계에 따라 이 원본을 DimCustomer 테이블로 구성합니다.

    • 출력 스트림 이름: DimCustomer 입력
    • 원본 유형: Dataset 선택
    • 옵션: Allow schema drift를 선택하고 다른 옵션은 선택 안 함
    • 샘플링: Disable 선택
    • 데이터 세트: + 새로 만들기를 선택하여 새 데이터 세트 만들기. Azure Synapse 연결된 서비스를 사용하고 DimCustomer 테이블을 선택합니다. DBName을 SQL 풀 이름으로 설정해야 합니다.

    The Add Source, Output stream name, and Dataset name are highlighted in the Source settings.

데이터 흐름에 변환 추가

  1. 캔버스에서 SourceDB 원본 오른쪽의 +를 선택하고 파생 열을 선택합니다.

    The plus button and derived column menu item are highlighted.

  2. Derived column's settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: CreateCustomerHash 입력
    • 들어오는 스트림: SourceDB 선택
    • : 다음을 입력합니다.
    설명
    에서의 형식HashKey sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) 테이블 값의 SHA256 해시를 만듭니다. 이 해시를 사용하여 들어오는 레코드의 해시를 CustomerID 값이 일치하는 대상 레코드의 해시 값과 비교하여 행 변경 내용을 검색합니다. iifNull 함수는 null 값을 빈 문자열로 바꿉니다. 이렇게 하지 않으면 null 항목이 있는 경우 해시 값이 중복되는 경향이 있습니다.

    The Derived column's settings form is configured as described.

  3. 캔버스에서 CreateCustomerHash 파생 열 오른쪽의 +를 선택하고 있음을 선택합니다.

    The plus button and exists menu item are both highlighted.

  4. Exists settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: Exists 입력
    • 왼쪽 스트림: CreateCustomerHash 선택
    • 오른쪽 스트림: SynapseDimCustomer 선택
    • 있음 형식: Doesn't exist 선택
    • 있음 조건: 왼쪽과 오른쪽에 다음과 같이 설정
    왼쪽: CreateCustomerHash의 열 오른쪽: SynapseDimCustomer의 열
    HashKey HashKey

    The Exists settings form is configured as described.

  5. 캔버스에서 Exists 오른쪽의 +를 선택하고 조회를 선택합니다.

    The plus button and lookup menu item are both highlighted.

  6. Lookup settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: LookupCustomerID 입력
    • 기본 스트림: Exists 선택
    • 조회 스트림: SynapseDimCustomer 선택
    • 여러 행 일치: 선택 안 함
    • 일치: Any row 선택
    • 조회 조건: 왼쪽과 오른쪽에 다음과 같이 설정
    왼쪽: Exists의 열 오른쪽: SynapseDimCustomer의 열
    CustomerID CustomerID

    The Lookup settings form is configured as described.

  7. 캔버스에서 LookupCustomerID 오른쪽의 +를 선택하고 파생 열을 선택합니다.

    The plus button and derived column menu item are both highlighted.

  8. Derived column's settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: SetDates 입력
    • 들어오는 스트림: LookupCustomerID 선택
    • : 다음을 입력합니다.
    설명
    InsertedDate 선택 iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) InsertedDate 값이 null이면 현재 타임스탬프를 삽입합니다. 그렇지 않으면 InsertedDate 값을 사용합니다.
    ModifiedDate 선택 currentTimestamp() 항상 ModifiedDate 값을 현재 타임스탬프로 업데이트합니다.

    Another Derived column's settings form is configured as described.

    참고 항목

    두 번째 열을 삽입하려면 열 목록 위에 있는 + 추가를 선택한 다음 열 추가를 선택합니다.

  9. 캔버스에서 SetDates 파생 열 오른쪽의 +를 선택하고 행 변경을 선택합니다.

    The plus button and alter row menu item are both highlighted.

  10. Alter row settings에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: AllowUpserts 입력
    • 들어오는 스트림: SetDates 선택
    • 행 조건 변경: 다음을 입력합니다.
    조건 설명
    Upsert if 선택 true() Upsert if 조건에서 조건을 true()로 설정하면 upsert가 허용됩니다. 이렇게 하면 매핑 데이터 흐름의 단계를 통과하는 모든 데이터가 싱크로 삽입되거나 업데이트됩니다.

    The alter row settings form is configured as described.

  11. 캔버스에서 AllowUpserts 행 변경 단계 오른쪽의 +를 선택하고 싱크를 선택합니다.

    The plus button and sink menu item are both highlighted.

  12. Sink에서 다음 속성을 구성합니다.

    • 출력 스트림 이름: Sink 입력
    • 들어오는 스트림: AllowUpserts 선택
    • 싱크 형식: Dataset 선택
    • 데이터 세트: DimCustomer 선택
    • 옵션: Allow schema drift을 선택하고 Validate schema는 선택 안 함

    The sink properties form is configured as described.

  13. 설정 탭을 선택하고 다음 속성을 구성합니다.

    • 업데이트 방법: Allow upsert을 선택하고 다른 모든 옵션은 선택 안 함
    • 키 열: List of columns을 선택하고 목록에서 CustomerID 선택
    • 테이블 작업: None 선택
    • 스테이징 사용: 선택 안 함

    The sink settings are configured as described.

  14. 매핑 탭을 선택하고 자동 매핑을 선택 취소합니다. 아래에 설명된 대로 입력 열 매핑을 구성합니다.

    입력 열 출력 열
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    Mapping settings are configured as described.

  15. 완성된 매핑 흐름은 다음과 같이 표시됩니다. 모두 게시를 선택하여 변경 내용을 저장합니다.

    The completed data flow is displayed and Publish all is highlighted.

  16. 게시를 선택합니다.

    The publish button is highlighted.

데이터 흐름을 테스트하는 방법

유형 1 SCD 데이터 흐름을 완료했습니다. 이 흐름을 테스트하도록 선택하는 경우 데이터 흐름을 Synapse 통합 파이프라인에 추가할 수 있습니다. 그런 다음 파이프라인을 한 번 실행하여 고객 원본 데이터를 DimCustomer 대상으로 초기 로드할 수 있습니다.

이후 파이프라인을 실행할 때마다 원본 테이블의 데이터를 차원 테이블에 이미 있는 데이터와 비교(HashKey 사용)하고 변경된 레코드만 업데이트합니다. 이 테스트를 위해 원본 테이블의 레코드를 업데이트한 다음 파이프라인을 다시 실행하고 차원 테이블의 레코드 업데이트를 확인할 수 있습니다.

고객 Janet Gates를 예로 들어 보겠습니다. 초기 로드에서는 LastName이 Gates이고 CustomerId가 4라고 표시됩니다.

The script is displayed with the initial customer record.

다음은 원본 테이블에서 고객 성을 업데이트하는 문의 예제입니다.

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

레코드를 업데이트하고 파이프라인을 다시 실행하면 DimCustomer에서 업데이트된 해당 데이터를 표시합니다.

The script is displayed with the updated customer record.

고객 레코드에서 LastName 값을 원본 레코드와 일치하도록 업데이트했고 이전 LastName 값을 추적하지 않고 ModifiedDate를 업데이트했습니다. 이는 유형 1 SCD의 올바른 동작입니다. LastName 필드의 기록이 필요한 경우 테이블 및 데이터 흐름을 앞에서 배운 다른 SCD 유형 중 하나로 수정할 수 있습니다.