Sdílet prostřednictvím


Muster pro postupné shromažďování dat pomocí Dataflow Gen2

Důležité

Jedná se o vzor pro přírůstkové hromadění dat pomocí Dataflow Gen2. To není stejné jako přírůstková aktualizace. Přírůstková aktualizace je funkce, která je aktuálně ve vývoji. Tato funkce je jedním z nejlepších nápadů na našem webu nápadů. Pro tuto funkci můžete hlasovat na webu Fabric nápady.

Tento kurz trvá 15 minut a popisuje, jak přírůstkově shromažďovat data do datového jezera pomocí Dataflow Gen2.

Přírůstkové shromažďování dat v datovém úložišti vyžaduje techniku načtení pouze nových nebo aktualizovaných dat do tohoto úložiště. Tuto techniku můžete provést pomocí dotazu k filtrování dat na základě cíle dat. V tomto kurzu se dozvíte, jak vytvořit tok dat, který načte data ze zdroje OData do jezera a jak do toku dat přidat dotaz pro filtrování dat na základě cíle dat.

Základní kroky v tomto kurzu jsou následující:

  • Vytvořte tok dat pro načtení dat ze zdroje OData do jezera.
  • Přidejte do toku dat dotaz pro filtrování dat na základě cíle dat.
  • (Volitelné) znovu načtěte data pomocí poznámkových bloků a pipelin.

Požadavky

Musíte mít pracovní prostor s povolenou službou Microsoft Fabric. Pokud ho ještě nemáte, přečtěte si článek Vytvoření pracovního prostoru. Kurz také předpokládá, že používáte zobrazení diagramu v toku dat Gen2. Pokud chcete zkontrolovat, jestli používáte zobrazení diagramu, přejděte na horním pásu karet na Zobrazení a ujistěte se, že je zaškrtnuté zobrazení diagramu .

Vytvořte tok dat pro načtení dat ze zdroje OData do datového jezera.

V této části vytvoříte tok dat, který načte data ze zdroje OData do jezera.

  1. Ve svém pracovním prostoru vytvořte nový lakehouse.

    Snímek obrazovky zobrazující dialogové okno pro vytvoření lakehouse

  2. Ve svém pracovním prostoru vytvořte nový tok dat Gen2.

    Snímek obrazovky s rozevíracím seznamem pro vytvoření toku dat.

  3. Přidejte do toku dat nový zdroj. Vyberte zdroj OData a zadejte následující adresu URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Snímek obrazovky znázorňující dialogové okno „Získat data“.

    Snímek obrazovky znázorňující konektor OData

    Snímek obrazovky s nastavením OData

  4. Vyberte tabulku Objednávky a vyberte Další.

    Snímek obrazovky s dialogovým oknem Vybrat tabulku objednávek

  5. Vyberte následující sloupce, které chcete zachovat:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Snímek obrazovky znázorňující funkci zvolit sloupce

    Snímek obrazovky zobrazující tabulku výběrových sloupců objednávek

  6. Změna datového OrderDatetypu , RequiredDatea ShippedDate na datetime.

    Snímek obrazovky znázorňující funkci change datatype

  7. Pomocí následujícího nastavení nastavte cíl dat do svého lakehouse:

    • Cíl dat: Lakehouse
    • Lakehouse: Vyberte lakehouse, který jste vytvořili v kroku 1.
    • Název nové tabulky: Orders
    • Metoda aktualizace: Replace

    Snímek obrazovky znázorňující pás karet datového úložiště jezera.

    Snímek obrazovky zobrazující tabulku objednávek v datovém lakehouse.

    Snímek obrazovky znázorňující nahrazení nastavení cílového datového jezera

  8. vyberte Další a publikujte tok dat.

    Snímek obrazovky s dialogovým oknem Publikovat tok dat

Teď jste vytvořili tok dat pro načtení dat ze zdroje OData do jezera. Tento tok dat se používá v další části k přidání dotazu do toku dat pro filtrování dat na základě cíle dat. Potom můžete datový tok použít k znovunačtení dat pomocí poznámkových bloků a pipeliny.

Přidání dotazu do toku dat pro filtrování dat na základě cíle dat

Tato část přidá do toku dat dotaz, který vyfiltruje data na základě dat v cílovém jezeře. Dotaz načte maximum OrderID v lakehousu na začátku aktualizace toku dat a použije maximální ID objednávky k získání pouze těch objednávek s vyšším ID objednávky ze zdroje, které se poté připojí k cílovým datům. To předpokládá, že objednávky jsou přidány do zdroje ve vzestupném pořadí OrderID. Pokud tomu tak není, můžete data filtrovat pomocí jiného sloupce. Sloupec můžete například použít OrderDate k filtrování dat.

Poznámka:

