Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Den här handledningen visar hur du endast kopierar nya eller uppdaterade data från ditt informationslager till ett Lakehouse. Den här metoden kallas inkrementell inläsning och är användbar när du vill behålla dina data up-to-date utan att kopiera allt varje gång.
Här är lösningens design på hög nivå:
Välj en kolumn för vattenstämplar. Välj en kolumn i källtabellen som hjälper dig att spåra nya eller ändrade poster. Den här kolumnen innehåller vanligtvis värden som ökar när rader läggs till eller uppdateras (till exempel en tidsstämpel eller ett ID). Vi använder det högsta värdet i den här kolumnen som vår "referenspunkt" för att veta var vi slutade.
Konfigurera en tabell för att lagra ditt senaste vattenstämpelvärde.
Skapa en pipeline som gör följande:
Pipelinen innehåller följande aktiviteter:
- Två uppslagsaktiviteter. Den första får det sista vattenstämpelvärdet (där vi slutade förra gången). Den andra får det nya vattenstämpelvärdet (där vi stoppar den här gången). Båda värdena överförs till kopieringsaktiviteten.
- En kopieringsaktivitet som hittar rader där värdet för vattenstämpelkolumnen ligger mellan de gamla och nya vattenstämplarna. Sedan kopieras dessa data från ditt informationslager till Lakehouse som en ny fil.
- En lagrad proceduraktivitet som sparar det nya vattenstämpelvärdet så att nästa körning av pipeline vet var den ska börja.
Förutsättningar
- Informationslager. Du använder informationslagret som källdatalager. Om du inte har någon kan du läsa mer i Skapa ett informationslager .
-
Lakehouse. Du använder Lakehouse som måldatalager. Om du inte har någon, se Skapa en Lakehouse för anvisningar.
- Skapa en mapp med namnet IncrementalCopy för att lagra dina kopierade data.
Förbereda källan
Nu ska vi konfigurera tabellerna och den lagrade proceduren som du behöver i informationslagret innan du konfigurerar den inkrementella kopieringspipelinen.
1. Skapa en datakällatabell i informationslagret
Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet data_source_table som källtabell. Vi använder detta som exempeldata för den inkrementella kopian.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Data i källtabellen ser ut så här:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
I den här tutorialen använder vi LastModifytime som vattenmärkeskolumn.
2. Skapa en annan tabell i informationslagret för att lagra det sista vattenstämpelvärdet
Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet watermarktable för att lagra det sista vattenstämpelvärdet:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Ange standardvärdet för den senaste vattenstämpeln med källtabellens namn. I den här självstudien är tabellnamnet data_source_table och vi anger standardvärdet till
1/1/2010 12:00:00 AM.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Kontrollera data i watermarktabell.
Select * from watermarktableUtdata:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Skapa en lagrad procedur i informationslagret
Kör följande kommando för att skapa en lagrad procedur i informationslagret. Den här lagrade proceduren uppdaterar det senaste vattenstämpelvärdet efter varje pipelinekörning.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Konfigurera en pipeline för inkrementell kopiering
Steg 1: Skapa en pipeline
Gå till Power BI.
Välj Power BI-ikonen längst ned till vänster på skärmen och välj sedan Infrastruktur.
Välj Min arbetsyta för att öppna arbetsytan Fabric.
Välj + Nytt objekt, välj Pipeline och ange sedan ett pipelinenamn för att skapa en ny pipeline.
Steg 2: Lägg till en uppslagsaktivitet för den senaste vattenstämpeln
I det här steget skapar du en uppslagsaktivitet för att hämta det sista vattenstämpelvärdet. Vi får det standardvärde 1/1/2010 12:00:00 AM som vi angav tidigare.
Välj Pipelineaktivitet och välj Uppslag från rullgardinsmenyn.
Under fliken Allmänt byter du namn på aktiviteten till LookupOldWaterMarkActivity.
Under fliken Inställningar konfigurerar du följande:
- Anslutning: Under Lager väljer du Bläddra bland alla och väljer ditt informationslager i listan.
- Använd fråga: Välj Tabell.
- Tabell: Välj dbo.watermarktable.
- Endast första raden: Vald.
Steg 3: Lägg till en uppslagsaktivitet för den nya vattenstämpeln
I det här steget skapar du en uppslagsaktivitet för att hämta det nya vattenstämpelvärdet. Du använder en sökfråga för att hämta den nya vattenstämpeln från källans datatabell. Vi får det högsta värdet i kolumnen LastModifytime från data_source_table.
I det övre fältet väljer du Uppslag under fliken Aktiviteter för att lägga till den andra uppslagsaktiviteten.
Under fliken Allmänt byter du namn på aktiviteten till LookupNewWaterMarkActivity.
Under fliken Inställningar konfigurerar du följande:
Anslutning: Under Informationslager väljer du Bläddra bland alla och väljer ditt informationslager i listan eller väljer ditt informationslager från Infrastrukturobjektanslutningar.
Använd fråga: Välj Fråga.
Fråga: Ange följande fråga för att välja den maximala senast ändrade tiden som den nya vattenstämpeln:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tableEndast första raden: Vald.
Steg 4: Lägg till kopieringsaktiviteten för att kopiera inkrementella data
I det här steget lägger du till en kopieringsaktivitet för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln från ditt informationslager till Lakehouse.
Välj Aktiviteter i det övre fältet och välj Kopiera data –> Lägg till på arbetsytan för att hämta kopieringsaktiviteten.
Under fliken Allmänt byter du namn på den här aktiviteten till IncrementalCopyActivity.
Anslut båda uppslagsaktiviteterna till kopieringsaktiviteten genom att dra den gröna knappen (Vid lyckat resultat) som är kopplad till uppslagsaktiviteterna till kopieringsaktiviteten. Släpp musknappen när du ser kantfärgen för kopieringsaktiviteten ändras till grön.
Under fliken Källa konfigurerar du följande:
Anslutning: Under Informationslager väljer du Bläddra bland alla och väljer ditt informationslager i listan eller väljer ditt informationslager från Infrastrukturobjektanslutningar.
Lager: Välj ditt lager.
Använd fråga: Välj Fråga.
Fråga: Ange följande fråga för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Under fliken Mål konfigurerar du följande:
- Anslutning: Under Lakehouse väljer du Bläddra bland alla och väljer ditt sjöhus i listan eller väljer ditt lakehouse från Infrastrukturobjektanslutningar.
- Lakehouse: Välj din Lakehouse.
- Rotmapp: Välj Filer.
-
Filsökväg: Välj den mapp där du vill lagra dina kopierade data. Välj Bläddra för att välja din mapp. För att ange filnamnet öppnar du Lägg till dynamiskt innehåll och anger
@CONCAT('Incremental-', pipeline().RunId, '.txt')i det fönster som öppnas för att skapa filnamn för dina kopierade datafiler i Lakehouse-miljön. - Filformat: Välj formattypen för dina data.
Steg 5: Lägga till en lagrad proceduraktivitet
I det här steget lägger du till en aktivitet för att uppdatera det sista markörvärdet i den lagrade proceduren inför nästa pipelinekörning.
Välj Aktiviteter i det övre fältet och välj Lagrad procedur för att lägga till en lagrad proceduraktivitet.
Under fliken Allmänt byter du namn på den här aktiviteten till StoredProceduretoWriteWatermarkActivity.
Anslut det gröna (vid framgång) utdataflöde för kopieringsaktiviteten till aktiviteten för lagrad procedur.
Under fliken Inställningar konfigurerar du följande:
Informationslager: Välj ditt informationslager.
Namn på lagrad procedur: Välj den lagrade procedur som du skapade i ditt informationslager: [dbo].[ usp_write_watermark].
Expandera parametrar för lagrad procedur. Om du vill ange värden för parametrarna för lagrad procedur väljer du Importera och anger följande värden för parametrarna:
Namn Typ Värde SenastÄndradTid Datum/tid @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TabellNamn String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Steg 6: Kör pipelinen och övervaka resultatet
I det övre fältet väljer du Kör under fliken Start . Välj sedan Spara och kör. Pipelinen börjar köras och du kan övervaka den under fliken Utdata .
Gå till Lakehouse så ser du att datafilen finns under den mapp som du har valt. Du kan välja filen för att förhandsgranska kopierade data.
Lägg till mer data för att se resultatet av inkrementell kopiering
När du har slutfört den första pipelinekörningen ska vi lägga till mer data i källtabellen för informationslagret för att se om den här pipelinen kan kopiera dina inkrementella data.
Steg 1: Lägg till mer data i källan
Infoga nya data i informationslagret genom att köra följande fråga:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Uppdaterade data för data_source_table är:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Steg 2: Utlösa en annan pipelinekörning och övervaka resultatet
Gå tillbaka till din pipelinesida. I det övre fältet väljer du Kör under fliken Start igen. Pipelinen börjar köras och du kan övervaka den under Utdata.
Gå till Lakehouse så ser du att den nya kopierade datafilen finns under den mapp som du har valt. Du kan välja filen för att förhandsgranska kopierade data. Du kommer att se att dina inkrementella data visas i den här filen.
Relaterat innehåll
Läs sedan mer om att kopiera från Azure Blob Storage till Lakehouse.