Gyakorlat – 1. típusú, lassan változó dimenzió tervezése és implementálása adatfolyamok leképezésével
Ebben a gyakorlatban egy 1. típusú SCD-hez hoz létre adatfolyamot az Azure Synapse dedikált SQL-készletének forrásként és célként való használatával. Ez az adatfolyam ezután hozzáadható egy Synapse-folyamathoz, és a kinyerési, átalakítási, betöltési (ETL) folyamat részeként futtatható.
Forrás- és dimenziótábla beállítása
Ebben a gyakorlatban egy dimenziótáblát szeretne betölteni az Azure Synapse-ban olyan forrásadatokból, amelyek számos különböző rendszertípusból származhatnak, például az Azure SQL-ből, az Azure Storage-ból stb. Ebben a példában egyszerűnek tarthatja a forrásadatokat az Azure Synapse-adatbázisban.
A Synapse Studióban lépjen a Data Hubra.
Válassza a Munkaterület lapot (1), bontsa ki az Adatbázisok elemet, majd kattintson a jobb gombbal az SQLPool01 (2) elemre. Válassza az Új SQL-szkript (3), majd az Üres szkript (4) lehetőséget.
Illessze be a következő szkriptet az üres szkriptablakba, majd válassza a Futtatás vagy a Találat
F5
lehetőséget a lekérdezés végrehajtásához:CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
Leképezési adatfolyam létrehozása
Az adatfolyamok leképezése olyan folyamattevékenységek, amelyek vizuális módot biztosítanak az adatok kód nélküli átalakításának megadására. Ezután létrehoz egy leképezési adatfolyamot egy 1. típusú SCD létrehozásához.
Lépjen a Fejlesztési központhoz.
Válassza ki +az adatfolyamot, majd válassza az Adatfolyam lehetőséget.
Az új adatfolyam tulajdonságok paneljén adja meg a Név mezőt (1), majd a Tulajdonságok gombra (2) kattintva elrejtheti a tulajdonságok panelt.
UpdateCustomerDimension
Válassza a Forrás hozzáadása lehetőséget a vásznon.
A csoportban
Source settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
SourceDB
- Forrás típusa: Kiválasztás
Dataset
- Beállítások: Jelölje be
Allow schema drift
a többi beállítást, és hagyja bejelölve - Mintavételezés: Kiválasztás
Disable
- Adatkészlet: Új adatkészlet létrehozásához válassza az + Új lehetőséget
- Kimeneti stream neve: Enter
Az új integrációs adathalmaz párbeszédpanelen válassza az Azure Synapse Analytics, majd a Folytatás lehetőséget.
Az adathalmaz tulajdonságaiban konfigurálja a következőket:
- Név: Enter
CustomerSource
- Társított szolgáltatás: Válassza ki a Synapse-munkaterület társított szolgáltatását
- Tábla neve: Válassza a Legördülő lista Melletti Frissítés gombot
- Név: Enter
Az Érték mezőben adja meg az SQL-készlet nevét, majd kattintson az OK gombra.
Válassza a Táblanév területen, válassza az
From connection/store
Importálás séma területen, majd az OK gombra kattintva hozza létre azdbo.CustomerSource
adathalmazt.Válassza a Megnyitás gombot a
CustomerSource
hozzáadott adathalmaz mellett.Adja meg az SQL-készlet nevét a mellette
DBName
lévő Érték mezőben.Az adatfolyam-szerkesztőben válassza a Forrás hozzáadása mezőt a SourceDB-tevékenység alatt. Konfigurálja ezt a forrást DimCustomer-táblaként a CustomerSource-hoz használt lépéseket követve.
- Kimeneti stream neve: Enter
DimCustomer
- Forrás típusa: Kiválasztás
Dataset
- Beállítások: Jelölje be
Allow schema drift
a többi beállítást, és hagyja bejelölve - Mintavételezés: Kiválasztás
Disable
- Adatkészlet: Új adatkészlet létrehozásához válassza az + Új lehetőséget. Használja az Azure Synapse társított szolgáltatást, és válassza a DimCustomer táblát. Ügyeljen arra, hogy a DBName értéke az SQL-készlet neve legyen.
- Kimeneti stream neve: Enter
Átalakítások hozzáadása az adatfolyamhoz
Válassza + ki a forrástól jobbra
SourceDB
a vásznon, majd válassza a Származtatott oszlop lehetőséget.A csoportban
Derived column's settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
CreateCustomerHash
- Bejövő stream: Válassza a
SourceDB
- Oszlopok: Adja meg a következőket:
Column Expression Leírás Írja be a következőt: HashKey
sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
Létrehoz egy SHA256 kivonatot a táblaértékek közül. Ezzel észleljük a sorváltozásokat, ha összehasonlítjuk a bejövő rekordok kivonatát a célrekordok kivonatértékével, és megfeleltetjük az CustomerID
értéket. AiifNull
függvény üres sztringekre cseréli a null értékeket. Ellenkező esetben a kivonatértékek ismétlődőek, ha null bejegyzések vannak jelen.- Kimeneti stream neve: Enter
Válassza + a vásznon a
CreateCustomerHash
származtatott oszloptól jobbra, majd válassza a Létező lehetőséget.A csoportban
Exists settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
Exists
- Bal oldali stream: Kiválasztás
CreateCustomerHash
- Jobb oldali stream: Válassza a
SynapseDimCustomer
- Létező típus: Válassza a
Doesn't exist
- Létező feltételek: Állítsa be a következőket a bal és a jobb oldalon:
Balra: CreateCustomerHash oszlopa Jobbra: SynapseDimCustomer oszlopa HashKey
HashKey
- Kimeneti stream neve: Enter
Válassza + a vászon jobb oldalán
Exists
, majd a Keresés lehetőséget.A csoportban
Lookup settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
LookupCustomerID
- Elsődleges stream: Kiválasztás
Exists
- Keresési stream: Kiválasztás
SynapseDimCustomer
- Több sor egyeztetése: Nincs bejelölve
- Egyezés bekapcsolva: Válassza a
Any row
- Keresési feltételek: Állítsa be a következőket a bal és a jobb oldalon:
Balra: Létezik oszlop Jobbra: SynapseDimCustomer oszlopa CustomerID
CustomerID
- Kimeneti stream neve: Enter
Válassza + ki a vászon jobb oldalán
LookupCustomerID
, majd válassza a Származtatott oszlop lehetőséget.A csoportban
Derived column's settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
SetDates
- Bejövő stream: Válassza a
LookupCustomerID
- Oszlopok: Adja meg a következőket:
Column Expression Leírás A következők szerint válasszon: InsertedDate
iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
Ha az InsertedDate
érték null, szúrja be az aktuális időbélyeget. Ellenkező esetben használja azInsertedDate
értéket.A következők szerint válasszon: ModifiedDate
currentTimestamp()
Mindig frissítse az ModifiedDate
értéket az aktuális időbélyeggel.Megjegyzés:
A második oszlop beszúrásához válassza a + Hozzáadás lehetőséget az Oszlopok lista fölé, majd az Oszlop hozzáadása lehetőséget.
- Kimeneti stream neve: Enter
Válassza + ki a vásznon a
SetDates
származtatott oszlop lépésétől jobbra, majd válassza az Alter Row (Sor módosítása) lehetőséget.A csoportban
Alter row settings
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
AllowUpserts
- Bejövő stream: Válassza a
SetDates
- Sorfeltételek módosítása: Adja meg a következőket:
Feltétel Expression Leírás A következők szerint válasszon: Upsert if
true()
Állítsa a feltételt true()
a feltételre azUpsert if
upserts engedélyezéséhez. Ez biztosítja, hogy a leképezési adatfolyam lépésein áthaladó összes adat beszúrva vagy frissítve legyen a fogadóba.- Kimeneti stream neve: Enter
Válassza + a vászon alter row lépésének
AllowUpserts
jobb oldalán, majd válassza a Fogadó lehetőséget.A csoportban
Sink
konfigurálja a következő tulajdonságokat:- Kimeneti stream neve: Enter
Sink
- Bejövő stream: Válassza a
AllowUpserts
- Fogadó típusa: Válassza a
Dataset
- Adatkészlet: Kiválasztás
DimCustomer
- Beállítások: Ellenőrzés
Allow schema drift
és jelölés törléseValidate schema
- Kimeneti stream neve: Enter
Válassza a Gépház lapot, és konfigurálja a következő tulajdonságokat:
- Frissítési módszer: Az összes többi beállítás ellenőrzése
Allow upsert
és törlése - Kulcsoszlopok: Kijelölés
List of columns
, majd kijelölésCustomerID
a listában - Táblaművelet: Kiválasztás
None
- Előkészítés engedélyezése: Nincs bejelölve
- Frissítési módszer: Az összes többi beállítás ellenőrzése
Válassza a Leképezés lapot, majd törölje az Automatikus leképezés jelölőnégyzet jelölését. Konfigurálja a bemeneti oszlopok leképezését az alábbiak szerint:
Bemeneti oszlopok Kimeneti oszlopok SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
A befejezett leképezési folyamatnak az alábbihoz hasonlóan kell kinéznie. A módosítások mentéséhez válassza az Összes közzététele lehetőséget.
Válassza a Közzététel lehetőséget.
Az adatfolyam tesztelése
Végrehajtott egy 1. típusú SCD-adatfolyamot. Ha úgy dönt, hogy teszteli, hozzáadhatja ezt az adatfolyamot egy Synapse-integrációs folyamathoz. Ezután egyszer futtathatja a folyamatot, hogy elvégezhesse az ügyfél forrásadatainak kezdeti terhelését a DimCustomer-célhelyre.
A folyamat minden további futtatása összehasonlítja a forrástábla adatait a dimenziótáblában lévő adatokkal (a Kivonatkulcs használatával), és csak a módosított rekordokat frissíti. Ennek teszteléséhez frissíthet egy rekordot a forrástáblában, majd újra futtathatja a folyamatot, és ellenőrizheti a rekordfrissítéseket a dimenziótáblában.
Vegyük példaként Janet Gates ügyfelet. A kezdeti terhelés a LastName
Gates és a CustomerId
4 terhelést mutatja.
Íme egy példautasítás, amely frissíti az ügyfél vezetéknevét a forrástáblában.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
A rekord frissítése és a folyamat ismételt futtatása után a DimCustomer megjeleníti ezeket a frissített adatokat.
Az ügyfélrekord sikeresen frissítette az értéket a LastName
forrásrekordnak megfelelően, és a ModifiedDate
régi LastName
érték nyomon követése nélkül frissítette az értéket. Ez az 1. típusú SCD várt viselkedése. Ha előzményre volt szükség a LastName
mezőhöz, akkor a táblát és az adatfolyamot úgy kell módosítania, hogy az a többi tanult SCD-típus egyike legyen.