Filtry OData se po přijetí dat ze zdroje dat aplikují v systému Fabric, ale pro databázové zdroje, jako SQL Server, se filtr použije v dotazu odeslaném do back-endového zdroje dat a do služby se vrátí pouze filtrované řádky.

  1. Po aktualizaci toku dat znovu otevřete tok dat, který jste vytvořili v předchozí části.

    Snímek obrazovky zobrazující dialogové okno otevřeného toku dat.

  2. Vytvořte nový dotaz s názvem IncrementalOrderID a získejte data z tabulky Orders v jezeře, kterou jste vytvořili v předchozí části.

    Snímek obrazovky s dialogovým oknem získávání dat

    Snímek obrazovky znázorňující konektor lakehouse

    Snímek obrazovky znázorňující tabulku pro získání objednávek v Lakehouse.

    Snímek obrazovky zobrazující funkci dotazu pro přejmenování

    Snímek obrazovky zobrazující přejmenovaný dotaz

  3. Zakažte přípravu tohoto dotazu.

    Snímek obrazovky zobrazující funkci zakázání průběžného zpracování.

  4. V náhledu dat klikněte pravým tlačítkem na OrderID sloupec a vyberte Přejít k podrobnostem.

    Snímek obrazovky znázorňující funkci detailního zobrazení

  5. Na pásu karet vyberte Nástroje seznamu ->Statistiky ->Maximum.

    Snímek obrazovky znázorňující statistickou funkci maximálního orderid.

Nyní máte dotaz, který vrátí maximální ID objednávky v datovém úložišti Lakehouse. Tento dotaz slouží k filtrování dat ze zdroje OData. V další části se do toku dat přidá dotaz, který vyfiltruje data ze zdroje OData na základě maximálního ID objednávky v lakehouse.

  1. Vraťte se k dotazu Objednávky a přidejte nový krok pro filtrování dat. Použijte následující nastavení:

    • Sloupec: OrderID
    • Operace: Greater than
    • Hodnota: parametr IncrementalOrderID

    Snímek obrazovky znázorňující orderid větší než funkce filtru

    Snímek obrazovky s nastavením filtru

  2. Povolte kombinování dat ze zdroje OData a lakehouse potvrzením následujícího dialogu:

    Snímek obrazovky zobrazující dialogové okno umožňující kombinování dat.

  3. Aktualizujte cílovou destinaci dat podle následujících nastavení:

    • Metoda aktualizace: Append

    Snímek obrazovky znázorňující funkci upravit nastavení výstupu

    Snímek obrazovky zobrazující existující tabulku objednávek

    Snímek obrazovky ukazující nastavení přípony cílového úložiště typu lakehouse.

  4. Publikujte tok dat.

    Snímek obrazovky s dialogovým oknem pro publikování datového toku

Váš tok dat teď obsahuje dotaz, který filtruje data ze zdroje OData na základě nejvyššího OrderID v datovém jezeře. To znamená, že do jezera se načtou jenom nová nebo aktualizovaná data. Další část používá tok dat k opětovnému načtení dat pomocí poznámkových bloků a kanálů.

(Volitelné) opětovné načtení dat pomocí notebooků a zpracovatelských řetězců

Případně můžete znovu načíst konkrétní data pomocí poznámkových bloků a pipeline. Pomocí vlastního kódu Pythonu v prostředí notebooku odeberete stará data z lakehouse. Potom vytvoříte kanál, ve kterém nejprve spustíte poznámkový blok a postupně spustíte tok dat, znovu načtete data ze zdroje OData do jezera. Poznámkové sešity podporují více jazyků, ale tento návod používá PySpark. Pyspark je rozhraní Python API pro Spark a používá se v tomto kurzu ke spouštění dotazů Spark SQL.

  1. Vytvořte v pracovním prostoru nový poznámkový blok.

    Snímek obrazovky s dialogovým oknem nového poznámkového bloku

  2. Do poznámkového bloku přidejte následující kód PySpark:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Spuštěním poznámkového bloku ověřte, že se data z jezera odeberou.

  4. Vytvořte novou pipeline ve svém pracovním prostoru.

    Snímek obrazovky zobrazující dialogové okno nového pipeline

  5. Přidejte do kanálu novou aktivitu poznámkového bloku a vyberte poznámkový blok, který jste vytvořili v předchozím kroku.

    Snímek obrazovky zobrazující dialogové okno pro přidání aktivity poznámkového bloku

    Snímek obrazovky s dialogovým oknem pro výběr poznámkového bloku

  6. Přidejte do kanálu novou aktivitu toku dat a vyberte tok dat, který jste vytvořili v předchozí části.

    Snímek obrazovky s dialogovým oknem pro přidání aktivity toku dat.

    Snímek obrazovky s dialogovým oknem pro výběr toku dat.

  7. Propojte aktivitu poznámkového bloku s aktivitou toku dat s triggerem úspěchu.

    Snímek obrazovky zobrazující dialogové okno pro připojení aktivit.

  8. Uložte a spusťte pipeline.

    Snímek obrazovky s dialogovým oknem pro spuštění kanálu

Teď máte kanál, který odebere stará data z lakehouse a znovu načte data ze zdroje OData do jezera. Díky tomuto nastavení můžete data ze zdroje OData znovu načíst do jezera pravidelně.