Adatok növekményes betöltése a Data Warehouse-ból a Lakehouse-ba
Ebben az oktatóanyagban megtudhatja, hogyan tölthet be adatokat növekményesen a Data Warehouse-ból a Lakehouse-ba.
Áttekintés
Itt látható a megoldás összefoglaló jellegű ábrája:
Az alábbiak a megoldás kialakításának leglényegesebb lépései:
A küszöb oszlopának kiválasztása. Jelöljön ki egy oszlopot a forrásadattáblában, amellyel minden futtatás új vagy frissített rekordjait szeletelheti. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.
Készítsen elő egy táblát az utolsó vízjel értékének adattárházban való tárolásához.
Egy folyamat létrehozása a következő munkafolyamattal:
Ebben a megoldásban a folyamat a következő tevékenységeket tartalmazza:
- Két keresési tevékenység létrehozása. Az első keresési tevékenység az utolsó küszöbértéket kéri le. A második keresési tevékenység az új küszöbértéket kéri le. Ezeket a küszöbértékeket a rendszer átadja a másolási tevékenységnek.
- Hozzon létre egy másolási tevékenységet, amely sorokat másol a forrásadattáblából a vízjel oszlopának a réginél nagyobb és az új vízjelnél kisebb értékével. Ezután átmásolja az adatokat az adattárházból a Lakehouse-ba egy új fájlként.
- Hozzon létre egy tárolt eljárástevékenységet, amely frissíti a következő folyamatfuttatás utolsó vízjelértékét.
Előfeltételek
- Adattárház. Az adattárházat használja forrásadattárként. Ha nincs meg, a létrehozás lépéseit az Adattárház létrehozása című témakörben találja.
- Lakehouse. A Lakehouse-t használja céladattárként. Ha nem rendelkezik vele, a létrehozás lépéseit a Lakehouse létrehozása című témakörben találja. Hozzon létre egy növekményescopy nevű mappát a másolt adatok tárolásához.
A forrás előkészítése
Az alábbiakban néhány táblát és tárolt eljárást kell előkészíteni a forrásadatraktárban a növekményes másolási folyamat konfigurálása előtt.
1. Adatforrástábla létrehozása az adattárházban
Futtassa a következő SQL-parancsot az adattárházban egy data_source_table nevű tábla adatforrástáblaként való létrehozásához. Ebben az oktatóanyagban mintaadatokként fogja használni a növekményes másoláshoz.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Az adatforrástáblában szereplő adatok az alábbiakban láthatók:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
Ebben az oktatóanyagban a LastModifytime értéket használja vízjeloszlopként.
2. Hozzon létre egy másik táblát az adattárházban az utolsó vízjel értékének tárolásához
Futtassa a következő SQL-parancsot az adattárházban egy vízjeltábla nevű tábla létrehozásához az utolsó vízjel értékének tárolásához:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Állítsa be az utolsó vízjel alapértelmezett értékét a forrásadattábla táblájának nevével. Ebben az oktatóanyagban a tábla neve data_source_table, az alapértelmezett érték pedig
1/1/2010 12:00:00 AM
az .INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Tekintse át a tábla vízjeltáblájában szereplő adatokat.
Select * from watermarktable
Hozam:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Tárolt eljárás létrehozása az adattárházban
Futtassa a következő parancsot egy tárolt eljárás létrehozásához az adattárházban. Ez a tárolt eljárás segít frissíteni az utolsó vízjel értékét a legutóbbi folyamat futtatása után.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Folyamat konfigurálása növekményes másoláshoz
1. lépés: Folyamat létrehozása
A képernyő bal alsó részén válassza a Power BI ikont, majd a Data Factoryt választva nyissa meg a Data Factory kezdőlapját.
Lépjen a Microsoft Fabric-munkaterületre.
Válassza az Adatfolyamat lehetőséget, majd adjon meg egy folyamatnevet egy új folyamat létrehozásához.
2. lépés: Keresési tevékenység hozzáadása az utolsó vízjelhez
Ebben a lépésben létrehoz egy keresési tevékenységet az utolsó vízjel értékének lekéréséhez. A rendszer a korábban beállított alapértelmezett értéket 1/1/2010 12:00:00 AM
fogja megkapni.
Válassza a Folyamat hozzáadása tevékenység hozzáadása lehetőséget, és válassza a Keresés lehetőséget a legördülő listából.
Az Általános lapon nevezze át ezt a tevékenységet LookupOldWaterMarkActivity névre.
A Beállítások lapon hajtsa végre a következő konfigurációt:
- Adattár típusa: Munkaterület kiválasztása.
- Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.
- Adattárház: Válassza ki az adattárházat.
- Lekérdezés használata: Tábla kiválasztása.
- Táblázat: Válassza a dbo.watermarktable lehetőséget.
- Csak az első sor: Kijelölve.
3. lépés: Keresési tevékenység hozzáadása az új vízjelhez
Ebben a lépésben létrehoz egy keresési tevékenységet az új vízjel értékének lekéréséhez. Lekérdezéssel lekérjük az új vízjelet a forrásadattáblából. A data_source_table LastModifytime oszlopának maximális értéke lesz lekért.
A felső sávon válassza a Keresés a Tevékenységek lapon a második keresési tevékenység hozzáadásához.
Az Általános lapon nevezze át ezt a tevékenységet LookupNewWaterMarkActivity névre.
A Beállítások lapon hajtsa végre a következő konfigurációt:
Adattár típusa: Munkaterület kiválasztása.
Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.
Adattárház: Válassza ki az adattárházat.
Lekérdezés használata: Válassza a Lekérdezés lehetőséget.
Lekérdezés: Adja meg a következő lekérdezést az utolsó utolsó módosítási idő új vízjelként való kiválasztásához:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Csak az első sor: Kijelölve.
4. lépés: A másolási tevékenység hozzáadása a növekményes adatok másolásához
Ebben a lépésben egy másolási tevékenységet ad hozzá a növekményes adatok másolásához az utolsó vízjel és az új vízjel között a Data Warehouse-ból a Lakehouse-ba.
A felső sávon válassza a Tevékenységek lehetőséget, majd az Adatok másolása –>Hozzáadás a vászonhoz lehetőséget a másolási tevékenység lekéréséhez.
Az Általános lapon nevezze át ezt a tevékenységet a IncrementalCopyActivity névre.
Csatlakoztassa mindkét keresési tevékenységet a másolási tevékenységhez úgy, hogy a keresési tevékenységekhez csatolt zöld gombot (A sikeres verzió) a másolási tevékenységhez húzza. Engedje fel az egérgombot, ha a másolási tevékenység szegélyszíne zöldre változik.
A Forrás lapon hajtsa végre a következő konfigurációt:
Adattár típusa: Munkaterület kiválasztása.
Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.
Adattárház: Válassza ki az adattárházat.
Lekérdezés használata: Válassza a Lekérdezés lehetőséget.
Lekérdezés: Adja meg a következő lekérdezést a növekményes adatok másolásához az utolsó vízjel és az új vízjel között.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
A Cél lapon hajtsa végre a következő konfigurációt:
- Adattár típusa: Munkaterület kiválasztása.
- Munkaterület adattártípusa: Válassza a Lakehouse lehetőséget.
- Lakehouse: Válassza ki a Lakehouse.
- Gyökérmappa: Fájlok kiválasztása.
- Fájl elérési útja: Adja meg a másolt adatokat tárolni kívánt mappát. Válassza a Tallózás lehetőséget a mappa kiválasztásához. A fájlnévhez nyissa meg a Dinamikus tartalom hozzáadása lehetőséget, és írja be
@CONCAT('Incremental-', pipeline().RunId, '.txt')
a megnyitott ablakban a másolt adatfájl fájlneveit a Lakehouse-ban. - Fájlformátum: Válassza ki az adatok formátumtípusát.
5. lépés: Tárolt eljárástevékenység hozzáadása
Ebben a lépésben egy tárolt eljárástevékenységet ad hozzá a következő folyamatfuttatás utolsó vízjelértékének frissítéséhez.
A felső sávon válassza a Tevékenységek lehetőséget, majd a Tárolt eljárás lehetőséget egy tárolt eljárástevékenység hozzáadásához.
Az Általános lapon nevezze át ezt a tevékenységet a StoredProceduretoWriteWatermarkActivity névre.
Csatlakoztassa a másolási tevékenység zöld (sikeres) kimenetét a tárolt eljárástevékenységhez.
A Beállítások lapon hajtsa végre a következő konfigurációt:
Adattár típusa: Munkaterület kiválasztása.
Adattárház: Válassza ki az adattárházat.
Tárolt eljárás neve: Adja meg az adattárházban létrehozott tárolt eljárást: [dbo].[ usp_write_watermark].
Bontsa ki a tárolt eljárás paramétereit. A tárolt eljárásparaméterek értékeinek megadásához válassza az Importálás lehetőséget, és adja meg a paraméterek következő értékeit:
Név Típus Érték LastModifiedtime Dátum/idő @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName Sztring @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
6. lépés: A folyamat futtatása és az eredmény figyelése
A felső sávon válassza a Futtatás a Kezdőlap lapon lehetőséget. Ezután válassza a Mentés és futtatás lehetőséget. A folyamat elindul, és a Folyamat a Kimenet lapon figyelhető.
Lépjen a Lakehouse-ra, és keresse meg az adatfájlt a megadott mappa alatt, és kiválaszthatja a fájlt a másolt adatok előnézetének megtekintéséhez.
További adatok hozzáadása a növekményes másolási eredmények megtekintéséhez
Az első folyamatfuttatás befejezése után próbáljon meg további adatokat hozzáadni az adattárház forrástáblájához, és ellenőrizze, hogy ez a folyamat képes-e a növekményes adatok másolására.
1. lépés: További adatok hozzáadása a forráshoz
Új adatok beszúrása az adattárházba az alábbi lekérdezés futtatásával:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
A data_source_table frissített adatai a következőek:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
2. lépés: Egy másik folyamat futtatásának aktiválása és az eredmény figyelése
Lépjen vissza a folyamatlapra. A felső sávon válassza ismét a Futtatás a Kezdőlap lapon lehetőséget. A folyamat elindul, és a Folyamat a Kimenet területen figyelhető.
Lépjen a Lakehouse-ra, és keresse meg, hogy az új másolt adatfájl a megadott mappa alatt található, és kiválaszthatja a fájlt a másolt adatok előnézetének megtekintéséhez. Láthatja, hogy a növekményes adatok ebben a fájlban jelennek meg.
Kapcsolódó tartalom
A következő lépésben további információt kaphat az Azure Blob Storage-ból a Lakehouse-ba történő másolásról.