Condividi tramite


Caricare i dati in modo incrementale da Database SQL di Azure ad Archiviazione BLOB di Azure con il portale di Azure

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!

In questa esercitazione si crea un'istanza di Azure Data Factory con una pipeline che carica i dati differenziali di una tabella di Database SQL di Azure in archiviazione BLOB di Azure.

In questa esercitazione vengono completati i passaggi seguenti:

  • Preparare l'archivio dati per l'archiviazione del valore limite.
  • Creare una data factory.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare una pipeline.
  • Esegui la pipeline.
  • Monitorare l'esecuzione della pipeline.
  • Verificare i risultati
  • Aggiungere altri dati all'origine.
  • Eseguire di nuovo la pipeline.
  • Monitorare la seconda esecuzione della pipeline
  • Esaminare i risultati della seconda esecuzione.

Panoramica

Il diagramma generale della soluzione è il seguente:

Caricare i dati in modo incrementale

Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:

  1. Selezionare la colonna del limite. Selezionare una colonna nell'archivio dati di origine, che può essere usata per analizzare approfonditamente i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.

  2. Preparare un archivio dati per l'archiviazione del valore limite. In questa esercitazione si archivia il valore limite in un database SQL.

  3. Creare una pipeline con il flusso di lavoro seguente:

    La pipeline di questa soluzione contiene le attività seguenti:

    • Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.
    • Creare un'attività di copia che copi le righe dell'archivio dati di origine con il valore della colonna del limite maggiore del valore limite precedente e minore del nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB come nuovo file.
    • Creare un'attività stored procedure che aggiorni il valore limite per la pipeline che verrà eseguita la volta successiva.

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

  • Database SQL di Azure. Usare il database come archivio dati di origine. Se non si ha un database in Database SQL di Azure, vedere la procedura per crearne uno descritta in Creare un database in Database SQL di Azure.
  • Archiviazione di Azure. Usare l'archivio BLOB come archivio dati sink. Se non si ha un account di archiviazione, vedere Creare un account di archiviazione per informazioni su come crearne uno. Creare un contenitore denominato adftutorial.

Creare una tabella di origine dati nel database SQL

  1. Aprire SQL Server Management Studio. In Esplora server fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.

  2. Eseguire questo comando SQL sul database SQL per creare una tabella denominata data_source_table come archivio dell'origine dati:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, LastModifytime)
    VALUES
        (1, 'aaaa','9/1/2017 12:56:00 AM'),
        (2, 'bbbb','9/2/2017 5:23:00 AM'),
        (3, 'cccc','9/3/2017 2:36:00 AM'),
        (4, 'dddd','9/4/2017 3:21:00 AM'),
        (5, 'eeee','9/5/2017 8:06:00 AM');
    

    In questa esercitazione si usa LastModifytime come colonna del valore limite. I dati nell'archivio dell'origine dati sono visualizzati nella tabella seguente:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1        | aaaa | 2017-09-01 00:56:00.000
    2        | bbbb | 2017-09-02 05:23:00.000
    3        | cccc | 2017-09-03 02:36:00.000
    4        | dddd | 2017-09-04 03:21:00.000
    5        | eeee | 2017-09-05 08:06:00.000
    

Creare un'altra tabella nel database SQL per archiviare il valore del limite massimo

  1. Eseguire questo comando SQL sul database SQL per creare una tabella denominata watermarktable in cui archiviare il valore limite:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Impostare il valore predefinito del limite massimo con il nome tabella dell'archivio dei dati di origine. In questa esercitazione, il nome della tabella è data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Esaminare i dati nella tabella watermarktable.

    Select * from watermarktable
    

    Output:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Creare una stored procedure nel database SQL

Eseguire questo comando per creare una stored procedure nel database SQL:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Creare una data factory

  1. Avviare il Web browser Microsoft Edge o Google Chrome. L'interfaccia utente di Data Factory è attualmente supportata solo nei Web browser Microsoft Edge e Google Chrome.

  2. Nel menu sinistro selezionare Crea una risorsa>Integrazione>Data factory:

    Selezione di Data Factory nella

  3. Nella pagina Nuova data factory immettere ADFIncCopyTutorialDF per Nome.

    Il nome dell'istanza di Azure Data Factory deve essere univoco globale. Se viene visualizzato un punto esclamativo rosso con l'errore seguente, modificare il nome della data factory, ad esempio nomeutenteADFIncCopyTutorialDF, e riprovare. Per informazioni sulle regole di denominazione per gli elementi di Data Factory, vedere l'articolo Data Factory - Regole di denominazione.

    Il nome della data factory "ADFIncCopyTutorialDF" non è disponibile

  4. Selezionare la sottoscrizione di Azure in cui creare la data factory.

  5. Per il gruppo di risorse, eseguire una di queste operazioni:

  6. Selezionare V2 per version.

  7. Selezionare la località per la data factory. Nell'elenco a discesa vengono mostrate solo le località supportate. Gli archivi dati (Archiviazione di Azure, Database SQL di Azure, Istanza gestita di SQL di Azure e così via) e le risorse di calcolo (HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.

  8. Cliccare su Crea.

  9. Al termine della creazione verrà visualizzata la pagina Data factory, come illustrato nell'immagine.

    Home page per Azure Data Factory, con il riquadro Apri Azure Data Factory Studio.

  10. Selezionare Apri nel riquadro Apri Azure Data Factory Studio per avviare l'interfaccia utente di Azure Data Factory in una scheda separata.

