Condividi tramite


Caricare dati in modo incrementale da più tabelle in SQL Server al database SQL di Azure con PowerShell

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

In questa esercitazione si crea un'istanza di Azure Data Factory con una pipeline che carica dati differenziali da più tabelle di un database di SQL Server in Database SQL di Azure.

In questa esercitazione vengono completati i passaggi seguenti:

  • Preparare gli archivi dati di origine e di destinazione.
  • Creare una data factory.
  • Creare un runtime di integrazione self-hosted.
  • Installare il runtime di integrazione.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare, eseguire e monitorare una pipeline.
  • Rivedi i risultati.
  • Aggiungere o aggiornare dati nelle tabelle di origine.
  • Rieseguire e monitorare la pipeline.
  • Esaminare i risultati finali.

Panoramica

Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:

  1. Selezionare la colonna del limite.

    Selezionare una colonna per ogni tabella nell'archivio dati di origine, in cui identificare i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.

  2. Preparare un archivio dati per l'archiviazione del valore limite.

    In questa esercitazione si archivia il valore limite in un database SQL.

  3. Creare una pipeline con le attività seguenti:

    1. Creare un'attività ForEach che esegue l'iterazione di un elenco di nomi di tabella di origine passato come parametro alla pipeline. Per ogni tabella di origine, l'attività richiama le attività seguenti per eseguire il caricamento differenziale per la tabella.

    2. Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.

    3. Creare un attività Copy che copia le righe dall'archivio dati di origine con il valore della colonna filigrana maggiore del valore limite precedente e minore o uguale al nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB di Azure come nuovo file.

    4. Creare un'attività stored procedure che aggiorni il valore limite per la pipeline che verrà eseguita la volta successiva.

    Il diagramma generale della soluzione è il seguente:

    Incrementally load data

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

  • SQL Server. In questa esercitazione si usa un database di SQL Server come archivio dati di origine.
  • Database SQL di Azure. Usare un database del database SQL di Azure come archivio dati sink. Se non è disponibile un database SQL, vedere Creare un database del database SQL di Azure per crearne uno.

Creare le tabelle di origine nel database di SQL Server

  1. Aprire SQL Server Management Studio (SSMS) o Azure Data Studio e connettersi al database di SQL Server.

  2. In Esplora server (SSMS) o nel riquadro Connessioni (Azure Data Studio) fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.

  3. Eseguire il comando SQL seguente sul database per creare le tabelle denominate customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Creare le tabelle di destinazione nel database SQL di Azure

  1. Aprire SQL Server Management Studio (SSMS) o Azure Data Studio e connettersi al database di SQL Server.

  2. In Esplora server (SSMS) o nel riquadro Connessioni (Azure Data Studio) fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.

  3. Eseguire il comando SQL seguente sul database per creare le tabelle denominate customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Creare un'altra tabella nel database SQL di Azure per archiviare il valore del limite massimo

  1. Eseguire il comando SQL seguente nel database SQL per creare una tabella denominata watermarktable in cui archiviare il valore limite:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Inserire i valori del limite iniziale per entrambe le tabelle di origine nella tabella dei limiti.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Creare una stored procedure nel database SQL di Azure

Eseguire il comando seguente per creare una stored procedure nel database. Questa stored procedure aggiorna il valore del limite dopo ogni esecuzione di pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Creare tipi di dati e stored procedure aggiuntive nel database SQL di Azure

Eseguire la query seguente per creare due stored procedure e due tipi di dati nel database. Questi elementi vengono usati per unire i dati delle tabelle di origine nelle tabelle di destinazione.

Per semplificare la procedura, si usano direttamente queste stored procedure passando i dati differenziali tramite una variabile di tabella e quindi unendoli nell'archivio di destinazione. Si noti che non è prevista l'archiviazione di un numero di righe differenziali elevato (più di 100) nella variabile di tabella.

Se è necessario unire un numero elevato di righe delta nell'archivio di destinazione, è consigliabile usare un'attività di copia per copiare prima tutti i dati differenziali in una tabella temporanea nell'archivio di destinazione e quindi compilare la propria stored procedure senza usare la variabile di tabella per eseguire il merge dalla tabella temporanea alla tabella finale.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Installare i moduli di Azure PowerShell più recenti seguendo le istruzioni descritte in Installare e configurare Azure PowerShell.

