Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktigt!
Det här är ett mönster för att inkrementellt samla in data med Dataflöde Gen2. Detta är inte samma sak som inkrementell uppdatering. Inkrementell uppdatering är en funktion som för närvarande är under utveckling. Den här funktionen är en av de mest framröstade idéerna på vår idéwebbplats. Du kan rösta på den här funktionen på webbplatsen Fabric Ideas.
Den här handledningen tar 15 minuter och beskriver hur du successivt samlar in data i ett data lakehouse med Dataflow Gen2.
Inkrementell insamling av data i ett datamål kräver en teknik för att endast läsa in nya eller uppdaterade data till ditt datamål. Den här tekniken kan göras med hjälp av en fråga för att filtrera data baserat på datamålet. Den här självstudien visar hur du skapar ett dataflöde för att ladda data från en OData-källa till ett lakehouse och hur du lägger till en fråga i dataflödet för att filtrera data baserat på datadestinationen.
De övergripande stegen i den här självstudien är följande:
- Skapa ett dataflöde för att läsa in data från en OData-källa till ett lakehouse.
- Lägg till en fråga i dataflödet för att filtrera data baserat på datamålet.
- (Valfritt) läsa in data igen med hjälp av notebook-filer och pipelines.
Förutsättningar
Du måste ha en Microsoft Fabric-aktiverad arbetsyta. Om du inte redan har en kan du läsa Skapa en arbetsyta. Handledningen förutsätter också att du använder diagramvyn i Dataflöde Gen2. Om du vill kontrollera om du använder diagramvyn går du till Visa i det övre menyfliksområdet och kontrollerar att diagramvyn är markerad.
Skapa ett dataflöde för att ladda data från en OData-källa till ett lakehouse
I det här avsnittet skapar du ett dataflöde för att ladda in data från en OData-källa till ett lakehouse.
Skapa ett nytt sjöhus på din arbetsyta.
Skapa en ny Dataflow Gen2 på din arbetsyta.
Lägg till en ny källa i dataflödet. Välj OData-källan och ange följande URL:
https://services.OData.org/V4/Northwind/Northwind.svc
Välj tabellen Beställningar och välj Nästa.
Välj följande kolumner att behålla:
OrderIDCustomerIDEmployeeIDOrderDateRequiredDateShippedDateShipViaFreightShipNameShipAddressShipCityShipRegionShipPostalCodeShipCountry
Ändra datatypen
OrderDateför ,RequiredDateochShippedDatetilldatetime.
Konfigurera datadestinationen för ditt lakehouse med följande inställningar:
- Datamål:
Lakehouse - Lakehouse: Välj det sjöhus som du skapade i steg 1.
- Nytt tabellnamn:
Orders - Uppdateringsmetod:
Replace
- Datamål:
välj Nästa och publicera dataflödet.
Du har nu skapat ett dataflöde för att läsa in data från en OData-källa till ett lakehouse. Det här dataflödet används i nästa avsnitt för att lägga till en fråga i dataflödet för att filtrera data baserat på datamålet. Därefter kan du använda dataflödet för att ladda om data med hjälp av notebooks och pipelines.
Lägga till en fråga i dataflödet för att filtrera data baserat på datamålet
Det här avsnittet lägger till en fråga i dataflödet för att filtrera data baserat på informationen i destinationens lakehouse. Frågan hämtar det maximala värdet på OrderID i lakehouset vid början av uppdateringen av dataflödet och använder det högsta OrderId för att endast hämta order med ett högre OrderId från källan för att lägga till i ditt datamål. Detta förutsätter att beställningar läggs till i källan i stigande ordning på OrderID. Om så inte är fallet kan du använda en annan kolumn för att filtrera data. Du kan till exempel använda OrderDate kolumnen för att filtrera data.
Kommentar
OData-filter tillämpas inom Fabric efter att data har mottagits från datakällan, men för databaskällor som SQL Server tillämpas filtret i frågan som skickas till backend-datakällan och endast filtrerade rader returneras till tjänsten.
När dataflödet har uppdaterats öppnar du det dataflöde som du skapade i föregående avsnitt igen.
Skapa en ny fråga med namnet
IncrementalOrderIDoch hämta data från tabellen Beställningar i lakehouse som du skapade i föregående avsnitt.
Inaktivera mellanlagring av den här frågan.
Högerklicka på kolumnen i dataförhandsgranskningen
OrderIDoch välj Öka detaljnivå.
I menyfliksområdet väljer du Listverktyg ->Statistik ->Maximum.
Nu har du en fråga som returnerar maximalt OrderID i lakehouse. Den här frågan används för att filtrera data från OData-källan. Nästa avsnitt lägger till en fråga i dataflödet för att filtrera data från OData-källan baserat på det maximala OrderID i lakehouse.
Gå tillbaka till frågan Beställningar och lägg till ett nytt steg för att filtrera data. Använd följande inställningar:
- Spalt:
OrderID - Operation:
Greater than - Värde: parameter
IncrementalOrderID
- Spalt:
Tillåt att du kombinerar data från OData-källan och lakehouse genom att bekräfta följande dialogruta:
Uppdatera datadestinationen så att följande inställningar används:
- Uppdateringsmetod:
Append
- Uppdateringsmetod:
Publicera dataflödet.
Ditt dataflöde innehåller nu en fråga som filtrerar data från OData-källan baserat på maximalt OrderID i lakehouse. Det innebär att endast nya eller uppdaterade data läses in i lakehouse. I nästa avsnitt används dataflödet för att läsa in data igen med hjälp av notebook-filer och pipelines.
(Valfritt) ladda om data med hjälp av notebooks och pipelines
Du kan också läsa in specifika data igen med hjälp av notebook-filer och pipelines. Med anpassad Python-kod i notebook-filen tar du bort gamla data från lakehouse. När du sedan skapar en pipeline där du först kör notebook-filen och kör dataflödet sekventiellt läser du in data från OData-källan igen till lakehouse. Notebooks stöder flera språk, men i den här handledningen används PySpark. Pyspark är ett Python-API för Spark och används i den här självstudien för att köra Spark SQL-frågor.
Skapa en ny anteckningsbok i din arbetsyta.
Lägg till följande PySpark-kod i anteckningsboken:
### Variables LakehouseName = "YOURLAKEHOUSE" TableName = "Orders" ColName = "OrderID" NumberOfOrdersToRemove = "10" ### Remove Old Orders Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect() Reload = Reload[0].ReLoadValue spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))Kör notebook för att kontrollera att datan har tagits bort från lakehouse.
Skapa en ny pipeline på din arbetsyta.
Lägg till en ny notebook-aktivitet i pipelinen och välj den notebook-fil som du skapade i föregående steg.
Lägg till en ny dataflödesaktivitet i pipelinen och välj det dataflöde som du skapade i föregående avsnitt.
Länka aktiviteten i notebook till dataflödesaktiviteten med en utlösare för framgång.
Spara och kör pipelinen.
Nu har du en pipeline som tar bort gamla data från lakehouse och läser in data från OData-källan igen till lakehouse. Med den här konfigurationen kan du ladda om data från OData-källan till lakehouse regelbundet.