Megosztás a következőn keresztül:


Adatok növekményes betöltése a Data Warehouse-ból a Lakehouse-ba

Ebben az oktatóanyagban megtudhatja, hogyan tölthet be adatokat növekményesen a Data Warehouse-ból a Lakehouse-ba.

Áttekintés

Itt látható a megoldás összefoglaló jellegű ábrája:

Az adatlogika növekményes betöltését bemutató ábra.

Az alábbiak a megoldás kialakításának leglényegesebb lépései:

  1. A küszöb oszlopának kiválasztása. Jelöljön ki egy oszlopot a forrásadattáblában, amellyel minden futtatás új vagy frissített rekordjait szeletelheti. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.

  2. Készítsen elő egy táblát az utolsó vízjel értékének adattárházban való tárolásához.

  3. Egy folyamat létrehozása a következő munkafolyamattal:

    Ebben a megoldásban a folyamat a következő tevékenységeket tartalmazza:

    • Két keresési tevékenység létrehozása. Az első keresési tevékenység az utolsó küszöbértéket kéri le. A második keresési tevékenység az új küszöbértéket kéri le. Ezeket a küszöbértékeket a rendszer átadja a másolási tevékenységnek.
    • Hozzon létre egy másolási tevékenységet, amely sorokat másol a forrásadattáblából a vízjel oszlopának a réginél nagyobb és az új vízjelnél kisebb értékével. Ezután átmásolja az adatokat az adattárházból a Lakehouse-ba egy új fájlként.
    • Hozzon létre egy tárolt eljárástevékenységet, amely frissíti a következő folyamatfuttatás utolsó vízjelértékét.

Előfeltételek

A forrás előkészítése

Az alábbiakban néhány táblát és tárolt eljárást kell előkészíteni a forrásadatraktárban a növekményes másolási folyamat konfigurálása előtt.

1. Adatforrástábla létrehozása az adattárházban

Futtassa a következő SQL-parancsot az adattárházban egy data_source_table nevű tábla adatforrástáblaként való létrehozásához. Ebben az oktatóanyagban mintaadatokként fogja használni a növekményes másoláshoz.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Az adatforrástáblában szereplő adatok az alábbiakban láthatók:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

Ebben az oktatóanyagban a LastModifytime értéket használja vízjeloszlopként.

2. Hozzon létre egy másik táblát az adattárházban az utolsó vízjel értékének tárolásához

  1. Futtassa a következő SQL-parancsot az adattárházban egy vízjeltábla nevű tábla létrehozásához az utolsó vízjel értékének tárolásához:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Állítsa be az utolsó vízjel alapértelmezett értékét a forrásadattábla táblájának nevével. Ebben az oktatóanyagban a tábla neve data_source_table, az alapértelmezett érték pedig 1/1/2010 12:00:00 AMaz .

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Tekintse át a tábla vízjeltáblájában szereplő adatokat.

    Select * from watermarktable
    

    Hozam:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Tárolt eljárás létrehozása az adattárházban

Futtassa a következő parancsot egy tárolt eljárás létrehozásához az adattárházban. Ez a tárolt eljárás segít frissíteni az utolsó vízjel értékét a legutóbbi folyamat futtatása után.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Folyamat konfigurálása növekményes másoláshoz

1. lépés: Folyamat létrehozása

  1. Lépjen a Power BI-ba.

  2. A képernyő bal alsó részén válassza a Power BI ikont, majd a Data Factoryt választva nyissa meg a Data Factory kezdőlapját.

  3. Lépjen a Microsoft Fabric-munkaterületre.

  4. Válassza az Adatfolyamat lehetőséget, majd adjon meg egy folyamatnevet egy új folyamat létrehozásához.

    Képernyőkép az újonnan létrehozott munkaterület új adatfolyam gombjáról.

    Képernyőkép egy új folyamat létrehozásának nevéről.

2. lépés: Keresési tevékenység hozzáadása az utolsó vízjelhez