Creare una data factory

  1. Definire una variabile per il nome del gruppo di risorse usato in seguito nei comandi di PowerShell. Copiare il testo del comando seguente in PowerShell, specificare un nome per il gruppo di risorse di Azure tra virgolette doppie e quindi eseguire il comando. Un esempio è "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile $resourceGroupName ed eseguire di nuovo il comando.

  2. Definire una variabile per la località della data factory.

    $location = "East US"
    
  3. Per creare il gruppo di risorse di Azure, eseguire questo comando:

    New-AzResourceGroup $resourceGroupName $location
    

    Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile $resourceGroupName ed eseguire di nuovo il comando.

  4. Definire una variabile per il nome della data factory.

    Importante

    Aggiornare il nome della data factory in modo che sia univoco a livello globale, Un esempio è ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Per creare la data factory, eseguire questo cmdlet Set-AzDataFactoryV2:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Notare i punti seguenti:

  • Il nome della data factory deve essere univoco a livello globale. Se viene visualizzato l'errore seguente, modificare il nome e riprovare:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Per creare istanze di Data Factory, l'account utente usato per accedere ad Azure deve essere un membro dei ruoli collaboratore o proprietario oppure un amministratore della sottoscrizione di Azure.

  • Per un elenco di aree di Azure in cui Data Factory è attualmente disponibile, selezionare le aree di interesse nella pagina seguente, quindi espandere Analytics per individuare Data Factory: Prodotti disponibili in base all'area. Gli archivi dati (Archiviazione di Azure, database SQL, Istanza gestita di SQL e così via) e le risorse di calcolo (Azure HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.

Creare un runtime di integrazione self-hosted

In questa sezione si crea un runtime di integrazione self-hosted e lo si associa a un computer locale con il database di SQL Server. Il runtime di integrazione self-hosted è il componente che copia i dati dall'istanza di SQL Server nel computer al database SQL di Azure.

  1. Creare una variabile per il nome del runtime di integrazione. Usare un nome univoco e prenderne nota. perché sarà usato più avanti in questa esercitazione.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Creare un runtime di integrazione self-hosted.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Di seguito è riportato l'output di esempio:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Eseguire questo comando per recuperare lo stato del runtime di integrazione creato. Verificare che il valore della proprietà State sia impostato su NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Di seguito è riportato l'output di esempio:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Eseguire questo comando per recuperare le chiavi di autenticazione per la registrazione del runtime di integrazione self-hosted nel servizio Azure Data Factory nel cloud:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Di seguito è riportato l'output di esempio:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Copiare una delle chiavi (escluse le virgolette) usate per la registrazione del runtime di integrazione self-hosted che si installerà nel computer nei passaggi successivi.

Installare lo strumento del runtime di integrazione

  1. Se il runtime di integrazione è già installato nel computer, disinstallarlo tramite Installazione applicazioni.

  2. Scaricare il runtime di integrazione self-hosted in un computer Windows locale. Eseguire l'installazione.

  3. Nella pagina dell'installazione guidata di Microsoft Integration Runtime fare clic su Avanti.

  4. Nella pagina Contratto di licenza con l'utente finale accettare le condizioni e fare clic su Avanti.

  5. Nella pagina Cartella di destinazione fare clic su Avanti.

  6. Nella pagina Pronto per l'installazione fare clic su Installa.

  7. Nella pagina Installazione di Microsoft Integration Runtime completata fare clic su Fine.

  8. Nella pagina Registra Runtime di integrazione (self-hosted) incollare la chiave salvata nella sezione precedente e fare clic su Registra.

    Register the integration runtime

  9. Nella pagina Nuovo nodo di Integration Runtime (self-hosted) fare clic su Fine.

  10. Al termine della registrazione del runtime di integrazione self-hosted viene visualizzato il messaggio seguente:

    Registered successfully

  11. Nella pagina Registra Runtime di integrazione (self-hosted) fare clic su Avvia Configuration Manager.

  12. Quando il nodo viene connesso al servizio cloud, viene visualizzata la pagina seguente:

    Node is connected page

  13. Testare ora la connettività al database di SQL Server.

    Diagnostics tab

    a. Nella pagina Configuration Manager passare alla scheda Diagnostica.

    b. Selezionare SqlServer per il tipo di origine dati.

    c. Immettere il nome del server.

    d. Immettere il nome del database.

    e. Selezionare la modalità di autenticazione.

    f. Immettere il nome utente.

    g. Immettere la password associata al nome utente.

    h. Selezionare Test per verificare che il runtime di integrazione possa connettersi a SQL Server. Se la connessione ha esito positivo, viene visualizzato un segno di spunta verde. Se la connessione non ha esito positivo, viene visualizzato un messaggio di errore. Risolvere eventuali problemi e assicurarsi che il runtime di integrazione possa connettesi a SQL Server.

    Nota

    Prendere nota di questi valori per tipo di autenticazione, server, database, utente e password, perché saranno usati più avanti in questa esercitazione.

