Sdílet prostřednictvím


Přírůstkové načtení dat z datového skladu do Lakehouse

V tomto kurzu se naučíte přírůstkově načítat data z datového skladu do Lakehouse.

Přehled

Tady je souhrnný diagram tohoto řešení:

Diagram showing incrementally load data logic.

Tady jsou důležité kroky pro vytvoření tohoto řešení:

  1. Vyberte sloupec meze. Ve zdrojové tabulce dat vyberte jeden sloupec, který lze použít k průřezu nových nebo aktualizovaných záznamů pro každé spuštění. Data v tomto vybraném sloupci (například čas_poslední_změny nebo ID) se při vytváření nebo aktualizaci řádků obvykle zvyšují. Maximální hodnota v tomto sloupci se používá jako horní mez.

  2. Připravte tabulku, do které se uloží poslední hodnota meze ve vašem datovém skladu.

  3. Vytvořte kanál s následujícím pracovním postupem:

    Kanál v tomto řešení má následující aktivity:

    • Vytvořte dvě aktivity vyhledávání. První aktivitu vyhledávání použijte k načtení poslední hodnoty meze. Druhou aktivitu vyhledávání použijte k načtení nové hodnoty meze. Tyto hodnoty meze se předají aktivitě kopírování.
    • Vytvořte aktivitu kopírování, která kopíruje řádky ze zdrojové tabulky dat s hodnotou sloupce vodoznaku větší než stará hodnota meze a menší než nová hodnota vodoznaku. Potom zkopíruje data z datového skladu do Lakehouse jako nový soubor.
    • Vytvořte aktivitu uložené procedury, která aktualizuje poslední hodnotu meze pro další spuštění kanálu.

Požadavky

  • Datový sklad. Datový sklad použijete jako zdrojové úložiště dat. Pokud ho nemáte, přečtěte si téma Vytvoření datového skladu , kde najdete postup jeho vytvoření.
  • Jezero. Lakehouse použijete jako cílové úložiště dat. Pokud ho nemáte, přečtěte si téma Vytvoření lakehouse , kde najdete postup jeho vytvoření. Vytvořte složku s názvem IncrementalCopy pro uložení zkopírovaných dat.

Příprava zdroje

Tady jsou některé tabulky a uložená procedura, které je potřeba připravit ve zdrojovém datovém skladu před konfigurací kanálu přírůstkové kopie.

1. Vytvoření tabulky zdroje dat v datovém skladu

Spuštěním následujícího příkazu SQL ve službě Data Warehouse vytvořte tabulku s názvem data_source_table jako tabulku zdroje dat. V tomto kurzu ho použijete jako ukázková data k přírůstkové kopii.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Data v tabulce zdroje dat jsou zobrazená níže:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

V tomto kurzu použijete jako sloupec vodoznaku lastModifytime .

2. Vytvořte v datovém skladu jinou tabulku, do které se uloží poslední hodnota vodoznaku.

  1. Spuštěním následujícího příkazu SQL ve službě Data Warehouse vytvořte tabulku s názvem watermarktable pro uložení poslední hodnoty vodoznaku:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Nastavte výchozí hodnotu posledního vodoznaku s názvem tabulky zdrojové tabulky dat. V tomto kurzu je název tabulky data_source_table a výchozí hodnota je 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Zkontrolujte data v tabulce watermarktable.

    Select * from watermarktable
    

    Výstup:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Vytvoření uložené procedury v datovém skladu

Spuštěním následujícího příkazu vytvořte v datovém skladu uloženou proceduru. Tato uložená procedura slouží k aktualizaci poslední hodnoty meze po posledním spuštění kanálu.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Konfigurace kanálu pro přírůstkovou kopii

Krok 1: Vytvoření kanálu

  1. Přejděte do Power BI.

  2. V levém dolním rohu obrazovky vyberte ikonu Power BI a pak výběrem možnosti Data Factory otevřete domovskou stránku služby Data Factory.

    Screenshot with the data factory experience emphasized.

  3. Přejděte do pracovního prostoru Microsoft Fabric.

  4. Vyberte Datový kanál a zadejte název kanálu a vytvořte nový kanál.

    Screenshot showing the new data pipeline button in the newly created workspace.

    Screenshot showing the name of creating a new pipeline.

Krok 2: Přidání aktivity vyhledávání pro poslední vodoznak

V tomto kroku vytvoříte aktivitu vyhledávání, která získá poslední hodnotu meze. Výchozí hodnota 1/1/2010 12:00:00 AM nastavená před získáním.

  1. Vyberte Přidat aktivitu kanálu a v rozevíracím seznamu vyberte Vyhledat .

  2. Na kartě Obecné přejmenujte tuto aktivitu na LookupOldWaterMarkActivity.

  3. Na kartě Nastavení proveďte následující konfiguraci:

    • Typ úložiště dat: Vyberte pracovní prostor.
    • Typ úložiště dat pracovního prostoru: Vyberte datový sklad.
    • Datový sklad: Vyberte datový sklad.
    • Použít dotaz: Zvolte tabulku.
    • Tabulka: Zvolte dbo.watermarktable.
    • Pouze první řádek: Vybráno.

    Screenshot showing lookup old watermark.

Krok 3: Přidání aktivity vyhledávání pro nový vodoznak

