Pattern to incrementally amass data with Dataflow Gen2

Important

This is a pattern to incrementally amass data with Dataflow Gen2; it isn't the same as incremental refresh. Incremental refresh is a separate feature that's currently in development and is one of the top-voted ideas on the Fabric Ideas site, where you can vote for it.

This tutorial takes 15 minutes to complete and describes how to incrementally amass data into a lakehouse by using Dataflow Gen2.

Incrementally amassing data in a data destination requires a technique to load only new or updated data into that destination. You can do this with a query that filters the data based on what's already in the data destination. This tutorial shows how to create a dataflow that loads data from an OData source into a lakehouse, and how to add a query to the dataflow that filters the data based on the data destination.
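
To make the pattern concrete, here's a minimal PySpark sketch of the same logic. It assumes a destination table named Orders and an already-loaded source DataFrame named source_df, both hypothetical; the dataflow you build in this tutorial implements the equivalent steps with Power Query instead of Spark.

    # Minimal sketch of the incremental pattern (hypothetical names, not part of the tutorial steps).
    # 1. Find the highest OrderID already present in the destination table.
    max_row = spark.sql("SELECT MAX(OrderID) AS MaxID FROM Orders").collect()[0]
    max_id = max_row.MaxID if max_row.MaxID is not None else 0  # empty table: load everything

    # 2. Keep only the source rows that are newer than what the destination already holds.
    new_orders = source_df.filter(source_df.OrderID > max_id)

    # 3. Append just those rows to the destination table.
    new_orders.write.mode("append").saveAsTable("Orders")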

The high-level steps in this tutorial are as follows:

  • Create a dataflow to load data from an OData source into a lakehouse.
  • Add a query to the dataflow to filter the data based on the data destination.
  • (Optional) Reload data using notebooks and pipelines.

Prerequisites

You must have a Microsoft Fabric-enabled workspace. If you don't already have one, refer to Create a workspace. This tutorial also assumes that you're using the diagram view in Dataflow Gen2. To check whether you're using the diagram view, go to View in the top ribbon and make sure Diagram view is selected.

Create a dataflow to load data from an OData source into a lakehouse

In this section, you create a dataflow to load data from an OData source into a lakehouse.

  1. Create a new lakehouse in your workspace.

    Screenshot showing the create lakehouse dialog.

  2. Create a new Dataflow Gen2 in your workspace.

    Screenshot showing the create dataflow dropdown.

  3. Add a new source to the dataflow. Select the OData source and enter the following URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Screenshot showing the get data dialog.

    Screenshot showing the OData connector.

    Screenshot showing the OData settings.

  4. Select the Orders table and select Next.

    Screenshot showing the select orders table dialog.

  5. Select the following columns to keep:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Screenshot showing the choose columns function.

    Screenshot showing the choose columns orders table.

  6. Change the data type of the OrderDate, RequiredDate, and ShippedDate columns to datetime.

    Screenshot showing the change datatype function.

  7. Set up the data destination to your lakehouse using the following settings:

    • Data destination: Lakehouse
    • Lakehouse: Select the lakehouse you created in step 1.
    • New table name: Orders
    • Update method: Replace

    Screenshot showing the data destination lakehouse ribbon.

    Screenshot showing the data destination lakehouse order table.

    Screenshot showing the data destination lakehouse settings replace.

  8. Select Next and publish the dataflow.

    Screenshot showing the publish dataflow dialog.

You have now created a dataflow that loads data from an OData source into a lakehouse. In the next section, you add a query to this dataflow that filters the data based on what's already in the data destination. After that, you can optionally use notebooks and pipelines to reload data.
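
If you want a quick look at what the source returns, you can peek at the public Northwind sample feed with a few lines of Python from any environment where the requests package is available (an optional aside, not a tutorial step):

    import requests

    # Fetch the first five orders from the read-only Northwind sample service used in this tutorial.
    url = "https://services.odata.org/V4/Northwind/Northwind.svc/Orders?$top=5"
    response = requests.get(url, headers={"Accept": "application/json"})
    for order in response.json()["value"]:  # OData JSON payloads put the rows in the "value" array
        print(order["OrderID"], order["OrderDate"], order["ShipCountry"])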

Add a query to the dataflow to filter the data based on the data destination

This section adds a query to the dataflow that filters the data based on the data already in the destination lakehouse. At the beginning of the dataflow refresh, the query gets the maximum OrderID in the lakehouse and then uses that value to retrieve only the orders with a higher OrderID from the source and append them to your data destination. This approach assumes that orders are added to the source in ascending order of OrderID. If that's not the case, you can filter on a different column, such as OrderDate.

Note

OData filters are applied within Fabric after the data is received from the data source. However, for database sources like SQL Server, the filter is applied in the query submitted to the backend data source, and only the filtered rows are returned to the service.

  1. After the dataflow refreshes, reopen the dataflow you created in the previous section.

    Screenshot showing the open dataflow dialog.

  2. Create a new query named IncrementalOrderID and get data from the Orders table in the lakehouse you created in the previous section.

    Screenshot showing the get data dialog.

    Screenshot showing the lakehouse connector.

    Screenshot showing the get orders table lakehouse.

    Screenshot showing the rename query function.

    Screenshot showing the renamed query.

  3. Disable staging of this query.

    Screenshot showing the disable staging function.

  4. In the data preview, right-click on the OrderID column and select Drill down.

    Screenshot showing the drill down function.

  5. From the ribbon, select List Tools -> Statistics -> Maximum.

    Screenshot showing the statistics maximum orderid function.

You now have a query that returns the maximum OrderID in the lakehouse: the Drill down step turns the OrderID column into a list, and the Maximum statistic reduces it to a single value. The following steps use this value to filter the data coming from the OData source.
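
Conceptually, IncrementalOrderID now resolves to a single scalar value, the same number a notebook query like the following would return (an illustrative sketch, not the M code the dataflow generates; YOURLAKEHOUSE stands in for your lakehouse name):

    # IncrementalOrderID is a single value: the largest OrderID currently stored in the lakehouse.
    max_order_id = spark.sql("SELECT MAX(OrderID) AS MaxOrderID FROM YOURLAKEHOUSE.Orders").collect()[0].MaxOrderID
    print(max_order_id)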

  1. Go back to the Orders query and add a new step to filter the data. Use the following settings:

    • Column: OrderID
    • Operation: Greater than
    • Value: parameter IncrementalOrderID

    Screenshot showing the orderid greater than filter function.

    Screenshot showing the filter settings.

  2. Allow combining the data from the OData source and the lakehouse by confirming the following dialog:

    Screenshot showing the allow combining data dialog.

  3. Update the data destination to use the following settings:

    • Update method: Append

    Screenshot showing the edit output settings function.

    Screenshot showing the existing orders table.

    Screenshot showing the data destination lakehouse settings append.

  4. Publish the dataflow.

    Screenshot showing the publish dataflow dialog.

Your dataflow now contains a query that filters the data from the OData source based on the maximum OrderID in the lakehouse, so only new data is loaded into the lakehouse. The next, optional section shows how to reload data using notebooks and pipelines.
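
To confirm the incremental behavior, you can compare the row count and maximum OrderID in the lakehouse before and after an extra refresh; because the filter only passes orders with a higher OrderID than the lakehouse already contains, both values should stay the same unless new orders appear in the source. A notebook sketch, with YOURLAKEHOUSE standing in for your lakehouse name:

    # Run this before and after an additional dataflow refresh. The values should only change
    # if the source has orders with a higher OrderID than the lakehouse already holds.
    spark.sql("SELECT COUNT(*) AS RowCount, MAX(OrderID) AS MaxOrderID FROM YOURLAKEHOUSE.Orders").show()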

(Optional) Reload data using notebooks and pipelines

Optionally, you can reload specific data by using notebooks and pipelines. With custom Python code in a notebook, you remove the orders with the highest OrderID values from the lakehouse table. You then create a pipeline that first runs the notebook and then runs the dataflow, so those orders are reloaded from the OData source into the lakehouse. Notebooks support multiple languages, but this tutorial uses PySpark, a Python API for Spark, to run Spark SQL queries.

  1. Create a new notebook in your workspace.

    Screenshot showing the new notebook dialog.

  2. Add the following PySpark code to your notebook:

    ### Variables
    # Replace YOURLAKEHOUSE with the name of the lakehouse you created earlier.
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"


    ### Remove Old Orders
    # Determine the cutoff value: the current maximum OrderID minus the number of orders to remove.
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName, NumberOfOrdersToRemove, LakehouseName, TableName)).collect()
    Reload = Reload[0].ReLoadValue
    # Delete every order above the cutoff so the next dataflow refresh reloads them from the source.
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Run the notebook to verify that the data is removed from the lakehouse.
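
    One way to verify the removal (an optional check, not part of the original notebook) is to reuse the variables still defined from the cell above; after the delete, no rows should remain above the Reload cutoff:

    ### Optional verification
    # Uses the variables defined in the cell above. After the delete, no order should
    # remain above the Reload cutoff, so this query should return 0.
    spark.sql("SELECT COUNT(*) AS RemainingAboveCutoff FROM {0}.{1} WHERE {2} > {3}".format(LakehouseName, TableName, ColName, Reload)).show()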

  4. Create a new pipeline in your workspace.

    Screenshot showing the new pipeline dialog.

  5. Add a new notebook activity to the pipeline and select the notebook you created earlier in this section.

    Screenshot showing the add notebook activity dialog.

    Screenshot showing the select notebook dialog.

  6. Add a new dataflow activity to the pipeline and select the dataflow you created in the previous section.

    Screenshot showing the add dataflow activity dialog.

    Screenshot showing the select dataflow dialog.

  7. Link the notebook activity to the dataflow activity with a success trigger.

    Screenshot showing the connect activities dialog.

  8. Save and run the pipeline.

    Screenshot showing the run pipeline dialog.

You now have a pipeline that removes the most recent orders from the lakehouse and then reloads them from the OData source. With this setup, you can reload data from the OData source into the lakehouse on a regular basis.