Condividi tramite


Caricare dati in modo incrementale da Data Warehouse a Lakehouse

In questa esercitazione viene mostrato come caricare in modo incrementale i dati da Data Warehouse a Lakehouse.

Panoramica

Ecco il diagramma generale della soluzione:

Diagramma che mostra la logica di caricamento incrementale dei dati.

Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:

  1. Selezionare la colonna della filigrana. Seleziona una colonna nella tabella dati di origine, che può essere usata per suddividere i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.

  2. Prepara una tabella per archiviare l'ultimo valore limite nel data warehouse.

  3. Creare una pipeline con il flusso di lavoro seguente:

    La pipeline di questa soluzione contiene le attività seguenti:

    • Creare due attività di consultazione. Usare la prima attività di ricerca per recuperare l'ultimo valore di marcatore. Usare la seconda attività di ricerca per recuperare il nuovo valore del watermark. Questi valori limite vengono passati all'attività di copia.
    • Crea un'attività di copia che copia le righe dalla tabella dei dati di origine in cui il valore della colonna watermark è maggiore rispetto al vecchio valore watermark e minore rispetto al nuovo valore watermark. Copia quindi i dati dal data warehouse a Lakehouse come nuovo file.
    • Crea un'attività di stored procedure che aggiorna l'ultimo indicatore per l'esecuzione successiva della pipeline.

Prerequisiti

  • Data warehouse. Uso del data warehouse come archivio dati di origine. Se non è disponibile, vedere Crea un Data warehouse per crearne uno.
  • Lakehouse. Si usa il Lakehouse come archivio dati di destinazione. Se non è disponibile, vedere Crea un Lakehouse per crearne uno. Crea una cartella denominata IncrementalCopy per archiviare i dati copiati.

Preparazione della sorgente

Ecco alcune tabelle e stored procedure che devi preparare nel tuo data warehouse di origine prima di configurare la pipeline di copia incrementale.

1. Crea una tabella di origine dati nel data warehouse

Esegui il comando SQL seguente in Data Warehouse per creare una tabella denominata data_source_table come tabella dell'origine dati. In questa esercitazione viene usata come dati di esempio per eseguire la copia incrementale.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

I dati nella tabella dell'origine dati sono visualizzati di seguito:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

In questa esercitazione, si usa LastModifytime come colonna del valore limite.

2. Crea un'altra tabella nel data warehouse per archiviare l’ultimo valore limite

  1. Esegui il comando SQL seguente nel data warehouse per creare una tabella denominata watermarktable in cui archiviare l’ultimo valore limite:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Impostare il valore predefinito dell'ultima filigrana con il nome della tabella dei dati di origine. In questa esercitazione, il nome della tabella è data_source_table, e il valore predefinito è 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Rivedi i dati nella tabella watermarktable.

    Select * from watermarktable
    

    Output:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Crea una procedura memorizzata nel Data Warehouse

Esegui il comando seguente per creare una stored procedure nel data warehouse. Questa stored procedure viene usata per aggiornare l'ultimo valore limite dopo l'ultima esecuzione della pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configurare una pipeline per la copia incrementale

Passaggio 1: creare una pipeline

  1. Accedere a Power BI.

  2. Selezionare l'icona di Power BI nella parte in basso a sinistra dello schermo, quindi selezionare Data Factory per aprire la home page di Data Factory.

  3. Accedere all'area di lavoro di Microsoft Fabric.

  4. Selezionare Pipeline di dati e quindi immettere un nome di pipeline per creare una nuova pipeline.

    Screenshot che mostra il pulsante della nuova pipeline di dati nell'area di lavoro appena creata.

    Screenshot che mostra il nome della creazione di una nuova pipeline.

Passaggio 2: aggiungi un'attività di ricerca per l'ultimo marcatore.

In questo passaggio, si crea un'attività di ricerca per ottenere l'ultimo valore di riferimento. Il valore predefinito 1/1/2010 12:00:00 AM impostato in precedenza è stato ottenuto.

  1. Seleziona Aggiungi attività della pipeline e seleziona Ricerca dall'elenco a discesa.

  2. Nella scheda Generale rinomina questa attività in LookupOldWaterMarkActivity.

  3. Nella scheda Impostazioni, esegui la configurazione seguente:

    • Tipo di archivio dati: seleziona Area di lavoro.
    • Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
    • Data Warehouse: seleziona il data warehouse.
    • Usa query: scegli Tabella.
    • Tabella: scegli dbo.watermarktable.
    • Solo prima riga: selezionata.

    Screenshot che mostra il precedente valore limite di ricerca.

Passaggio 3: aggiungi un'attività di ricerca per il nuovo valore limite

In questo passaggio, si crea un'attività di ricerca per ottenere il nuovo valore limite. Si utilizza un'interrogazione per ottenere la nuova filigrana dalla tabella dati di origine. Viene ottenuto il valore massimo nella colonna LastModifytime dalla tabella data_source_table.

  1. Nella barra superiore, seleziona Ricerca nella scheda Attività per aggiungere la seconda attività di ricerca.

  2. Nella scheda Generale, rinomina questa attività in LookupNewWaterMarkActivity.

  3. Nella scheda Impostazioni, esegui la configurazione seguente:

    • Tipo di archivio dati: seleziona Area di lavoro.

    • Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.

    • Data Warehouse: seleziona il data warehouse.

    • Usa query: scegli Query.

    • Query: immetti la query seguente per selezionare l'ultimo orario di modifica massimo come nuovo riferimento temporale:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Solo prima riga: selezionata.

    Screenshot che mostra il nuovo valore limite di ricerca.