V tomto kroku vytvoříte aktivitu vyhledávání, která získá novou hodnotu meze. K získání nového vodoznaku ze zdrojové tabulky dat použijete dotaz. Maximální hodnota ve sloupci LastModifytime ve data_source_table se získá.

  1. Na horním panelu vyberte Vyhledat na kartě Aktivity a přidejte druhou aktivitu vyhledávání.

  2. Na kartě Obecné přejmenujte tuto aktivitu na LookupNewWaterMarkActivity.

  3. Na kartě Nastavení proveďte následující konfiguraci:

    • Typ úložiště dat: Vyberte pracovní prostor.

    • Typ úložiště dat pracovního prostoru: Vyberte datový sklad.

    • Datový sklad: Vyberte datový sklad.

    • Použít dotaz: Zvolte dotaz.

    • Dotaz: Zadejte následující dotaz, který jako nový vodoznak vybere maximální čas poslední změny:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Pouze první řádek: Vybráno.

    Screenshot showing lookup new watermark.

Krok 4: Přidání aktivity kopírování pro kopírování přírůstkových dat

V tomto kroku přidáte aktivitu kopírování pro kopírování přírůstkových dat mezi posledním vodoznakem a novým vodoznakem z Datového skladu do Lakehouse.

  1. Vyberte Aktivity na horním panelu a výběrem možnosti Kopírovat data -> Přidat na plátno získejte aktivitu kopírování.

  2. Na kartě Obecné přejmenujte tuto aktivitu na IncrementalCopyActivity.

  3. Připojení obě aktivity vyhledávání do aktivity kopírování přetažením zeleného tlačítka (Při úspěchu) připojeného k aktivitám vyhledávání do aktivity kopírování. Uvolněte tlačítko myši, když se barva ohraničení aktivity kopírování změní na zelenou.

    Screenshot showing connecting lookup and copy activities.

  4. Na kartě Zdroj proveďte následující konfiguraci:

    • Typ úložiště dat: Vyberte pracovní prostor.

    • Typ úložiště dat pracovního prostoru: Vyberte datový sklad.

    • Datový sklad: Vyberte datový sklad.

    • Použít dotaz: Zvolte dotaz.

    • Dotaz: Zadáním následujícího dotazu zkopírujte přírůstková data mezi posledním vodoznakem a novým vodoznakem.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Screenshot showing copy source configuration.

  5. Na kartě Cíl proveďte následující konfiguraci:

    • Typ úložiště dat: Vyberte pracovní prostor.
    • Typ úložiště dat pracovního prostoru: Vyberte Lakehouse.
    • Lakehouse: Vyberte svůj lakehouse.
    • Kořenová složka: Zvolte soubory.
    • Cesta k souboru: Zadejte složku, do které chcete uložit zkopírovaná data. Vyberte Procházet a vyberte složku. Jako název souboru otevřete Přidat dynamický obsah a zadejte @CONCAT('Incremental-', pipeline().RunId, '.txt') do otevřeného okna pro vytvoření názvů souborů pro zkopírovaný datový soubor v Lakehouse.
    • Formát souboru: Vyberte typ formátu dat.

    Screenshot showing copy destination configuration.

Krok 5:Přidání aktivity uložené procedury

V tomto kroku přidáte aktivitu uložené procedury, která aktualizuje poslední hodnotu meze pro další spuštění kanálu.

  1. Vyberte Aktivity na horním panelu a vyberte Uložená procedura a přidejte aktivitu uložené procedury.

  2. Na kartě Obecné přejmenujte tuto aktivitu na StoredProceduretoWriteWatermarkActivity.

  3. Připojení zelený výstup aktivity kopírování (Při úspěchu) do aktivity uložené procedury.

  4. Na kartě Nastavení proveďte následující konfiguraci:

    • Typ úložiště dat: Vyberte pracovní prostor.

    • Datový sklad: Vyberte datový sklad.

    • Název uložené procedury: Zadejte uloženou proceduru, kterou jste vytvořili v datovém skladu: [dbo].[ usp_write_watermark].

    • Rozbalte parametry uložené procedury. Pokud chcete zadat hodnoty parametrů uložené procedury, vyberte Importovat a zadejte následující hodnoty parametrů:

      Name Typ Hodnota
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Screenshot showing stored procedure activity configuration.

Krok 6: Spuštění kanálu a monitorování výsledku

Na horním panelu vyberte Spustit pod kartou Domů . Pak vyberte Uložit a spustit. Kanál se spustí a kanál můžete monitorovat na kartě Výstup .

Screenshot showing pipeline run results.

Přejděte do svého Lakehouse, zjistíte, že datový soubor je ve složce, kterou jste zadali, a můžete vybrat soubor a zobrazit náhled zkopírovaných dat.

Screenshot showing lakehouse data for the first pipeline run.

Screenshot showing lakehouse data preview for the first pipeline run.

Přidání dalších dat pro zobrazení výsledků přírůstkové kopie

Po dokončení prvního spuštění kanálu se pokusíme do tabulky zdroje datového skladu přidat další data, abychom zjistili, jestli tento kanál dokáže zkopírovat přírůstková data.

Krok 1: Přidání dalších dat do zdroje

Do datového skladu vložte nová data spuštěním následujícího dotazu:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Aktualizovaná data pro data_source_table jsou:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Krok 2: Aktivace dalšího spuštění kanálu a monitorování výsledku

Vraťte se na stránku kanálu. Na horním panelu znovu vyberte Spustit pod kartou Domů . Kanál se spustí a v části Výstup můžete kanál monitorovat.

Přejděte do lakehouse, najdete nový zkopírovaný datový soubor ve složce, kterou jste zadali, a můžete vybrat soubor, ve které chcete zobrazit náhled zkopírovaných dat. V tomto souboru se zobrazí přírůstková data.

Screenshot showing lakehouse data for the second pipeline run.

Screenshot showing lakehouse data preview for the second pipeline run.

V dalším kroku se dozvíte další informace o kopírování z Azure Blob Storage do Lakehouse.