Creare servizi collegati

Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa sezione vengono creati i servizi collegati al database di SQL Server e al database nel database SQL di Azure.

Creare il servizio collegato di SQL Server

In questo passaggio si collega il database di SQL Server alla data factory.

  1. Nella cartella C:\ADFTutorials\IncCopyMultiTableTutorial (creare le cartelle locali se non sono già esistenti) creare un file JSON denominato SqlServerLinkedService.json con il contenuto seguente. Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.

    Importante

    Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.

    Se si usa l'autenticazione SQL, copiare la definizione JSON seguente:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Se si usa l'autenticazione Windows, copiare la definizione JSON seguente:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Importante

    • Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.
    • Sostituire <il nome> del runtime di integrazione con il nome del runtime di integrazione.
    • Sostituire <servername>, <databasename>, <username> e <password> con i valori del database di SQL Server prima di salvare il file.
    • Se è necessario usare una barra (\) nell'account utente o nel nome del server, usare il carattere di escape (\). Un esempio è mydomain\\myuser.
  2. In PowerShell eseguire il cmdlet seguente per passare alla cartella C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureStorageLinkedService. Nell'esempio seguente si passano i valori per i parametri ResourceGroupName e DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Di seguito è riportato l'output di esempio:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Creare il servizio collegato per il database SQL

  1. Nella cartella C:\ADFTutorials\IncCopyMultiTableTutorial creare un file JSON denominato AzureSQLDatabaseLinkedService.json con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire <nome server>, <nome> database, <nome> utente e <password> con il nome del database DI SQL Server, il nome del database, il nome utente e la password prima di salvare il file.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. In PowerShell eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Di seguito è riportato l'output di esempio:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Creare i set di dati

In questo passaggio vengono creati i set di dati per rappresentare l'origine dati, la destinazione dati e la posizione di archiviazione del valore limite.

Creare set di dati di origine

  1. Creare un file JSON denominato SourceDataset.json nella stessa cartella con il contenuto seguente:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    L'attività di copia nella pipeline usa una query SQL per caricare i dati invece di caricare l'intera tabella.

  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Creare un set di dati sink

  1. Nella stessa cartella creare un file JSON denominato SinkDataset.json con il contenuto seguente. L'elemento tableName viene impostato dalla pipeline in modo dinamico in fase di esecuzione. L'attività ForEach nella pipeline esegue l'iterazione di un elenco di nomi di tabella e passa il nome di tabella a questo set di dati in ogni interazione.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Creare un set di dati per un limite

In questo passaggio si crea un set di dati per l'archiviazione di un valore limite massimo.

  1. Nella stessa cartella creare un file JSON denominato WatermarkDataset.json con il contenuto seguente:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Creare una pipeline

Questa pipeline accetta un elenco di nomi di tabella come parametro. L'attività ForEach esegue l'iterazione dell'elenco dei nomi di tabella ed esegue le operazioni seguenti:

  1. Usa l'attività di ricerca per recuperare il valore del limite precedente (valore iniziale o usato nell'ultima iterazione).

  2. Usa l'attività di ricerca per recuperare il nuovo valore del limite (valore massimo della colonna dei limiti nella tabella di origine).

  3. Usa l'attività di copia per copiare dati tra i due valori del limite dal database di origine al database di destinazione.

  4. Usa l'attività StoredProcedure per aggiornare il valore del limite precedente da usare nel primo passaggio dell'iterazione successiva.