Creare una pipeline

In questa esercitazione si crea una pipeline con due attività di ricerca, un'attività di copia e un'attività di stored procedure concatenate in una pipeline.

  1. Nella home page dell'interfaccia utente di Data Factory fare clic sul riquadro Orchestrate .On the home page of Data Factory UI, click the Orchestrate tile.

    Screenshot che mostra la home page della data factory con il pulsante Orchestrate evidenziato.

  2. Nel pannello Generale in Proprietà specificare IncrementalCopyPipeline per Nome. Comprimere quindi il pannello facendo clic sull'icona Proprietà nell'angolo in alto a destra.

  3. Verrà ora aggiunta la prima attività di ricerca per recuperare il valore limite precedente. Nella casella degli strumenti Attività espandere Generale e trascinare l'attività Cerca nell'area di progettazione della pipeline. Modificare il nome dell'attività in LookupOldWaterMarkActivity.

    Prima attività di ricerca: nome

  4. Passare alla scheda Impostazioni e fare clic su + Nuovo in corrispondenza di Source Dataset (Set di dati di origine). In questo passaggio viene creato un set di dati per rappresentare i dati in watermarktable. Questa tabella contiene il limite precedente che è stato usato nella precedente operazione di copia.

  5. Nella finestra Nuovo set di dati selezionare Database SQL di Azure e fare clic su Continua. Verrà aperta una nuova finestra per il set di dati.

  6. Nella finestra Imposta proprietà per il set di dati immettere WatermarkDataset per Nome.

  7. Per Servizio collegato, selezionare Nuovo e quindi seguire questa procedura:

    1. Immettere AzureSqlDatabaseLinkedService per Nome.

    2. Selezionare il server per Nome server.

    3. Selezionare il Nome del database dall'elenco a discesa.

    4. Immettere il nome utente e la password.

    5. Per testare la connessione al database SQL di Azure, fare clic su Test connessione.

    6. Fare clic su Fine.

    7. Verificare che per Servizio collegato sia selezionato AzureSqlDatabaseLinkedService.

      Finestra New linked service (Nuovo servizio collegato)

    8. Selezionare Fine.

  8. Nella scheda Connessione selezionare [dbo].[watermarktable] per Tabella. Se si vuole visualizzare un'anteprima dei dati nella tabella, fare clic su Anteprima dati.

    Set di dati limite: impostazioni di connessione

  9. Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra. Nella finestra delle proprietà per l'attività Ricerca verificare che nel campo Source Dataset (Set di dati di origine) sia selezionato WatermarkDataset.

  10. Nella casella degli strumenti Attività espandere Generale, trascinare un'altra attività Cerca nell'area di progettazione della pipeline e impostare il nome su LookupNewWaterMarkActivity nella scheda Generale della finestra delle proprietà. Questa attività di ricerca recupera il nuovo valore limite dalla tabella con i dati di origine da copiare nella destinazione.

  11. Nella finestra delle proprietà per la seconda attività Cerca passare alla scheda Impostazioni e fare clic su Nuovo. Si creerà un set di dati per fare riferimento alla tabella di origine contenente il nuovo valore limite, ossia il valore massimo di LastModifyTime.

  12. Nella finestra Nuovo set di dati selezionare Database SQL di Azure e fare clic su Continua.

  13. Nella finestra Imposta proprietà immettere SourceDataset per Nome. Selezionare AzureSqlDatabaseLinkedService per Servizio collegato.

  14. Selezionare [dbo].[data_source_table] per Tabella. Più avanti nell'esercitazione si specificherà una query su questo set di dati. La query avrà la precedenza rispetto alla tabella specificata in questo passaggio.

  15. Selezionare Fine.

  16. Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra. Nella finestra delle proprietà per l'attività Cerca verificare che nel campo Source Dataset (Set di dati di origine) sia selezionato SourceDataset.

  17. Selezionare Query per il campo Use Query (Usa query) e immettere la query seguente, con cui si seleziona solo il valore massimo di LastModifytime da data_source_table. Assicurarsi di aver selezionato anche Solo prima riga.

    select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
    

    Seconda attività di ricerca: query

  18. Nella casella degli strumenti Attività espandere Move & Transform (Sposta e trasforma), trascinare l'attività Copia dalla casella degli strumenti Attività e impostare il nome su IncrementalCopyActivity.

  19. Connettere entrambe le attività di ricerca all'attività di copia trascinando il pulsante verde associato alle attività di ricerca sull'attività di copia. Rilasciare il pulsante del mouse quando il bordo dell'attività di copia diventa di colore blu.

    Connessione delle attività di ricerca all'attività di copia

  20. Selezionare l'attività Copia e verificare che le relative proprietà vengano visualizzate nella finestra Proprietà.

  21. Passare alla scheda Origine nella finestra Proprietà e seguire questa procedura:

    1. Selezionare SourceDataset nel campo Source Dataset (Set di dati di origine).

    2. Selezionare Query per il campo Use Query (Usa query).

    3. Immettere la query SQL seguente nel campo Query.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

      Attività di copia: origine

  22. Passare alla scheda Sink e fare clic su + Nuovo in corrispondenza del campo Sink Dataset (Set di dati sink).

  23. In questa esercitazione, l'archivio di dati sink è un archivio BLOB di Azure. Selezionare quindi Archiviazione BLOB di Azure e fare clic su Continua nella finestra Nuovo set di dati.

  24. Nella pagina Select Format (Seleziona formato) selezionare il tipo di formato dei dati e fare clic su Continua.

  25. Nella finestra Imposta proprietà immettere SinkDataset per Nome. Per Servizio collegato selezionare + Nuovo. In questo passaggio si crea una connessione all'archivio BLOB di Azure, ossia un servizio collegato.

  26. Nella finestra New Linked Service (Azure Blog Storage) (Nuovo servizio collegato - Archiviazione BLOB di Azure), procedere come segue:

    1. Immettere AzureStorageLinkedService per Nome.
    2. Selezionare il proprio account di archiviazione di Azure per Nome account di archiviazione.
    3. Verificare la connessione e quindi fare clic su Fine.
  27. Nella finestra Imposta proprietà verificare che per Servizio collegato sia selezionato AzureStorageLinkedService. Quindi selezionare Fine.

  28. Passare alla scheda Connessione di SinkDataset e seguire questa procedura:

    1. Per il campo Percorso file, immettere adftutorial/incrementalcopy. adftutorial è il nome del contenitore BLOB e incrementalcopy è il nome della cartella. Questo frammento di codice presuppone che nell'archivio BLOB sia presente un contenitore BLOB denominato adftutorial. Creare il contenitore se non esiste oppure impostare il nome di un contenitore esistente. Se non esiste, la cartella di output incrementalcopy viene creata automaticamente da Azure Data Factory. È anche possibile usare il pulsante Sfoglia per Percorso file per passare a una cartella in un contenitore BLOB.
    2. Per la parte File del campo Percorso file, fare clic su Aggiungi contenuto dinamico [AL+P] e quindi immettere @CONCAT('Incremental-', pipeline().RunId, '.txt') nella finestra che viene aperta. Quindi selezionare Fine. Il nome file viene generato in modo dinamico usando l'espressione. Ogni esecuzione della pipeline ha un ID univoco, che viene usato dall'attività di copia per generare il nome file.
  29. Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra.

  30. Nella casella degli strumenti Attività espandere Generale e trascinare l'attività Stored procedure dalla casella degli strumenti Attività all'area di progettazione della pipeline. Connettere l'output contrassegnato in verde (come operazione riuscita) dell'attività Copia all'attività Stored procedure.

  31. Selezionare l'attività Stored procedure nella finestra di progettazione della pipeline e modificarne il nome in StoredProceduretoWriteWatermarkActivity.

  32. Passare alla scheda Account SQL e selezionare AzureSqlDatabaseLinkedService per Servizio collegato.

  33. Passare alla scheda Stored procedure e seguire questa procedura:

    1. In Nome stored procedure selezionare usp_write_watermark.

    2. Per specificare i valori dei parametri della stored procedure, fare clic su Import parameter (Importa parametro) e immettere i valori seguenti per i parametri:

      Nome Type Valore
      LastModifiedtime Data/Ora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      Attività stored procedure: impostazioni della stored procedure

  34. Per convalidare le impostazioni della pipeline, fare clic su Convalida sulla barra degli strumenti. Verificare che non siano presenti errori di convalida. Per chiudere la finestra Report di convalida della pipeline, fare clic su >>.

  35. Pubblicare le entità (servizi collegati, set di dati e pipeline) nel servizio Azure Data Factory selezionando il pulsante Pubblica tutti. Attendere fino alla visualizzazione del messaggio che informa che la pubblicazione è riuscita.

