Megosztás a következőn keresztül:


Az adatok növekményes feltöltésének mintája a Dataflow Gen2-vel

Fontos

Ez egy olyan minta, amely növekményesen növeli az adatokat a Dataflow Gen2-vel. Ez nem ugyanaz, mint a növekményes frissítés. A növekményes frissítés egy jelenleg fejlesztés alatt álló funkció. Ez a funkció az egyik legjobban szavazott ötlet az ötletek honlapján. Erre a funkcióra a Fabric ötletbörze webhelyen szavazhat.

Ez az oktatóanyag 15 percet vesz igénybe, és bemutatja, hogyan lehet fokozatosan adatokat gyűjteni egy adatkezelő házba a Dataflow Gen2 használatával.

A fokozatosan halmozódó adatok kezeléséhez az adatcélhelyen szükség van egy olyan technikára, amely csak új vagy frissített adatokat tölt be. Ezt a technikát egy lekérdezéssel végezheti el az adatok az adatcél alapján történő szűréséhez. Ez az oktatóanyag bemutatja, hogyan hozhat létre adatfolyamot, amely adatokat tölt be egy OData-forrásból egy tóházba, és hogyan adhat hozzá lekérdezést az adatfolyamhoz az adatok célhely szerinti szűréséhez.

Az oktatóanyag magas szintű lépései a következők:

  • Hozzon létre egy adatfolyamot, amely adatokat tölt be egy OData-forrásból egy tóházba.
  • Adjon hozzá egy lekérdezést az adatfolyamhoz az adatok célhely szerinti szűréséhez.
  • (Nem kötelező) adatok újratöltése jegyzetfüzetek és csővezetékek használatával.

Előfeltételek

Microsoft Fabric-kompatibilis munkaterületnek kell lennie. Ha még nincs ilyenje, tekintse meg a Munkaterület létrehozása című témakört. Az oktatóanyag azt is feltételezi, hogy a Diagram nézetet használja a Dataflow Gen2-ben. Ha ellenőrizni szeretné, hogy a diagramnézetet használja-e, a felső menüszalagon lépjen Nézet elemre, és győződjön meg arról, hogy Diagram nézet van kiválasztva.

Adatfolyam létrehozása OData-forrásból származó adatok tóházba való betöltéséhez

Ebben a szakaszban létrehoz egy adatfolyamot, amely adatokat tölt be egy OData-forrásból egy tóházba.

  1. Hozzon létre egy új tóházat a munkaterületen.

    Képernyőkép a Lakehouse létrehozása ablakról.

  2. Hozzon létre egy új Adatfolyam Gen2-t a munkaterületen.

    Képernyőkép, amely az adatfolyam létrehozása legördülő menüt mutatja.

  3. Adjon hozzá egy új forrást az adatfolyamhoz. Válassza ki az OData-forrást, és adja meg a következő URL-címet: https://services.OData.org/V4/Northwind/Northwind.svc

    Képernyőkép az Adatok lekérése párbeszédpanelről.

    Képernyőkép az OData-összekötőről.

    Képernyőkép az OData-beállításokról.

  4. Válassza a Rendelések táblát, és válassza a Tovább gombot.

    Képernyőkép a Rendelések kiválasztása tábla párbeszédpanelről.

  5. Válassza ki a következő oszlopokat a megtartandók közül:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Képernyőkép a kiválasztási oszlopfüggvényről.

    Képernyőkép az oszloprendelés választó táblázatról.

  6. A OrderDate, RequiredDate és ShippedDate adattípusát módosítsa datetime típusra.

    Képernyőkép a change datatype függvényről.

  7. Állítsa be az adatcél helyét a lakehouse-ban az alábbi beállításokkal:

    • Adatcél: Lakehouse
    • Lakehouse: Válassza ki az 1. lépésben létrehozott tóházat.
    • Új táblanév: Orders
    • Frissítési módszer: Replace

    Képernyőkép az adat célhelyének lakehouse menüszalagjáról.

    Képernyőkép az adat célhelyi lakehouse rendelési tábláról.

    Képernyőkép az adatcél lakehouse-beállításainak cseréjéről.

  8. válassza a Tovább lehetőséget, és tegye közzé az adatfolyamot.

    Képernyőkép az adatfolyam közzétételének párbeszédpanelről.

Most létrehozott egy adatfolyamot, amely adatokat tölt be egy OData-forrásból egy tóházba. Ez az adatfolyam a következő szakaszban egy lekérdezést ad hozzá az adatfolyamhoz az adatok célhely szerinti szűréséhez. Ezt követően az adatfolyam használatával újra betöltheti az adatokat notebookok és pipelinek használatával.

Lekérdezés hozzáadása az adatfolyamhoz az adatok célhely szerinti szűréséhez

Ez a szakasz egy lekérdezést ad hozzá az adatfolyamhoz az adatok szűréséhez a céltóházban lévő adatok alapján. A lekérdezés az adatfolyam frissítésének kezdetén lekéri a lakehouse-ból a maximális OrderID értéket, majd a maximális OrderId alapján csak azokat a rendeléseket szerzi meg a forrástól, amelyek magasabb OrderId-val rendelkeznek, hogy azokat hozzáfűzze az adatok célhelyéhez. Ez feltételezi, hogy a rendelések növekvő sorrendben vannak hozzáadva a forráshoz OrderID. Ha nem ez a helyzet, egy másik oszlop használatával szűrheti az adatokat. Az oszlop segítségével OrderDate például szűrheti az adatokat.

Feljegyzés

Az OData-szűrők a Fabricben lesznek alkalmazva az adatforrásból származó adatok fogadása után, azonban az olyan adatbázis-források esetében, mint az SQL Server, a szűrőt a háttéradatforrásnak küldött lekérdezésben alkalmazza a rendszer, és csak a szűrt sorokat adja vissza a szolgáltatásnak.

  1. Az adatfolyam frissítése után nyissa meg újra az előző szakaszban létrehozott adatfolyamot.

    Képernyőkép a megnyitott adatfolyam párbeszédpanelről.

  2. Hozzon létre egy új IncrementalOrderID nevű lekérdezést, és kérje le az adatokat az előző szakaszban létrehozott lakehouse Orders táblájából.

    Képernyőkép az Adatok lekérése párbeszédpanelről.

    Képernyőkép, amely a lakehouse-összekötőt mutatja.

    Képernyőkép a Rendelések lekérése tábla lakehouse-ról.

    Képernyőkép az átnevezés lekérdezési függvényről.

    Képernyőkép az átnevezett lekérdezésről.

  3. Tiltsa le a lekérdezés átmeneti beállítását.

    Képernyőkép a színpadi funkció letiltásáról.

  4. Az adatok előnézetében kattintson a jobb gombbal a OrderID oszlopra, és válassza a Részletezés lehetőséget.

    A drill-down funkció képernyőképe.

  5. A menüszalagon válassza a Listaeszközök ->Statisztika ->Maximum lehetőséget.

    Képernyőkép a statisztikák maximális orderid függvényről.

Most már rendelkezik egy lekérdezéssel, amely a lakehouse maximális OrderID azonosítóját adja vissza. Ez a lekérdezés az OData-forrásból származó adatok szűrésére szolgál. A következő szakasz egy lekérdezést ad hozzá az adatfolyamhoz az OData-forrásból származó adatok szűréséhez a lakehouse maximális OrderID-azonosítója alapján.

  1. Térjen vissza az Orders lekérdezéshez, és adjon hozzá egy új lépést az adatok szűréséhez. Használja a következő beállításokat:

    • Oszlop: OrderID
    • Művelet: Greater than
    • Érték: paraméter IncrementalOrderID

    Képernyőkép az „orderid nagyobb mint” szűrőfunkcióról.

    Képernyőkép a szűrőbeállításokról.

  2. Engedélyezze az OData-forrásból és a lakehouse-ból származó adatok kombinálását az alábbi párbeszédpanel megerősítésével:

    Képernyőkép az adatok kombinálásának engedélyezése párbeszédpanelről.

  3. Frissítse az adatcélt a következő beállítások használatára:

    • Frissítési módszer: Append

    Képernyőkép a kimeneti beállítások szerkesztési funkcióról.

    Képernyőkép a meglévő rendelések tábláról.

    Képernyőkép az adatcél lakehouse-beállításainak hozzáfűzésről.

  4. Tegye közzé az adatfolyamot.

    Képernyőkép az adatfolyam közzétételének párbeszédpanelről.

Az adatfolyam tartalmaz egy lekérdezést, amely az OData-forrásból szűri az adatokat a Lakehouse-ban lévő maximális OrderID azonosító alapján. Ez azt jelenti, hogy csak az új vagy frissített adatok töltődnek be a lakehouse-ba. A következő szakasz az adatfolyamot használja az adatok jegyzetfüzetek és folyamatok használatával való újratöltésére.

(Nem kötelező) adatok újrabetöltése jegyzetfüzetek és adatfolyamok használatával

Opcionálisan, jegyzetfüzetek és csővezetékek használatával is újratöltheti a meghatározott adatokat. A jegyzetfüzetben lévő egyéni Python-kóddal eltávolíthatja a régi adatokat a lakehouse-ból. Ha létrehoz egy folyamatot, amelyben először futtatja a jegyzetfüzetet, majd szekvenciálisan futtatja az adatfolyamot, az OData-forrásból származó adatokat újra betölti a tóházba. A jegyzetfüzetek több nyelvet is támogatnak, de ez az oktatóanyag a PySparkot használja. A Pyspark egy Python API a Sparkhoz, és ebben az oktatóanyagban Spark SQL-lekérdezések futtatására szolgál.

  1. Hozzon létre egy új jegyzetfüzetet a munkaterületen.

    Képernyőkép az új jegyzetfüzet párbeszédpanelről.

  2. Adja hozzá a következő PySpark-kódot a jegyzetfüzethez:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Futtassa a jegyzetfüzetet annak ellenőrzéséhez, hogy az adatok el lettek távolítva a lakehouse-ból.

  4. Hozzon létre egy új folyamatot a munkaterületen.

    Képernyőkép az új folyamat párbeszédpanelről.

  5. Adjon hozzá egy új jegyzetfüzet-tevékenységet a folyamathoz, és válassza ki az előző lépésben létrehozott jegyzetfüzetet.

    Képernyőkép a jegyzetfüzet hozzáadása tevékenység párbeszédpanelről.

    Képernyőkép a Jegyzetfüzet kiválasztása párbeszédpanelről.

  6. Adjon hozzá egy új adatfolyam-tevékenységet a folyamathoz, és válassza ki az előző szakaszban létrehozott adatfolyamot.

    Képernyőkép az adatfolyam hozzáadása tevékenység párbeszédpanelről.

    Képernyőkép az adatfolyam kiválasztása párbeszédpanelről.

  7. Kapcsolja össze a jegyzetfüzet-tevékenységet az adatfolyam-tevékenységgel egy sikeres eseményindítóval.

    Képernyőkép a csatlakozási tevékenységek párbeszédpanelről.

  8. Mentse és futtassa a pipeline-t.

    Képernyőkép a futtatási folyamat párbeszédpanelről.

Most már van egy csatorna, amely eltávolítja a régi adatokat a tóházból, és visszatölti az adatokat az OData-forrásból a tóházba. Ezzel a beállítással az OData-forrásból származó adatokat rendszeresen újra betöltheti a lakehouse-ba.