Delen via


Patroon om incrementeel gegevens te verzamelen met Dataflow Gen2

Belangrijk

Dit is een patroon om incrementeel gegevens te verzamelen met Dataflow Gen2. Dit is niet hetzelfde als incrementeel vernieuwen. Incrementeel vernieuwen is een functie die momenteel in ontwikkeling is. Deze functie is een van de belangrijkste stemideeën op onze ideeënwebsite. U kunt stemmen op deze functie op de site Fabric Ideas.

Deze zelfstudie duurt 15 minuten en beschrijft hoe u incrementeel gegevens in een lakehouse kunt verzamelen met behulp van Dataflow Gen2.

Incrementeel het verzamelen van gegevens in een gegevensbestemming vereist een techniek om alleen nieuwe of bijgewerkte gegevens in uw gegevensbestemming te laden. Deze techniek kan worden uitgevoerd met behulp van een query om de gegevens te filteren op basis van de gegevensbestemming. Deze zelfstudie laat zien hoe u een gegevensstroom maakt om gegevens uit een OData-bron in een lakehouse te laden en hoe u een query toevoegt aan de gegevensstroom om de gegevens te filteren op basis van de gegevensbestemming.

De overzichtelijke stappen in deze handleiding zijn als volgt:

  • Maak een gegevensstroom om gegevens uit een OData-bron in een lakehouse te laden.
  • Voeg een query toe aan de gegevensstroom om de gegevens te filteren op basis van de gegevensbestemming.
  • (Optioneel) laad gegevens opnieuw met behulp van notebooks en pijplijnen.

Vereisten

U moet een werkruimte met Microsoft Fabric hebben. Als u nog geen werkruimte hebt, raadpleegt u Een werkruimte maken. De handleiding gaat ervan uit dat je de diagramweergave in Gegevensstroom Gen2 gebruikt. Als u wilt controleren of u de diagramweergave gebruikt, gaat u in het bovenste lint naar Weergave en controleert u of Diagramweergave is geselecteerd.

Een gegevensstroom creëren om data uit een OData-bron naar een lakehouse te laden.

In deze sectie maakt u een gegevensstroom om gegevens uit een OData-bron in een lakehouse te laden.

  1. Maak een nieuw lakehouse in uw werkruimte.

    Schermopname van het dialoogvenster om een Lakehouse aan te maken.

  2. Maak een nieuwe Dataflow Gen2 in uw werkruimte.

    Schermafbeelding van de dropdownmenu voor het maken van een gegevensstroom.

  3. Voeg een nieuwe bron toe aan de gegevensstroom. Selecteer de OData-bron en voer de volgende URL in: https://services.OData.org/V4/Northwind/Northwind.svc

    Schermopname van het dialoogvenster Gegevens ophalen.

    Screenshot van de OData-connector.

    Schermopname van de OData-instellingen.

  4. Selecteer de tabel Orders en selecteer Volgende.

    Schermopname van het dialoogvenster Ordertabel selecteren.

  5. Selecteer de volgende kolommen die u wilt behouden:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Schermopname van de functie Kolommen Kiezen.

    Schermopname van de tabel voor kolomvolgorde kiezen.

  6. Wijzig het gegevenstype van OrderDate, RequiredDateen ShippedDate in datetime.

    Schermopname van de functie change datatype.

  7. Stel de gegevensbestemming in op uw lakehouse met behulp van de volgende instellingen:

    • Gegevensbestemming: Lakehouse
    • Lakehouse: Selecteer het lakehouse dat u in stap 1 hebt gemaakt.
    • Nieuwe tabelnaam: Orders
    • Updatemethode: Replace

    Schermopname van het lint van data destination lakehouse.

    Schermopname van de ordertabel voor het data destination lakehouse.

    Schermopname van de data destination lakehouse-instellingen vervangen.

  8. selecteer Volgende en publiceer de gegevensstroom.

    Schermopname van het dialoogvenster voor het delen van gegevensstromen.

Je hebt nu een dataflow gemaakt om gegevens uit een OData-bron in een lakehouse te laden. Deze gegevensstroom wordt gebruikt in de volgende sectie om een query toe te voegen aan de gegevensstroom om de gegevens te filteren op basis van de gegevensbestemming. Daarna kunt u de gegevensstroom gebruiken om gegevens opnieuw te laden met behulp van notebooks en pijplijnen.

Een query toevoegen aan de gegevensstroom om de gegevens te filteren op basis van de gegevensbestemming

In deze sectie wordt aan de gegevensstroom een query toegevoegd om de gegevens te filteren op basis van de gegevens in het doellakehouse. De query haalt het maximum OrderID op in het lakehouse aan het begin van het verversen van de datastroom en gebruikt de maximale OrderId om alleen de orders met een hogere OrderId van de bron op te halen om toe te voegen aan uw gewenste gegevenslocatie. Hierbij wordt ervan uitgegaan dat orders in een oplopende volgorde van OrderID aan de bron worden toegevoegd. Als dit niet het geval is, kunt u een andere kolom gebruiken om de gegevens te filteren. U kunt bijvoorbeeld de OrderDate kolom gebruiken om de gegevens te filteren.

Notitie