Attivare un'esecuzione della pipeline

  1. Fare clic su Aggiungi trigger sulla barra degli strumenti e quindi su Trigger Now (Attiva adesso).

  2. Nella finestra Pipeline Run (Esecuzione di pipeline) selezionare Fine.

Monitorare l'esecuzione della pipeline

  1. Passare alla scheda Monitoraggio a sinistra. Verrà visualizzato lo stato dell'esecuzione della pipeline attivata da un trigger manuale. È possibile usare i collegamenti nella colonna NOME PIPELINE per visualizzare i dettagli dell'esecuzione ed eseguire di nuovo la pipeline.

  2. Per visualizzare le esecuzioni di attività associate all'esecuzione della pipeline, selezionare il collegamento sotto la colonna NOME PIPELINE. Per i dettagli sulle esecuzioni di attività, selezionare il collegamento Dettagli (icona degli occhiali) sotto la colonna NOME ATTIVITÀ. Per tornare alla visualizzazione Esecuzioni della pipeline, selezionare Tutte le esecuzioni di pipeline in alto. Per aggiornare la visualizzazione, selezionare Aggiorna.

Esaminare i risultati

  1. Connettersi all'account di archiviazione di Azure usando strumenti come Azure Storage Explorer. Verificare che sia stato creato un file di output nella cartella incrementalcopy del contenitore adftutorial.

    Primo file di output

  2. Aprire il file di output. Come si può notare, tutti i dati sono stati copiati da data_source_table al file BLOB.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  3. Verificare il valore più recente da watermarktable. Si noterà che il valore limite è stato aggiornato.

    Select * from watermarktable
    

    Ecco l'output:

    | TableName | WatermarkValue |
    | --------- | -------------- |
    | data_source_table | 2017-09-05	8:06:00.000 |
    