Ebben a lépésben létrehoz egy keresési tevékenységet az utolsó vízjel értékének lekéréséhez. A rendszer a korábban beállított alapértelmezett értéket 1/1/2010 12:00:00 AM fogja megkapni.

  1. Válassza a Folyamat hozzáadása tevékenység hozzáadása lehetőséget, és válassza a Keresés lehetőséget a legördülő listából.

  2. Az Általános lapon nevezze át ezt a tevékenységet LookupOldWaterMarkActivity névre.

  3. A Beállítások lapon hajtsa végre a következő konfigurációt:

    • Adattár típusa: Munkaterület kiválasztása.
    • Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.
    • Adattárház: Válassza ki az adattárházat.
    • Lekérdezés használata: Tábla kiválasztása.
    • Táblázat: Válassza a dbo.watermarktable lehetőséget.
    • Csak az első sor: Kijelölve.

    A régi vízjel keresését bemutató képernyőkép.

3. lépés: Keresési tevékenység hozzáadása az új vízjelhez

Ebben a lépésben létrehoz egy keresési tevékenységet az új vízjel értékének lekéréséhez. Lekérdezéssel lekérjük az új vízjelet a forrásadattáblából. A data_source_table LastModifytime oszlopának maximális értéke lesz lekért.

  1. A felső sávon válassza a Keresés a Tevékenységek lapon a második keresési tevékenység hozzáadásához.

  2. Az Általános lapon nevezze át ezt a tevékenységet LookupNewWaterMarkActivity névre.

  3. A Beállítások lapon hajtsa végre a következő konfigurációt:

    • Adattár típusa: Munkaterület kiválasztása.

    • Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.

    • Adattárház: Válassza ki az adattárházat.

    • Lekérdezés használata: Válassza a Lekérdezés lehetőséget.

    • Lekérdezés: Adja meg a következő lekérdezést az utolsó utolsó módosítási idő új vízjelként való kiválasztásához:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Csak az első sor: Kijelölve.

    Az új vízjel keresését bemutató képernyőkép.

4. lépés: A másolási tevékenység hozzáadása a növekményes adatok másolásához

Ebben a lépésben egy másolási tevékenységet ad hozzá a növekményes adatok másolásához az utolsó vízjel és az új vízjel között a Data Warehouse-ból a Lakehouse-ba.

  1. A felső sávon válassza a Tevékenységek lehetőséget, majd az Adatok másolása –>Hozzáadás a vászonhoz lehetőséget a másolási tevékenység lekéréséhez.

  2. Az Általános lapon nevezze át ezt a tevékenységet a IncrementalCopyActivity névre.

  3. Csatlakoztassa mindkét keresési tevékenységet a másolási tevékenységhez úgy, hogy a keresési tevékenységekhez csatolt zöld gombot (A sikeres verzió) a másolási tevékenységhez húzza. Engedje fel az egérgombot, ha a másolási tevékenység szegélyszíne zöldre változik.

    A keresési és másolási tevékenységek összekapcsolását bemutató képernyőkép.

  4. A Forrás lapon hajtsa végre a következő konfigurációt:

    • Adattár típusa: Munkaterület kiválasztása.

    • Munkaterület-adattár típusa: Válassza az Adattárház lehetőséget.

    • Adattárház: Válassza ki az adattárházat.

    • Lekérdezés használata: Válassza a Lekérdezés lehetőséget.

    • Lekérdezés: Adja meg a következő lekérdezést a növekményes adatok másolásához az utolsó vízjel és az új vízjel között.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Képernyőkép a másolás forráskonfigurációjáról.

  5. A Cél lapon hajtsa végre a következő konfigurációt:

    • Adattár típusa: Munkaterület kiválasztása.
    • Munkaterület adattártípusa: Válassza a Lakehouse lehetőséget.
    • Lakehouse: Válassza ki a Lakehouse.
    • Gyökérmappa: Fájlok kiválasztása.
    • Fájl elérési útja: Adja meg a másolt adatokat tárolni kívánt mappát. Válassza a Tallózás lehetőséget a mappa kiválasztásához. A fájlnévhez nyissa meg a Dinamikus tartalom hozzáadása lehetőséget, és írja be @CONCAT('Incremental-', pipeline().RunId, '.txt') a megnyitott ablakban a másolt adatfájl fájlneveit a Lakehouse-ban.
    • Fájlformátum: Válassza ki az adatok formátumtípusát.

    Képernyőkép a másolási cél konfigurációjáról.

