연습 - 매핑 데이터 흐름을 사용하여 형식 1 느린 변경 차원 디자인 및 구현
이 연습에서는 Azure Synapse 전용 SQL 풀을 원본 및 대상으로 사용하여 유형 1 SCD의 데이터 흐름을 만듭니다. 그런 다음 이 데이터 흐름을 Synapse 파이프라인에 추가하고 ETL(추출, 변환, 로드) 프로세스의 일부로 실행할 수 있습니다.
원본 및 차원 테이블 설정
이 연습에서는 Azure SQL, Azure 스토리지 등과 같은 다양한 시스템 유형에서 만들 수 있는 원본 데이터에서 차원 테이블을 Azure Synapse에 로드하려고 합니다. 이 예제에서는 간단하게 하기 위해 Azure Synapse 데이터베이스에서 원본 데이터를 만듭니다.
Synapse Studio에서 데이터 허브로 이동합니다.
작업 영역 탭(1)을 선택하고 데이터베이스를 확장한 다음 SQLPool01(2)을 마우스 오른쪽 단추로 클릭합니다. 새 SQL 스크립트(3), 빈 스크립트(4)를 차례로 선택합니다.
다음 스크립트를 빈 스크립트 창에 붙여넣은 다음 실행을 선택하거나
F5
키를 눌러 쿼리를 실행합니다.CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
매핑 데이터 흐름 만들기
매핑 데이터 흐름은 코드 없는 환경을 통해 시각적으로 데이터 변환 방식을 지정할 수 있는 파이프라인 작업입니다. 다음으로, 형식 1 SCD를 만드는 매핑 데이터 흐름을 만듭니다.
개발 허브로 이동합니다.
+, 데이터 흐름을 차례로 선택합니다.
새 데이터 흐름의 속성 창에서 이름 필드(1)에
UpdateCustomerDimension
을 입력하고 속성 단추(2)를 선택하여 속성 창을 숨깁니다.캔버스에서 원본 추가를 선택합니다.
Source settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
SourceDB
입력 - 원본 유형:
Dataset
선택 - 옵션:
Allow schema drift
를 선택하고 다른 옵션은 선택 안 함 - 샘플링:
Disable
선택 - 데이터 세트: + 새로 만들기를 선택하여 새 데이터 세트 만들기
- 출력 스트림 이름:
새 통합 데이터 세트 대화 상자에서 Azure Synapse Analytics, 계속을 차례로 선택합니다.
데이터 세트 속성에서 다음을 구성합니다.
- 이름:
CustomerSource
입력 - 연결된 서비스: Synapse 작업 영역 연결된 서비스 선택
- 테이블 이름: 드롭다운 옆의 새로 고침 단추 선택
- 이름:
값 필드에 SQL 풀 이름을 입력한 다음 확인을 선택합니다.
테이블 이름에서
dbo.CustomerSource
를 선택하고 스키마 가져오기에서From connection/store
를 선택한 다음 확인을 선택하여 데이터 세트를 만듭니다.추가한
CustomerSource
데이터 세트 옆에 있는 열기를 선택합니다.DBName
옆의 값 필드에 SQL 풀 이름을 입력합니다.데이터 흐름 편집기에서 SourceDB 작업 아래의 원본 추가 상자를 선택합니다. CustomerSource에서와 동일한 단계에 따라 이 원본을 DimCustomer 테이블로 구성합니다.
- 출력 스트림 이름:
DimCustomer
입력 - 원본 유형:
Dataset
선택 - 옵션:
Allow schema drift
를 선택하고 다른 옵션은 선택 안 함 - 샘플링:
Disable
선택 - 데이터 세트: + 새로 만들기를 선택하여 새 데이터 세트 만들기. Azure Synapse 연결된 서비스를 사용하고 DimCustomer 테이블을 선택합니다. DBName을 SQL 풀 이름으로 설정해야 합니다.
- 출력 스트림 이름:
데이터 흐름에 변환 추가
캔버스에서
SourceDB
원본 오른쪽의 +를 선택하고 파생 열을 선택합니다.Derived column's settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
CreateCustomerHash
입력 - 들어오는 스트림:
SourceDB
선택 - 열: 다음을 입력합니다.
열 식 설명 에서의 형식 HashKey
sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
테이블 값의 SHA256 해시를 만듭니다. 이 해시를 사용하여 들어오는 레코드의 해시를 CustomerID
값이 일치하는 대상 레코드의 해시 값과 비교하여 행 변경 내용을 검색합니다.iifNull
함수는 null 값을 빈 문자열로 바꿉니다. 이렇게 하지 않으면 null 항목이 있는 경우 해시 값이 중복되는 경향이 있습니다.- 출력 스트림 이름:
캔버스에서
CreateCustomerHash
파생 열 오른쪽의 +를 선택하고 있음을 선택합니다.Exists settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
Exists
입력 - 왼쪽 스트림:
CreateCustomerHash
선택 - 오른쪽 스트림:
SynapseDimCustomer
선택 - 있음 형식:
Doesn't exist
선택 - 있음 조건: 왼쪽과 오른쪽에 다음과 같이 설정
왼쪽: CreateCustomerHash의 열 오른쪽: SynapseDimCustomer의 열 HashKey
HashKey
- 출력 스트림 이름:
캔버스에서
Exists
오른쪽의 +를 선택하고 조회를 선택합니다.Lookup settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
LookupCustomerID
입력 - 기본 스트림:
Exists
선택 - 조회 스트림:
SynapseDimCustomer
선택 - 여러 행 일치: 선택 안 함
- 일치:
Any row
선택 - 조회 조건: 왼쪽과 오른쪽에 다음과 같이 설정
왼쪽: Exists의 열 오른쪽: SynapseDimCustomer의 열 CustomerID
CustomerID
- 출력 스트림 이름:
캔버스에서
LookupCustomerID
오른쪽의 +를 선택하고 파생 열을 선택합니다.Derived column's settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
SetDates
입력 - 들어오는 스트림:
LookupCustomerID
선택 - 열: 다음을 입력합니다.
열 식 설명 InsertedDate
선택iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
InsertedDate
값이 null이면 현재 타임스탬프를 삽입합니다. 그렇지 않으면InsertedDate
값을 사용합니다.ModifiedDate
선택currentTimestamp()
항상 ModifiedDate
값을 현재 타임스탬프로 업데이트합니다.참고 항목
두 번째 열을 삽입하려면 열 목록 위에 있는 + 추가를 선택한 다음 열 추가를 선택합니다.
- 출력 스트림 이름:
캔버스에서
SetDates
파생 열 오른쪽의 +를 선택하고 행 변경을 선택합니다.Alter row settings
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
AllowUpserts
입력 - 들어오는 스트림:
SetDates
선택 - 행 조건 변경: 다음을 입력합니다.
조건 식 설명 Upsert if
선택true()
Upsert if
조건에서 조건을true()
로 설정하면 upsert가 허용됩니다. 이렇게 하면 매핑 데이터 흐름의 단계를 통과하는 모든 데이터가 싱크로 삽입되거나 업데이트됩니다.- 출력 스트림 이름:
캔버스에서
AllowUpserts
행 변경 단계 오른쪽의 +를 선택하고 싱크를 선택합니다.Sink
에서 다음 속성을 구성합니다.- 출력 스트림 이름:
Sink
입력 - 들어오는 스트림:
AllowUpserts
선택 - 싱크 형식:
Dataset
선택 - 데이터 세트:
DimCustomer
선택 - 옵션:
Allow schema drift
을 선택하고Validate schema
는 선택 안 함
- 출력 스트림 이름:
설정 탭을 선택하고 다음 속성을 구성합니다.
- 업데이트 방법:
Allow upsert
을 선택하고 다른 모든 옵션은 선택 안 함 - 키 열:
List of columns
을 선택하고 목록에서CustomerID
선택 - 테이블 작업:
None
선택 - 스테이징 사용: 선택 안 함
- 업데이트 방법:
매핑 탭을 선택하고 자동 매핑을 선택 취소합니다. 아래에 설명된 대로 입력 열 매핑을 구성합니다.
입력 열 출력 열 SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
완성된 매핑 흐름은 다음과 같이 표시됩니다. 모두 게시를 선택하여 변경 내용을 저장합니다.
게시를 선택합니다.
데이터 흐름을 테스트하는 방법
유형 1 SCD 데이터 흐름을 완료했습니다. 이 흐름을 테스트하도록 선택하는 경우 데이터 흐름을 Synapse 통합 파이프라인에 추가할 수 있습니다. 그런 다음 파이프라인을 한 번 실행하여 고객 원본 데이터를 DimCustomer 대상으로 초기 로드할 수 있습니다.
이후 파이프라인을 실행할 때마다 원본 테이블의 데이터를 차원 테이블에 이미 있는 데이터와 비교(HashKey 사용)하고 변경된 레코드만 업데이트합니다. 이 테스트를 위해 원본 테이블의 레코드를 업데이트한 다음 파이프라인을 다시 실행하고 차원 테이블의 레코드 업데이트를 확인할 수 있습니다.
고객 Janet Gates를 예로 들어 보겠습니다. 초기 로드에서는 LastName
이 Gates이고 CustomerId
가 4라고 표시됩니다.
다음은 원본 테이블에서 고객 성을 업데이트하는 문의 예제입니다.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
레코드를 업데이트하고 파이프라인을 다시 실행하면 DimCustomer에서 업데이트된 해당 데이터를 표시합니다.
고객 레코드에서 LastName
값을 원본 레코드와 일치하도록 업데이트했고 이전 LastName
값을 추적하지 않고 ModifiedDate
를 업데이트했습니다. 이는 유형 1 SCD의 올바른 동작입니다. LastName
필드의 기록이 필요한 경우 테이블 및 데이터 흐름을 앞에서 배운 다른 SCD 유형 중 하나로 수정할 수 있습니다.