Aggiungere altri dati all'origine

Inserire nuovi dati nel database (archivio dell'origine dati).

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

I dati aggiornati nel database sono i seguenti:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Attivare un'altra esecuzione della pipeline

  1. Passare alla scheda Modifica . Fare clic sulla pipeline nella visualizzazione albero se non è aperta nella finestra di progettazione.

  2. Fare clic su Aggiungi trigger sulla barra degli strumenti e quindi su Trigger Now (Attiva adesso).

Monitorare la seconda esecuzione della pipeline

  1. Passare alla scheda Monitoraggio a sinistra. Verrà visualizzato lo stato dell'esecuzione della pipeline attivata da un trigger manuale. È possibile usare i collegamenti nella colonna NOME PIPELINE per visualizzare i dettagli delle attività ed eseguire di nuovo la pipeline.

  2. Per visualizzare le esecuzioni di attività associate all'esecuzione della pipeline, selezionare il collegamento sotto la colonna NOME PIPELINE. Per i dettagli sulle esecuzioni di attività, selezionare il collegamento Dettagli (icona degli occhiali) sotto la colonna NOME ATTIVITÀ. Per tornare alla visualizzazione Esecuzioni della pipeline, selezionare Tutte le esecuzioni di pipeline in alto. Per aggiornare la visualizzazione, selezionare Aggiorna.

Verificare il secondo output

  1. Nell'archivio BLOB è stato creato un altro file. In questa esercitazione, il nome del nuovo file è Incremental-<GUID>.txt. Aprendo il file si noteranno due righe di record.

    6,newdata,2017-09-06 02:23:00.0000000
    7,newdata,2017-09-07 09:01:00.0000000    
    
  2. Verificare il valore più recente da watermarktable. Si noterà che il valore limite è stato aggiornato di nuovo.

    Select * from watermarktable
    

    Output di esempio:

    | TableName | WatermarkValue |
    | --------- | -------------- |
    | data_source_table | 2017-09-07 09:01:00.000 |
    

In questa esercitazione sono stati eseguiti i passaggi seguenti:

  • Preparare l'archivio dati per l'archiviazione del valore limite.
  • Creare una data factory.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare una pipeline.
  • Esegui la pipeline.
  • Monitorare l'esecuzione della pipeline.
  • Verificare i risultati
  • Aggiungere altri dati all'origine.
  • Eseguire di nuovo la pipeline.
  • Monitorare la seconda esecuzione della pipeline
  • Esaminare i risultati della seconda esecuzione.

In questa esercitazione la pipeline ha copiato dati da una singola tabella di Database SQL ad Archiviazione BLOB. Passare all'esercitazione successiva per informazioni sulla copia di dati da più tabelle di un database di SQL Server a un database SQL.