Creare la pipeline

  1. Nella stessa cartella creare un file JSON denominato IncrementalCopyPipeline.json con il contenuto seguente:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Eseguire il cmdlet Set-AzDataFactoryV2Pipeline per creare la pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Di seguito è riportato l'output di esempio:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Eseguire la pipeline

  1. Nella stessa cartella creare un file di parametri denominato Parameters.json con il contenuto seguente:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Eseguire la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con il nome del gruppo di risorse e della data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitorare la pipeline

  1. Accedi al portale di Azure.

  2. Selezionare Tutti i servizi, eseguire una ricerca con la parola chiave Data factory e selezionare Data factory.

  3. Cercare la propria data factory nell'elenco delle data factory e selezionarla per aprire la pagina Data factory.

  4. Nella pagina Data factory selezionare Apri nel riquadro Apri Azure Data Factory Studio per avviare Azure Data Factory in una scheda separata.

  5. Nella home page di Azure Data Factory selezionare Monitoraggio sul lato sinistro.

    Screenshot shows the home page for Azure Data Factory.

  6. È possibile visualizzare tutte le esecuzioni di pipeline e i rispettivi stati. Notare che nell'esempio seguente lo stato dell'esecuzione della pipeline è Riuscito. Per verificare i parametri passati alla pipeline, selezionare il collegamento nella colonna Parametri. Se si è verificato un errore, verrà visualizzato un collegamento nella colonna Errore.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  7. Quando si seleziona il collegamento nella colonna Azioni, vengono visualizzate tutte le esecuzioni di attività per la pipeline.

  8. Per tornare alla visualizzazione Pipeline Runs (Esecuzioni di pipeline), selezionare Tutte le esecuzioni della pipeline.

Esaminare i risultati

In SQL Server Management Studio eseguire queste query sul database SQL di destinazione per verificare che i dati siano stati copiati dalle tabelle di origine alle tabelle di destinazione:

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Notare che i valori del limite per entrambe le tabelle sono stati aggiornati.

Aggiungere altri dati alle tabelle di origine

Eseguire questa query sul database di SQL Server di origine per aggiornare una riga esistente in customer_table. Inserire una nuova riga in project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Eseguire di nuovo la pipeline

  1. Eseguire di nuovo la pipeline con il comando di PowerShell seguente:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Monitorare le esecuzioni di pipeline seguendo le istruzioni contenute nella sezione Monitorare la pipeline. Quando lo stato della pipeline è In corso, viene visualizzato un altro collegamento a un'azione in Azioni per annullare l'esecuzione della pipeline.

  3. Selezionare Aggiorna per aggiornare l'elenco finché l'esecuzione della pipeline non riesce.

  4. Fare eventualmente clic sul collegamento Visualizza esecuzioni attività in Azioni per visualizzare tutte le esecuzioni di attività associate a questa esecuzione di pipeline.

Esaminare i risultati finali

In SQL Server Management Studio eseguire le query seguenti sul database di destinazione per verificare che i dati nuovi/aggiornati siano stati copiati dalle tabelle di origine alle tabelle di destinazione.

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Notare i nuovi valori di Name e LastModifytime per PersonID per il numero 3.

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Si noti che è stata aggiunta la voce NewProject a project_table.

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Notare che i valori del limite per entrambe le tabelle sono stati aggiornati.

In questa esercitazione sono stati eseguiti i passaggi seguenti:

  • Preparare gli archivi dati di origine e di destinazione.
  • Creare una data factory.
  • Creare un runtime di integrazione self-hosted.
  • Installare il runtime di integrazione.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare, eseguire e monitorare una pipeline.
  • Rivedi i risultati.
  • Aggiungere o aggiornare dati nelle tabelle di origine.
  • Rieseguire e monitorare la pipeline.
  • Esaminare i risultati finali.

Passare all'esercitazione successiva per informazioni sulla trasformazione dei dati usando un cluster Spark in Azure: