Dela via


Läs in data inkrementellt från Data Warehouse till Lakehouse

I den här självstudien får du lära dig hur du inkrementellt läser in data från Data Warehouse till Lakehouse.

Översikt

Här är det avancerade diagrammet:

Diagram showing incrementally load data logic.

Här är några viktiga steg för att skapa den här lösningen:

  1. Markera vattenstämpelkolumnen. Välj en kolumn i källdatatabellen, som kan användas för att segmentera nya eller uppdaterade poster för varje körning. Vanligtvis ökar data i den markerade kolumnen (till exempel last_modify_time elle ID) när rader skapas eller uppdateras. Det maximala värdet i den här kolumnen används som vattenstämpel.

  2. Förbered en tabell för att lagra det sista vattenstämpelvärdet i ditt informationslager.

  3. Skapa en pipeline med följande arbetsflöde:

    Pipelinen i den här lösningen har följande aktiviteter:

    • Skapa två sökningsaktiviteter. Använd den första lookup-aktiviteten för att hämta det sista vattenstämpelvärdet. Använd den andra lookup-aktiviteten för att hämta det nya vattenstämpelvärdet. Vattenstämpelvärdena skickas till kopieringsaktiviteten.
    • Skapa en kopieringsaktivitet som kopierar rader från källdatatabellen med värdet för vattenstämpelkolumnen som är större än det gamla vattenstämpelvärdet och mindre än det nya vattenstämpelvärdet. Sedan kopieras data från datalagret till Lakehouse som en ny fil.
    • Skapa en lagrad proceduraktivitet som uppdaterar det sista vattenstämpelvärdet för nästa pipelinekörning.

Förutsättningar

  • Informationslager. Du använder informationslagret som källdatalager. Om du inte har det kan du läsa Skapa ett informationslager för steg för att skapa ett.
  • Lakehouse. Du använder Lakehouse som måldatalager. Om du inte har det kan du läsa Skapa en Lakehouse för steg för att skapa en. Skapa en mapp med namnet IncrementalCopy för att lagra kopierade data.

Förbereda källan

Här följer några tabeller och en lagrad procedur som du behöver förbereda i ditt källdatalager innan du konfigurerar den inkrementella kopieringspipelinen.

1. Skapa en datakällatabell i informationslagret

Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet data_source_table som datakälltabell. I den här självstudien använder du den som exempeldata för att göra den inkrementella kopian.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Data i datakällans tabell visas nedan:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

I den här självstudien använder du LastModifytime som vattenstämpelkolumn.

2. Skapa en annan tabell i informationslagret för att lagra det sista vattenstämpelvärdet

  1. Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet watermarktable för att lagra det sista vattenstämpelvärdet:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Ange standardvärdet för den senaste vattenstämpeln med tabellnamnet för källdatatabellen. I den här självstudien är tabellnamnet data_source_table och standardvärdet är 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Granska data i tabellens vattenstämpeltabell.

    Select * from watermarktable
    

    Utdata:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Skapa en lagrad procedur i informationslagret

Kör följande kommando för att skapa en lagrad procedur i informationslagret. Den här lagrade proceduren används för att uppdatera det senaste vattenstämpelvärdet efter den senaste pipelinekörningen.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Konfigurera en pipeline för inkrementell kopiering

Steg 1: Skapa en pipeline

  1. Gå till Power BI.

  2. Välj Power BI-ikonen längst ned till vänster på skärmen och välj sedan Data factory för att öppna startsidan för Data Factory.

    Screenshot with the data factory experience emphasized.

  3. Gå till din Microsoft Fabric-arbetsyta.

  4. Välj Datapipeline och ange sedan ett pipelinenamn för att skapa en ny pipeline.

    Screenshot showing the new data pipeline button in the newly created workspace.

    Screenshot showing the name of creating a new pipeline.

Steg 2: Lägg till en uppslagsaktivitet för den senaste vattenstämpeln

I det här steget skapar du en uppslagsaktivitet för att hämta det sista vattenstämpelvärdet. Standardvärdet 1/1/2010 12:00:00 AM som angetts innan hämtas.

  1. Välj Lägg till pipelineaktivitet och välj Uppslag i listrutan.

  2. Under fliken Allmänt byter du namn på aktiviteten till LookupOldWaterMarkActivity.

  3. Utför följande konfiguration under fliken Inställningar:

    • Datalagertyp: Välj Arbetsyta.
    • Datalagertyp för arbetsyta: Välj Informationslager.
    • Informationslager: Välj ditt informationslager.
    • Använd fråga: Välj Tabell.
    • Tabell: Välj dbo.watermarktable.
    • Endast första raden: Markerad.

    Screenshot showing lookup old watermark.

Steg 3: Lägg till en uppslagsaktivitet för den nya vattenstämpeln