5. lépés: Tárolt eljárástevékenység hozzáadása

Ebben a lépésben egy tárolt eljárástevékenységet ad hozzá a következő folyamatfuttatás utolsó vízjelértékének frissítéséhez.

  1. A felső sávon válassza a Tevékenységek lehetőséget, majd a Tárolt eljárás lehetőséget egy tárolt eljárástevékenység hozzáadásához.

  2. Az Általános lapon nevezze át ezt a tevékenységet a StoredProceduretoWriteWatermarkActivity névre.

  3. Csatlakoztassa a másolási tevékenység zöld (sikeres) kimenetét a tárolt eljárástevékenységhez.

  4. A Beállítások lapon hajtsa végre a következő konfigurációt:

    • Adattár típusa: Munkaterület kiválasztása.

    • Adattárház: Válassza ki az adattárházat.

    • Tárolt eljárás neve: Adja meg az adattárházban létrehozott tárolt eljárást: [dbo].[ usp_write_watermark].

    • Bontsa ki a tárolt eljárás paramétereit. A tárolt eljárásparaméterek értékeinek megadásához válassza az Importálás lehetőséget, és adja meg a paraméterek következő értékeit:

      Név Típus Érték
      LastModifiedtime Dátum/idő @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName Sztring @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Képernyőkép a tárolt eljárástevékenység konfigurációjáról.

6. lépés: A folyamat futtatása és az eredmény figyelése

A felső sávon válassza a Futtatás a Kezdőlap lapon lehetőséget. Ezután válassza a Mentés és futtatás lehetőséget. A folyamat elindul, és a Folyamat a Kimenet lapon figyelhető.

Képernyőkép a folyamatfuttatás eredményeiről.

Lépjen a Lakehouse-ra, és keresse meg az adatfájlt a megadott mappa alatt, és kiválaszthatja a fájlt a másolt adatok előnézetének megtekintéséhez.

Képernyőkép az első folyamatfuttatás lakehouse-adatairól.

Képernyőkép a Lakehouse-adatok előzetes verziójáról az első folyamatfuttatáshoz.

További adatok hozzáadása a növekményes másolási eredmények megtekintéséhez

Az első folyamatfuttatás befejezése után próbáljon meg további adatokat hozzáadni az adattárház forrástáblájához, és ellenőrizze, hogy ez a folyamat képes-e a növekményes adatok másolására.

1. lépés: További adatok hozzáadása a forráshoz

Új adatok beszúrása az adattárházba az alábbi lekérdezés futtatásával:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

A data_source_table frissített adatai a következőek:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

2. lépés: Egy másik folyamat futtatásának aktiválása és az eredmény figyelése

Lépjen vissza a folyamatlapra. A felső sávon válassza ismét a Futtatás a Kezdőlap lapon lehetőséget. A folyamat elindul, és a Folyamat a Kimenet területen figyelhető.

Lépjen a Lakehouse-ra, és keresse meg, hogy az új másolt adatfájl a megadott mappa alatt található, és kiválaszthatja a fájlt a másolt adatok előnézetének megtekintéséhez. Láthatja, hogy a növekményes adatok ebben a fájlban jelennek meg.

Képernyőkép a második folyamatfuttatás lakehouse-adatairól.

Képernyőkép a lakehouse-adatok előzetes verziójáról a második folyamatfuttatáshoz.

A következő lépésben további információt kaphat az Azure Blob Storage-ból a Lakehouse-ba történő másolásról.