OData-filters worden toegepast in Fabric nadat de gegevens van de gegevensbron zijn ontvangen, maar voor databasebronnen zoals SQL Server wordt het filter toegepast in de query die is verzonden naar de back-endgegevensbron en worden alleen gefilterde rijen geretourneerd naar de service.

  1. Nadat de gegevensstroom is vernieuwd, heropent u de gegevensstroom die u in de vorige sectie hebt gemaakt.

    Schermopname van het dialoogvenster Gegevensstroom openen.

  2. Maak een nieuwe query met de naam IncrementalOrderID en haal gegevens op uit de tabel Orders in het lakehouse dat u in de vorige sectie hebt gemaakt.

    Schermopname van het dialoogvenster Gegevens ophalen.

    Schermopname van de Lakehouse-connector.

    Schermopname van de

    Schermopname van de functie voor het hernoemen van query's.

    Schermopname van de hernoemde query.

  3. Fasering van deze query uitschakelen.

    Schermopname die de functie voor het uitschakelen van faserings toont.

  4. Klik in het voorbeeld van de gegevens met de rechtermuisknop op de OrderID kolom en selecteer Inzoomen.

    Schermopname van de inzoomfunctie.

  5. Selecteer van het lint Lijsthulpmiddelen ->Statistieken ->Maximum.

    Schermopname van de functie maximumvolgorde-id voor statistieken.

U hebt nu een query die de maximale OrderID in het lakehouse retourneert. Deze query wordt gebruikt om de gegevens uit de OData-bron te filteren. In de volgende sectie wordt een query toegevoegd aan de gegevensstroom om de gegevens uit de OData-bron te filteren op basis van de maximale OrderID in lakehouse.

  1. Ga terug naar de query Orders en voeg een nieuwe stap toe om de gegevens te filteren. Gebruik de volgende instellingen:

    • Kolom: OrderID
    • Operatie: Greater than
    • Waarde: parameter IncrementalOrderID

    Schermopname van de order-id groter dan de filterfunctie.

    Schermopname van de filterinstellingen.

  2. Sta het combineren van de gegevens uit de OData-bron en het lakehouse toe door het bevestigen van de volgende dialoog:

    Schermopname van het dialoogvenster Gegevens combineren toestaan.

  3. Werk de gegevensbestemming bij om de volgende instellingen te gebruiken:

    • Updatemethode: Append

    Schermopname van de functie uitvoerinstellingen bewerken.

    Schermopname van de bestaande tabel orders.

    Schermopname van het toevoegen van de data destination lakehouse-instellingen.

  4. Publiceer de gegevensstroom.

    Schermopname van het dialoogvenster voor het publiceren van de dataflow.

Uw gegevensstroom bevat nu een query waarmee de gegevens uit de OData-bron worden gefilterd op basis van de maximale OrderID in lakehouse. Dit betekent dat alleen nieuwe of bijgewerkte gegevens in het lakehouse worden geladen. In de volgende sectie wordt de gegevensstroom gebruikt om gegevens opnieuw te laden met behulp van notebooks en pijplijnen.

(Optioneel) gegevens opnieuw laden met behulp van notebooks en pijplijnen

U kunt desgewenst specifieke gegevens opnieuw laden met behulp van notebooks en pijplijnen. Met aangepaste Python-code in het notebook verwijdert u de oude gegevens uit lakehouse. Door vervolgens een pijplijn te maken waarin u het notebook voor het eerst uitvoert en de gegevensstroom opeenvolgend uitvoert, laadt u de gegevens uit de OData-bron opnieuw in het lakehouse. Notebooks ondersteunen meerdere talen, maar in deze zelfstudie wordt PySpark gebruikt. Pyspark is een Python-API voor Spark en wordt in deze zelfstudie gebruikt om Spark SQL-query's uit te voeren.

  1. Maak een nieuw notitieblok in uw werkruimte.

    Schermopname van het nieuwe notitieblok dialoogvenster.

  2. Voeg de volgende PySpark-code toe aan uw notebook:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Voer het notebook uit om te controleren of de gegevens zijn verwijderd uit lakehouse.

  4. Maak een nieuwe pijplijn in uw werkruimte.

    Schermopname van het dialoogvenster nieuwe pijplijn.

  5. Voeg een nieuwe notebookactiviteit toe aan de pijplijn en selecteer het notebook dat u in de vorige stap hebt gemaakt.

    Schermopname van het dialoogvenster Notitieblokactiviteit toevoegen.

    Schermopname van het dialoogvenster Notitieblok Selecteren.

  6. Voeg een nieuwe gegevensstroomactiviteit toe aan de pijplijn en selecteer de gegevensstroom die u in de vorige sectie hebt gemaakt.

    Schermopname van het dialoogvenster Gegevensstroomactiviteit toevoegen.

    Schermopname van het dialoogvenster Gegevensstroom selecteren.

  7. Koppel de notebookactiviteit aan de gegevensstroomactiviteit met een succesvolle trigger.

    Schermopname van het dialoogvenster voor verbindingsactiviteiten.

  8. Sla de pijplijn op en voer deze uit.

    Schermopname van het dialoogvenster 'Pijplijn uitvoeren'.

U hebt nu een pijplijn waarmee oude gegevens uit het lakehouse worden verwijderd en de gegevens uit de OData-bron opnieuw worden geladen in het lakehouse. Met deze installatie kunt u de gegevens van de OData-bron regelmatig opnieuw laden in het lakehouse.