Mönster för att inkrementellt samla in data med Dataflöde Gen2

Viktigt!

Det här är ett mönster för att inkrementellt samla in data med Dataflöde Gen2. Detta är inte samma sak som inkrementell uppdatering. Inkrementell uppdatering är en funktion som för närvarande är under utveckling. Den här funktionen är en av de mest framröstade idéerna på vår idéwebbplats. Du kan rösta på den här funktionen på webbplatsen Fabric Ideas.

Den här handledningen tar 15 minuter och beskriver hur du successivt samlar in data i ett data lakehouse med Dataflow Gen2.

Inkrementell insamling av data i ett datamål kräver en teknik för att endast läsa in nya eller uppdaterade data till ditt datamål. Den här tekniken kan göras med hjälp av en fråga för att filtrera data baserat på datamålet. Den här självstudien visar hur du skapar ett dataflöde för att ladda data från en OData-källa till ett lakehouse och hur du lägger till en fråga i dataflödet för att filtrera data baserat på datadestinationen.

De övergripande stegen i den här självstudien är följande:

  • Skapa ett dataflöde för att läsa in data från en OData-källa till ett lakehouse.
  • Lägg till en fråga i dataflödet för att filtrera data baserat på datamålet.
  • (Valfritt) läsa in data igen med hjälp av notebook-filer och pipelines.

Förutsättningar

Du måste ha en Microsoft Fabric-aktiverad arbetsyta. Om du inte redan har en kan du läsa Skapa en arbetsyta. Handledningen förutsätter också att du använder diagramvyn i Dataflöde Gen2. Om du vill kontrollera om du använder diagramvyn går du till Visa i det övre menyfliksområdet och kontrollerar att diagramvyn är markerad.

Skapa ett dataflöde för att ladda data från en OData-källa till ett lakehouse

I det här avsnittet skapar du ett dataflöde för att ladda in data från en OData-källa till ett lakehouse.

  1. Skapa ett nytt sjöhus på din arbetsyta.

    Skärmbild som visar dialogrutan skapa lakehouse.

  2. Skapa en ny Dataflow Gen2 på din arbetsyta.

    Skärmbild som visar listrutan Skapa dataflöde.

  3. Lägg till en ny källa i dataflödet. Välj OData-källan och ange följande URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Skärmbild som visar dialogrutan hämta data.

    Skärmbild som visar OData-anslutningsappen.

    Skärmbild som visar OData-inställningarna.

  4. Välj tabellen Beställningar och välj Nästa.

    Skärmbild som visar dialogrutan välj ordertabell.

  5. Välj följande kolumner att behålla:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Skärmbild som visar funktionen Välj kolumner.

    Skärmbild som visar tabellen Välj kolumner för beställningar.

  6. Ändra datatypen OrderDateför , RequiredDateoch ShippedDate till datetime.

    Skärmbild som visar funktionen ändra datatyp.

  7. Konfigurera datadestinationen för ditt lakehouse med följande inställningar:

    • Datamål: Lakehouse
    • Lakehouse: Välj det sjöhus som du skapade i steg 1.
    • Nytt tabellnamn: Orders
    • Uppdateringsmetod: Replace

    Skärmbild som visar menyfliksområdet för datamålsdestination i lakehouse.

    Skärmbild som visar datadestinationens lakehouse-ordertabell.

    Skärmbild som visar hur lakehouse-inställningarna för datamål ersätts.

  8. välj Nästa och publicera dataflödet.

    Skärmbild som visar dialogrutan publicera dataflöde.

Du har nu skapat ett dataflöde för att läsa in data från en OData-källa till ett lakehouse. Det här dataflödet används i nästa avsnitt för att lägga till en fråga i dataflödet för att filtrera data baserat på datamålet. Därefter kan du använda dataflödet för att ladda om data med hjälp av notebooks och pipelines.

Lägga till en fråga i dataflödet för att filtrera data baserat på datamålet

Det här avsnittet lägger till en fråga i dataflödet för att filtrera data baserat på informationen i destinationens lakehouse. Frågan hämtar det maximala värdet på OrderID i lakehouset vid början av uppdateringen av dataflödet och använder det högsta OrderId för att endast hämta order med ett högre OrderId från källan för att lägga till i ditt datamål. Detta förutsätter att beställningar läggs till i källan i stigande ordning på OrderID. Om så inte är fallet kan du använda en annan kolumn för att filtrera data. Du kan till exempel använda OrderDate kolumnen för att filtrera data.

Kommentar

