Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Důležité
Jedná se o vzor pro přírůstkové hromadění dat pomocí Dataflow Gen2. To není stejné jako přírůstková aktualizace. Přírůstková aktualizace je funkce, která je aktuálně ve vývoji. Tato funkce je jedním z nejlepších nápadů na našem webu nápadů. Pro tuto funkci můžete hlasovat na webu Fabric nápady.
Tento kurz trvá 15 minut a popisuje, jak přírůstkově shromažďovat data do datového jezera pomocí Dataflow Gen2.
Přírůstkové shromažďování dat v datovém úložišti vyžaduje techniku načtení pouze nových nebo aktualizovaných dat do tohoto úložiště. Tuto techniku můžete provést pomocí dotazu k filtrování dat na základě cíle dat. V tomto kurzu se dozvíte, jak vytvořit tok dat, který načte data ze zdroje OData do jezera a jak do toku dat přidat dotaz pro filtrování dat na základě cíle dat.
Základní kroky v tomto kurzu jsou následující:
- Vytvořte tok dat pro načtení dat ze zdroje OData do jezera.
- Přidejte do toku dat dotaz pro filtrování dat na základě cíle dat.
- (Volitelné) znovu načtěte data pomocí poznámkových bloků a pipelin.
Požadavky
Musíte mít pracovní prostor s povolenou službou Microsoft Fabric. Pokud ho ještě nemáte, přečtěte si článek Vytvoření pracovního prostoru. Kurz také předpokládá, že používáte zobrazení diagramu v toku dat Gen2. Pokud chcete zkontrolovat, jestli používáte zobrazení diagramu, přejděte na horním pásu karet na
Vytvořte tok dat pro načtení dat ze zdroje OData do datového jezera.
V této části vytvoříte tok dat, který načte data ze zdroje OData do jezera.
Ve svém pracovním prostoru vytvořte nový lakehouse.
Ve svém pracovním prostoru vytvořte nový tok dat Gen2.
Přidejte do toku dat nový zdroj. Vyberte zdroj OData a zadejte následující adresu URL:
https://services.OData.org/V4/Northwind/Northwind.svc
Vyberte tabulku Objednávky a vyberte Další.
Vyberte následující sloupce, které chcete zachovat:
OrderIDCustomerIDEmployeeIDOrderDateRequiredDateShippedDateShipViaFreightShipNameShipAddressShipCityShipRegionShipPostalCodeShipCountry
Změna datového
OrderDatetypu ,RequiredDateaShippedDatenadatetime.
Pomocí následujícího nastavení nastavte cíl dat do svého lakehouse:
- Cíl dat:
Lakehouse - Lakehouse: Vyberte lakehouse, který jste vytvořili v kroku 1.
- Název nové tabulky:
Orders - Metoda aktualizace:
Replace
- Cíl dat:
vyberte Další a publikujte tok dat.
Teď jste vytvořili tok dat pro načtení dat ze zdroje OData do jezera. Tento tok dat se používá v další části k přidání dotazu do toku dat pro filtrování dat na základě cíle dat. Potom můžete datový tok použít k znovunačtení dat pomocí poznámkových bloků a pipeliny.
Přidání dotazu do toku dat pro filtrování dat na základě cíle dat
Tato část přidá do toku dat dotaz, který vyfiltruje data na základě dat v cílovém jezeře. Dotaz načte maximum OrderID v lakehousu na začátku aktualizace toku dat a použije maximální ID objednávky k získání pouze těch objednávek s vyšším ID objednávky ze zdroje, které se poté připojí k cílovým datům. To předpokládá, že objednávky jsou přidány do zdroje ve vzestupném pořadí OrderID. Pokud tomu tak není, můžete data filtrovat pomocí jiného sloupce. Sloupec můžete například použít OrderDate k filtrování dat.
Poznámka:
Filtry OData se po přijetí dat ze zdroje dat aplikují v systému Fabric, ale pro databázové zdroje, jako SQL Server, se filtr použije v dotazu odeslaném do back-endového zdroje dat a do služby se vrátí pouze filtrované řádky.
Po aktualizaci toku dat znovu otevřete tok dat, který jste vytvořili v předchozí části.
Vytvořte nový dotaz s názvem
IncrementalOrderIDa získejte data z tabulky Orders v jezeře, kterou jste vytvořili v předchozí části.
Zakažte přípravu tohoto dotazu.
V náhledu dat klikněte pravým tlačítkem na
OrderIDsloupec a vyberte Přejít k podrobnostem.
Na pásu karet vyberte Nástroje seznamu ->Statistiky ->Maximum.
Nyní máte dotaz, který vrátí maximální ID objednávky v datovém úložišti Lakehouse. Tento dotaz slouží k filtrování dat ze zdroje OData. V další části se do toku dat přidá dotaz, který vyfiltruje data ze zdroje OData na základě maximálního ID objednávky v lakehouse.
Vraťte se k dotazu Objednávky a přidejte nový krok pro filtrování dat. Použijte následující nastavení:
- Sloupec:
OrderID - Operace:
Greater than - Hodnota: parametr
IncrementalOrderID
- Sloupec:
Povolte kombinování dat ze zdroje OData a lakehouse potvrzením následujícího dialogu:
Aktualizujte cílovou destinaci dat podle následujících nastavení:
- Metoda aktualizace:
Append
- Metoda aktualizace:
Publikujte tok dat.
Váš tok dat teď obsahuje dotaz, který filtruje data ze zdroje OData na základě nejvyššího OrderID v datovém jezeře. To znamená, že do jezera se načtou jenom nová nebo aktualizovaná data. Další část používá tok dat k opětovnému načtení dat pomocí poznámkových bloků a kanálů.
(Volitelné) opětovné načtení dat pomocí notebooků a zpracovatelských řetězců
Případně můžete znovu načíst konkrétní data pomocí poznámkových bloků a pipeline. Pomocí vlastního kódu Pythonu v prostředí notebooku odeberete stará data z lakehouse. Potom vytvoříte kanál, ve kterém nejprve spustíte poznámkový blok a postupně spustíte tok dat, znovu načtete data ze zdroje OData do jezera. Poznámkové sešity podporují více jazyků, ale tento návod používá PySpark. Pyspark je rozhraní Python API pro Spark a používá se v tomto kurzu ke spouštění dotazů Spark SQL.
Vytvořte v pracovním prostoru nový poznámkový blok.
Do poznámkového bloku přidejte následující kód PySpark:
### Variables LakehouseName = "YOURLAKEHOUSE" TableName = "Orders" ColName = "OrderID" NumberOfOrdersToRemove = "10" ### Remove Old Orders Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect() Reload = Reload[0].ReLoadValue spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))Spuštěním poznámkového bloku ověřte, že se data z jezera odeberou.
Vytvořte novou pipeline ve svém pracovním prostoru.
Přidejte do kanálu novou aktivitu poznámkového bloku a vyberte poznámkový blok, který jste vytvořili v předchozím kroku.
Přidejte do kanálu novou aktivitu toku dat a vyberte tok dat, který jste vytvořili v předchozí části.
Propojte aktivitu poznámkového bloku s aktivitou toku dat s triggerem úspěchu.
Uložte a spusťte pipeline.
Teď máte kanál, který odebere stará data z lakehouse a znovu načte data ze zdroje OData do jezera. Díky tomuto nastavení můžete data ze zdroje OData znovu načíst do jezera pravidelně.