Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
In questa esercitazione viene mostrato come caricare in modo incrementale i dati da Data Warehouse a Lakehouse.
Panoramica
Ecco il diagramma generale della soluzione:
Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:
Selezionare la colonna della filigrana. Seleziona una colonna nella tabella dati di origine, che può essere usata per suddividere i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.
Prepara una tabella per archiviare l'ultimo valore limite nel data warehouse.
Creare una pipeline con il flusso di lavoro seguente:
La pipeline di questa soluzione contiene le attività seguenti:
- Creare due attività di consultazione. Usare la prima attività di ricerca per recuperare l'ultimo valore di marcatore. Usare la seconda attività di ricerca per recuperare il nuovo valore del watermark. Questi valori limite vengono passati all'attività di copia.
- Crea un'attività di copia che copia le righe dalla tabella dei dati di origine in cui il valore della colonna watermark è maggiore rispetto al vecchio valore watermark e minore rispetto al nuovo valore watermark. Copia quindi i dati dal data warehouse a Lakehouse come nuovo file.
- Crea un'attività di stored procedure che aggiorna l'ultimo indicatore per l'esecuzione successiva della pipeline.
Prerequisiti
- Data warehouse. Uso del data warehouse come archivio dati di origine. Se non è disponibile, vedere Crea un Data warehouse per crearne uno.
- Lakehouse. Si usa il Lakehouse come archivio dati di destinazione. Se non è disponibile, vedere Crea un Lakehouse per crearne uno. Crea una cartella denominata IncrementalCopy per archiviare i dati copiati.
Preparazione della sorgente
Ecco alcune tabelle e stored procedure che devi preparare nel tuo data warehouse di origine prima di configurare la pipeline di copia incrementale.
1. Crea una tabella di origine dati nel data warehouse
Esegui il comando SQL seguente in Data Warehouse per creare una tabella denominata data_source_table come tabella dell'origine dati. In questa esercitazione viene usata come dati di esempio per eseguire la copia incrementale.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
I dati nella tabella dell'origine dati sono visualizzati di seguito:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
In questa esercitazione, si usa LastModifytime come colonna del valore limite.
2. Crea un'altra tabella nel data warehouse per archiviare l’ultimo valore limite
Esegui il comando SQL seguente nel data warehouse per creare una tabella denominata watermarktable in cui archiviare l’ultimo valore limite:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Impostare il valore predefinito dell'ultima filigrana con il nome della tabella dei dati di origine. In questa esercitazione, il nome della tabella è data_source_table, e il valore predefinito è
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Rivedi i dati nella tabella watermarktable.
Select * from watermarktable
Output:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Crea una procedura memorizzata nel Data Warehouse
Esegui il comando seguente per creare una stored procedure nel data warehouse. Questa stored procedure viene usata per aggiornare l'ultimo valore limite dopo l'ultima esecuzione della pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configurare una pipeline per la copia incrementale
Passaggio 1: creare una pipeline
Accedere a Power BI.
Selezionare l'icona di Power BI nella parte in basso a sinistra dello schermo, quindi selezionare Data Factory per aprire la home page di Data Factory.
Accedere all'area di lavoro di Microsoft Fabric.
Selezionare Pipeline di dati e quindi immettere un nome di pipeline per creare una nuova pipeline.
Passaggio 2: aggiungi un'attività di ricerca per l'ultimo marcatore.
In questo passaggio, si crea un'attività di ricerca per ottenere l'ultimo valore di riferimento. Il valore predefinito 1/1/2010 12:00:00 AM
impostato in precedenza è stato ottenuto.
Seleziona Aggiungi attività della pipeline e seleziona Ricerca dall'elenco a discesa.
Nella scheda Generale rinomina questa attività in LookupOldWaterMarkActivity.
Nella scheda Impostazioni, esegui la configurazione seguente:
- Tipo di archivio dati: seleziona Area di lavoro.
- Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
- Data Warehouse: seleziona il data warehouse.
- Usa query: scegli Tabella.
- Tabella: scegli dbo.watermarktable.
- Solo prima riga: selezionata.
Passaggio 3: aggiungi un'attività di ricerca per il nuovo valore limite
In questo passaggio, si crea un'attività di ricerca per ottenere il nuovo valore limite. Si utilizza un'interrogazione per ottenere la nuova filigrana dalla tabella dati di origine. Viene ottenuto il valore massimo nella colonna LastModifytime dalla tabella data_source_table.
Nella barra superiore, seleziona Ricerca nella scheda Attività per aggiungere la seconda attività di ricerca.
Nella scheda Generale, rinomina questa attività in LookupNewWaterMarkActivity.
Nella scheda Impostazioni, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
Data Warehouse: seleziona il data warehouse.
Usa query: scegli Query.
Query: immetti la query seguente per selezionare l'ultimo orario di modifica massimo come nuovo riferimento temporale:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Solo prima riga: selezionata.
Passaggio 4: aggiungi l'attività di copia per copiare i dati incrementali
In questo passaggio, si aggiunge un'attività di copia per trasferire i dati incrementali tra l'ultimo marcatore temporale e il nuovo marcatore temporale, dal Data Warehouse al Lakehouse.
Seleziona Attività nella barra superiore e successivamente Copia dati ->Aggiungi all'area di lavoro per ottenere l'attività di copia.
Nella scheda Generale, rinomina questa attività in IncrementalCopyActivity.
Connettere entrambe le attività di Lookup all'attività Copia trascinando il pulsante verde (Successo) dalla sezione delle attività di Lookup verso l'attività Copia. Rilasciare il pulsante del mouse quando il bordo dell'attività di copia diventa di colore verde.
Nella scheda Origine, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
Data Warehouse: seleziona il data warehouse.
Usa query: scegli Query.
Query: inserisci la query seguente per copiare i dati incrementali tra l'ultimo marcatore e il nuovo marcatore.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Nella scheda Descrizione, esegui la configurazione seguente:
- Tipo di archivio dati: seleziona Area di lavoro.
- Tipo di archivio dati dell'area di lavoro: seleziona Lakehouse.
- Lakehouse: seleziona la tua Lakehouse.
- Cartella radice: scegli File.
-
Percorso file: specifica la cartella in cui archiviare i dati copiati. Fai clic su Sfoglia per selezionare la cartella. Per il nome del file, apri Aggiungi contenuto dinamico e immetti
@CONCAT('Incremental-', pipeline().RunId, '.txt')
nella finestra aperta per creare nomi di file per il file di dati copiato in Lakehouse. - Formato file: seleziona il tipo di formato dei dati.
Passaggio 5: Aggiungere un'attività Stored Procedure
In questo passaggio, viene aggiunta un'attività di stored procedure per aggiornare l'ultimo valore limite per l'esecuzione del successivo pipeline.
Seleziona Attività nella barra superiore e successivamente Procedura memorizzata per aggiungere un'attività procedura memorizzata.
Nella scheda Generale, rinomina questa attività in StoredProceduretoWriteWatermarkActivity.
Connetti l'output contrassegnato in verde (alla riuscita dell'operazione) dell'attività di copia all'attività di procedura memorizzata.
Nella scheda Impostazioni, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Data Warehouse: seleziona il data warehouse.
Nome Stored Procedure: Specifica la stored procedure creata nel data warehouse: [dbo].[usp_write_watermark].
Espandi parametri della procedura memorizzata. Per specificare i valori dei parametri della stored procedure, seleziona Importa e immetti i seguenti valori per i parametri:
Nome tipo Valore OraUltimaModifica Data e Ora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} NomeTabella String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Passaggio 6: esegui la pipeline e monitora il risultato
Nella barra superiore, seleziona Esegui nella scheda Home. Seleziona quindi Salva ed esegui. La pipeline inizia l'esecuzione e potrai monitorare la pipeline nella scheda Output.
Vai a Lakehouse, dove troverai il file di dati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati.
Aggiunge altri dati per visualizzare i risultati della copia incrementale
Dopo aver completato la prima esecuzione della pipeline, prova ad aggiungere altri dati nella tabella di origine del data warehouse per vedere se questa pipeline può copiare i dati incrementali.
Passaggio 1: aggiungi altri dati all'origine
Inserisci nuovi dati nel data warehouse eseguendo la query seguente:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
I dati aggiornati per data_source_table sono:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Passaggio 2: attiva un'altra esecuzione della pipeline e monitora il risultato
Torna alla pagina della pipeline. Nella barra superiore, seleziona nuovamente Esegui nella scheda Home. La pipeline avvia l'esecuzione e puoi monitorarla in Output.
Vai a Lakehouse, dove troverai il nuovo file di dati copiati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati. I tuoi dati incrementali vengono visualizzati in questo file.
Contenuto correlato
Passare quindi a ulteriori informazioni sulla copia da Archiviazione BLOB di Azure a Lakehouse.