OData-filter tillämpas inom Fabric efter att data har mottagits från datakällan, men för databaskällor som SQL Server tillämpas filtret i frågan som skickas till backend-datakällan och endast filtrerade rader returneras till tjänsten.

  1. När dataflödet har uppdaterats öppnar du det dataflöde som du skapade i föregående avsnitt igen.

    Skärmbild som visar dialogrutan för öppet dataflöde.

  2. Skapa en ny fråga med namnet IncrementalOrderID och hämta data från tabellen Beställningar i lakehouse som du skapade i föregående avsnitt.

    Skärmbild som visar dialogrutan hämta data.

    Skärmbild som visar lakehouse-anslutningen.

    Skärmbild som visar tabellen get orders lakehouse.

    Skärmbild som visar funktionen byt namn på frågan.

    Skärmbild som visar den omdöpta frågan.

  3. Inaktivera mellanlagring av den här frågan.

    Skärmbild som visar funktionen inaktivera iscensättning.

  4. Högerklicka på kolumnen i dataförhandsgranskningen OrderID och välj Öka detaljnivå.

    Skärmbild som visar funktionen för ökad detaljnivå.

  5. I menyfliksområdet väljer du Listverktyg ->Statistik ->Maximum.

    Skärmbild som visar den maximala orderid-funktionen för statistik.

Nu har du en fråga som returnerar maximalt OrderID i lakehouse. Den här frågan används för att filtrera data från OData-källan. Nästa avsnitt lägger till en fråga i dataflödet för att filtrera data från OData-källan baserat på det maximala OrderID i lakehouse.

  1. Gå tillbaka till frågan Beställningar och lägg till ett nytt steg för att filtrera data. Använd följande inställningar:

    • Spalt: OrderID
    • Operation: Greater than
    • Värde: parameter IncrementalOrderID

    Skärmbild som visar orderid större än filterfunktionen.

    Skärmbild som visar filterinställningarna.

  2. Tillåt att du kombinerar data från OData-källan och lakehouse genom att bekräfta följande dialogruta:

    Skärmbild som visar dialogrutan Tillåt att data kombineras.

  3. Uppdatera datadestinationen så att följande inställningar används:

    • Uppdateringsmetod: Append

    Skärmbild som visar funktionen redigera utdatainställningar.

    Skärmbild som visar den befintliga ordertabellen.

    Skärmbild som visar att datalagringsmålets lakehouse-inställningar läggs till.

  4. Publicera dataflödet.

    Skärmbild som visar dialogrutan publicera dataflöde.

Ditt dataflöde innehåller nu en fråga som filtrerar data från OData-källan baserat på maximalt OrderID i lakehouse. Det innebär att endast nya eller uppdaterade data läses in i lakehouse. I nästa avsnitt används dataflödet för att läsa in data igen med hjälp av notebook-filer och pipelines.

(Valfritt) ladda om data med hjälp av notebooks och pipelines

Du kan också läsa in specifika data igen med hjälp av notebook-filer och pipelines. Med anpassad Python-kod i notebook-filen tar du bort gamla data från lakehouse. När du sedan skapar en pipeline där du först kör notebook-filen och kör dataflödet sekventiellt läser du in data från OData-källan igen till lakehouse. Notebooks stöder flera språk, men i den här handledningen används PySpark. Pyspark är ett Python-API för Spark och används i den här självstudien för att köra Spark SQL-frågor.

  1. Skapa en ny anteckningsbok i din arbetsyta.

    Skärmbild som visar dialogrutan för den nya anteckningsboken.

  2. Lägg till följande PySpark-kod i anteckningsboken:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Kör notebook för att kontrollera att datan har tagits bort från lakehouse.

  4. Skapa en ny pipeline på din arbetsyta.

    Skärmbild som visar den nya pipelinedialogrutan.

  5. Lägg till en ny notebook-aktivitet i pipelinen och välj den notebook-fil som du skapade i föregående steg.

    Skärmbild som visar dialogrutan Lägg till notebook-aktivitet.

    Skärmbild som visar dialogrutan Välj anteckningsbok.

  6. Lägg till en ny dataflödesaktivitet i pipelinen och välj det dataflöde som du skapade i föregående avsnitt.

    Skärmbild som visar dialogrutan Lägg till dataflödesaktivitet.

    Skärmbild som visar dialogrutan Välj dataflöde.

  7. Länka aktiviteten i notebook till dataflödesaktiviteten med en utlösare för framgång.

    Skärmbild som visar dialogrutan Anslut aktiviteter.

  8. Spara och kör pipelinen.

    Skärmbild som visar dialogrutan för att köra pipeline.

Nu har du en pipeline som tar bort gamla data från lakehouse och läser in data från OData-källan igen till lakehouse. Med den här konfigurationen kan du ladda om data från OData-källan till lakehouse regelbundet.