I det här steget skapar du en uppslagsaktivitet för att hämta det nya vattenstämpelvärdet. Du använder en fråga för att hämta den nya vattenstämpeln från källdatatabellen. Det maximala värdet i kolumnen LastModifytime i data_source_table hämtas.

  1. I det övre fältet väljer du Uppslag under fliken Aktiviteter för att lägga till den andra uppslagsaktiviteten.

  2. Under fliken Allmänt byter du namn på aktiviteten till LookupNewWaterMarkActivity.

  3. Utför följande konfiguration under fliken Inställningar:

    • Datalagertyp: Välj Arbetsyta.

    • Datalagertyp för arbetsyta: Välj Informationslager.

    • Informationslager: Välj ditt informationslager.

    • Använd fråga: Välj Fråga.

    • Fråga: Ange följande fråga för att välja den maximala senast ändrade tiden som den nya vattenstämpeln:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Endast första raden: Markerad.

    Screenshot showing lookup new watermark.

Steg 4: Lägg till kopieringsaktiviteten för att kopiera inkrementella data

I det här steget lägger du till en kopieringsaktivitet för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln från Data Warehouse till Lakehouse.

  1. Välj Aktiviteter i det övre fältet och välj Kopiera data –> Lägg till på arbetsytan för att hämta kopieringsaktiviteten.

  2. Under fliken Allmänt byter du namn på den här aktiviteten till IncrementalCopyActivity.

  3. Anslut båda uppslagsaktiviteterna till kopieringsaktiviteten genom att dra den gröna knappen (Vid lyckad) som är kopplad till uppslagsaktiviteterna till kopieringsaktiviteten. Släpp musknappen när du ser kantfärgen för kopieringsaktiviteten ändras till grön.

    Screenshot showing connecting lookup and copy activities.

  4. Utför följande konfiguration under fliken Källa :

    • Datalagertyp: Välj Arbetsyta.

    • Datalagertyp för arbetsyta: Välj Informationslager.

    • Informationslager: Välj ditt informationslager.

    • Använd fråga: Välj Fråga.

    • Fråga: Ange följande fråga för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Screenshot showing copy source configuration.

  5. Utför följande konfiguration under fliken Mål :

    • Datalagertyp: Välj Arbetsyta.
    • Datalagertyp för arbetsyta: Välj Lakehouse.
    • Lakehouse: Välj din Lakehouse.
    • Rotmapp: Välj Filer.
    • Filsökväg: Ange den mapp som du vill lagra dina kopierade data. Välj Bläddra för att välja din mapp. För filnamnet öppnar du Lägg till dynamiskt innehåll och anger @CONCAT('Incremental-', pipeline().RunId, '.txt') i det öppnade fönstret för att skapa filnamn för den kopierade datafilen i Lakehouse.
    • Filformat: Välj formattypen för dina data.

    Screenshot showing copy destination configuration.

Steg 5:Lägga till en lagrad proceduraktivitet

I det här steget lägger du till en lagrad proceduraktivitet för att uppdatera det senaste vattenstämpelvärdet för nästa pipelinekörning.

  1. Välj Aktiviteter i det övre fältet och välj Lagrad procedur för att lägga till en lagrad proceduraktivitet.

  2. Under fliken Allmänt byter du namn på aktiviteten till StoredProceduretoWriteWatermarkActivity.

  3. Anslut den gröna utdatan (Vid lyckad) av kopieringsaktiviteten till aktiviteten för lagrad procedur.

  4. Utför följande konfiguration under fliken Inställningar:

    • Datalagertyp: Välj Arbetsyta.

    • Informationslager: Välj ditt informationslager.

    • Namn på lagrad procedur: Ange den lagrade procedur som du skapade i informationslagret: [dbo].[ usp_write_watermark].

    • Expandera parametrar för lagrad procedur. Om du vill ange värden för parametrarna för lagrad procedur väljer du Importera och anger följande värden för parametrarna:

      Namn Typ Värde
      LastModifiedtime Datum/tid @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Screenshot showing stored procedure activity configuration.

Steg 6:Kör pipelinen och övervaka resultatet

I det övre fältet väljer du Kör under fliken Start . Välj sedan Spara och kör. Pipelinen börjar köras och du kan övervaka pipelinen under fliken Utdata .

Screenshot showing pipeline run results.

Gå till Lakehouse, du hittar datafilen under den mapp som du angav och du kan välja filen för att förhandsgranska kopierade data.

Screenshot showing lakehouse data for the first pipeline run.

Screenshot showing lakehouse data preview for the first pipeline run.

Lägg till mer data för att se resultatet av inkrementell kopiering

När du har slutfört den första pipelinekörningen ska vi försöka lägga till mer data i datalagrets källtabell för att se om den här pipelinen kan kopiera dina inkrementella data.

Steg 1: Lägg till mer data i källan

Infoga nya data i informationslagret genom att köra följande fråga:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Uppdaterade data för data_source_table är:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Steg 2:Utlös en annan pipelinekörning och övervaka resultatet

Gå tillbaka till pipelinesidan. I det övre fältet väljer du Kör under fliken Start igen. Pipelinen börjar köras och du kan övervaka pipelinen under Utdata.

Gå till Lakehouse, du hittar den nya kopierade datafilen under den mapp som du angav och du kan välja filen för att förhandsgranska kopierade data. Du ser att dina inkrementella data visas i den här filen.

Screenshot showing lakehouse data for the second pipeline run.

Screenshot showing lakehouse data preview for the second pipeline run.

Gå sedan vidare för att lära dig mer om att kopiera från Azure Blob Storage till Lakehouse.