Passaggio 4: aggiungi l'attività di copia per copiare i dati incrementali

In questo passaggio, si aggiunge un'attività di copia per trasferire i dati incrementali tra l'ultimo marcatore temporale e il nuovo marcatore temporale, dal Data Warehouse al Lakehouse.

  1. Seleziona Attività nella barra superiore e successivamente Copia dati ->Aggiungi all'area di lavoro per ottenere l'attività di copia.

  2. Nella scheda Generale, rinomina questa attività in IncrementalCopyActivity.

  3. Connettere entrambe le attività di Lookup all'attività Copia trascinando il pulsante verde (Successo) dalla sezione delle attività di Lookup verso l'attività Copia. Rilasciare il pulsante del mouse quando il bordo dell'attività di copia diventa di colore verde.

    Screenshot che mostra la connessione delle attività di ricerca e copia.

  4. Nella scheda Origine, esegui la configurazione seguente:

    • Tipo di archivio dati: seleziona Area di lavoro.

    • Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.

    • Data Warehouse: seleziona il data warehouse.

    • Usa query: scegli Query.

    • Query: inserisci la query seguente per copiare i dati incrementali tra l'ultimo marcatore e il nuovo marcatore.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Screenshot che mostra la configurazione della sorgente di copia.

  5. Nella scheda Descrizione, esegui la configurazione seguente:

    • Tipo di archivio dati: seleziona Area di lavoro.
    • Tipo di archivio dati dell'area di lavoro: seleziona Lakehouse.
    • Lakehouse: seleziona la tua Lakehouse.
    • Cartella radice: scegli File.
    • Percorso file: specifica la cartella in cui archiviare i dati copiati. Fai clic su Sfoglia per selezionare la cartella. Per il nome del file, apri Aggiungi contenuto dinamico e immetti @CONCAT('Incremental-', pipeline().RunId, '.txt') nella finestra aperta per creare nomi di file per il file di dati copiato in Lakehouse.
    • Formato file: seleziona il tipo di formato dei dati.

    Screenshot che mostra la configurazione della destinazione di copia.

Passaggio 5: Aggiungere un'attività Stored Procedure

In questo passaggio, viene aggiunta un'attività di stored procedure per aggiornare l'ultimo valore limite per l'esecuzione del successivo pipeline.

  1. Seleziona Attività nella barra superiore e successivamente Procedura memorizzata per aggiungere un'attività procedura memorizzata.

  2. Nella scheda Generale, rinomina questa attività in StoredProceduretoWriteWatermarkActivity.

  3. Connetti l'output contrassegnato in verde (alla riuscita dell'operazione) dell'attività di copia all'attività di procedura memorizzata.

  4. Nella scheda Impostazioni, esegui la configurazione seguente:

    • Tipo di archivio dati: seleziona Area di lavoro.

    • Data Warehouse: seleziona il data warehouse.

    • Nome Stored Procedure: Specifica la stored procedure creata nel data warehouse: [dbo].[usp_write_watermark].

    • Espandi parametri della procedura memorizzata. Per specificare i valori dei parametri della stored procedure, seleziona Importa e immetti i seguenti valori per i parametri:

      Nome tipo Valore
      OraUltimaModifica Data e Ora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      NomeTabella String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Screenshot che mostra la configurazione dell'attività della procedura memorizzata.

Passaggio 6: esegui la pipeline e monitora il risultato

Nella barra superiore, seleziona Esegui nella scheda Home. Seleziona quindi Salva ed esegui. La pipeline inizia l'esecuzione e potrai monitorare la pipeline nella scheda Output.

Screenshot che mostra i risultati dell'esecuzione della pipeline.

Vai a Lakehouse, dove troverai il file di dati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati.

Screenshot che mostra i dati del lakehouse per la prima esecuzione della pipeline.

Screenshot che mostra l'anteprima dei dati lakehouse per la prima esecuzione della pipeline.

Aggiunge altri dati per visualizzare i risultati della copia incrementale

Dopo aver completato la prima esecuzione della pipeline, prova ad aggiungere altri dati nella tabella di origine del data warehouse per vedere se questa pipeline può copiare i dati incrementali.

Passaggio 1: aggiungi altri dati all'origine

Inserisci nuovi dati nel data warehouse eseguendo la query seguente:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

I dati aggiornati per data_source_table sono:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Passaggio 2: attiva un'altra esecuzione della pipeline e monitora il risultato

Torna alla pagina della pipeline. Nella barra superiore, seleziona nuovamente Esegui nella scheda Home. La pipeline avvia l'esecuzione e puoi monitorarla in Output.

Vai a Lakehouse, dove troverai il nuovo file di dati copiati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati. I tuoi dati incrementali vengono visualizzati in questo file.

Screenshot che mostra i dati del lakehouse per la seconda esecuzione della pipeline.

Screenshot che mostra l'anteprima dei dati lakehouse per la seconda esecuzione della pipeline.

Passare quindi a ulteriori informazioni sulla copia da Archiviazione BLOB di Azure a Lakehouse.