Compartilhar via


Carregar dados de maneira incremental de várias tabelas no SQL Server para o Banco de Dados SQL do Azure usando o PowerShell

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Neste tutorial, você criará um Azure Data Factory com um pipeline que carrega dados delta de várias tabelas em um banco de dados do SQL Server no Banco de Dados SQL do Azure.

Neste tutorial, você realizará os seguintes procedimentos:

  • Preparará os armazenamentos de dados de origem e destino.
  • Criar um data factory.
  • Criar um runtime de integração auto-hospedada.
  • Instalar o Integration Runtime.
  • Criar serviços vinculados.
  • Criar os conjuntos de dados de origem, de coletor e de marca-d'água.
  • Crie, execute e monitore um pipeline.
  • Revise os resultados.
  • Adicionar ou atualizar dados nas tabelas de origem.
  • Executar novamente e monitorar o pipeline.
  • Examinar os resultados finais.

Visão geral

Aqui estão as etapas importantes ao criar essa solução:

  1. Selecione a coluna de marca-d'água.

    Selecione uma coluna para cada tabela no armazenamento de dados de origem, na qual você poderá identificar os registros novos ou atualizados de cada execução. Normalmente, os dados nessa coluna selecionada (por exemplo, ID ou last_modify_time) seguem crescendo quando linhas são criadas ou atualizadas. O valor máximo dessa coluna é usado como uma marca-d'água.

  2. Prepare um armazenamento de dados para armazenar o valor de marca-d'água.

    Neste tutorial, você deve armazenar o valor de marca-d'água em um banco de dados SQL.

  3. Crie um pipeline com as seguintes atividades:

    1. Crie uma atividade ForEach que itere em uma lista de nomes de tabela de origem passada como um parâmetro para o pipeline. Para cada tabela de origem, são chamadas as próximas atividades necessárias para o carregamento delta.

    2. Crie duas atividades de pesquisa. Use a primeira atividade de Pesquisa para recuperar o último valor de marca-d'água. Use a segunda atividade de Pesquisa para recuperar o novo valor de marca-d'água. Esses valores de marca-d'água são passados para a atividade de Cópia.

    3. Crie um atividade Copy que copia linhas do armazenamento de dados de origem com o valor da coluna de marca-d'água maior que o valor da marca-d'água antiga e menor ou igual ao novo valor da marca-d'água. Em seguida, ela copia os dados delta do armazenamento de dados de origem para um Armazenamento de Blobs do Azure como um novo arquivo.

    4. Crie uma atividade de StoredProcedure que atualize o valor de marca-d'água para o pipeline que for executado da próxima vez.

    A seguir está diagrama da solução de alto nível:

    Incrementally load data

Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • SQL Server. Você usa um banco de dados do SQL Server como o armazenamento de dados de origem neste tutorial.
  • Banco de dados SQL do Azure. Você usa um banco de dados no Banco de Dados SQL do Azure como o armazenamento de dados do coletor. Se você não tiver um Banco de Dados SQL, confira Criar um banco de dados no Banco de Dados SQL do Azure para saber as etapas para criar um.

Criar tabelas de origem no banco de dados do SQL Server

  1. Abra o SSMS (SQL Server Management Studio) ou o Azure Data Studio e conecte-se ao banco de dados do SQL Server.

  2. No SSMS (Gerenciador de Servidores) ou no painel Conexões (Azure Data Studio) , clique com o botão direito do mouse no banco de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL no banco de dados para criar as tabelas customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Criar tabelas de destino no Banco de Dados SQL do Azure

  1. Abra o SSMS (SQL Server Management Studio) ou o Azure Data Studio e conecte-se ao banco de dados do SQL Server.

  2. No SSMS (Gerenciador de Servidores) ou no painel Conexões (Azure Data Studio) , clique com o botão direito do mouse no banco de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL no banco de dados para criar as tabelas customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Criar outra tabela no Banco de Dados SQL do Azure para armazenar o valor de marca d'água alta

  1. Execute o seguinte comando SQL no banco de dados para criar uma tabela chamada watermarktable para armazenar o valor de marca-d'água:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Insira valores de marca d'água iniciais para ambas as tabelas de origem na tabela de marca d'água.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Criar um procedimento armazenado no Banco de Dados SQL do Azure

Execute o comando a seguir para criar um procedimento armazenado no banco de dados. Esse procedimento armazenado atualiza o valor de marca d'água após cada execução de pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Criar tipos de dados e procedimentos armazenados adicionais no Banco de Dados SQL do Azure

Execute a consulta a seguir para criar dois tipos de dados e dois procedimentos armazenados no banco de dados. Eles são usados para mesclar os dados das tabelas de origem nas tabelas de destino.

Para facilitar a jornada de introdução, usamos diretamente esses procedimentos armazenados passando os dados delta por meio de uma variável de tabela e, em seguida, mesclamos esses dados no repositório de destino. Tenha cuidado, pois ele não espera que um número "grande" de linhas delta (mais de 100) seja armazenado na variável de tabela.

Caso precise mesclar um grande número de linhas delta no repositório de destino, sugerimos que você use a atividade de cópia para copiar todos os dados delta para uma tabela temporária de "preparo" no repositório de destino primeiro e, em seguida, criar seu próprio procedimento armazenado sem o uso da variável de tabela para mesclá-los da tabela de “preparo” para a tabela “final”.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Instale os módulos mais recentes do Azure PowerShell seguindo as instruções em Instalar e configurar o Azure PowerShell.

Criar uma data factory

  1. Defina uma variável para o nome do grupo de recursos que você usa nos comandos do PowerShell posteriormente. Copie o seguinte texto de comando para o PowerShell, especifique um nome para o grupo de recursos do Azure entre aspas duplas e, em seguida, execute o comando. Um exemplo é "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Se o grupo de recursos já existir, não convém substituí-lo. Atribua um valor diferente para a variável $resourceGroupName e execute o comando novamente.

  2. Defina uma variável para o local do data factory.

    $location = "East US"
    
  3. Para criar o grupo de recursos do Azure, execute o seguinte comando:

    New-AzResourceGroup $resourceGroupName $location
    

    Se o grupo de recursos já existir, não convém substituí-lo. Atribua um valor diferente para a variável $resourceGroupName e execute o comando novamente.

  4. Defina uma variável para o nome do data factory.

    Importante

    Atualize o Nome do data factory para que ele seja globalmente exclusivo. Um exemplo seria ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Para criar o data factory, execute o seguinte cmdlet Set-AzDataFactoryV2:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Observe os seguintes pontos:

  • O nome do data factory deve ser globalmente exclusivo. Se você receber o erro a seguir, altere o nome e tente novamente:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Para criar instâncias de Data Factory, a conta de usuário usada para entrar no Azure deve ser um membro das funções colaborador ou proprietário, ou um administrador da assinatura do Azure.

  • Para obter uma lista de regiões do Azure no qual o Data Factory está disponível no momento, selecione as regiões que relevantes para você na página a seguir e, em seguida, expanda Análise para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Banco de Dados SQL, Instância Gerenciada de SQL, Armazenamento do Azure etc.) e serviços de computação (Azure HDInsight etc.) usados pelo data factory podem estar em outras regiões.

Criar um Integration Runtime auto-hospedado

Nesta seção, você cria um Integration Runtime auto-hospedado e o associa a um computador local com o banco de dados do SQL Server. O runtime de integração auto-hospedada é o componente que copia os dados do SQL Server no computador para o Banco de Dados SQL do Azure.

  1. Crie uma variável para o nome do Integration Runtime. Use um nome exclusivo e anote-o. Você o usará posteriormente neste tutorial.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Criar um runtime de integração auto-hospedada.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Veja o exemplo de saída:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Para recuperar o status do Integration Runtime criado, execute o comando a seguir. Confirme se o valor da propriedade Estado está definido como NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Veja o exemplo de saída:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Para recuperar as chaves de autenticação usadas para registrar o Integration Runtime auto-hospedado com o serviço Azure Data Factory na nuvem, execute o comando a seguir:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Veja o exemplo de saída:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Copie uma das chaves (excluindo as aspas) usadas para registrar o Integration Runtime auto-hospedado instalado em seu computador nas próximas etapas.

Instalar a ferramenta Integration Runtime

  1. Se você já tiver o Integration Runtime em seu computador, desinstale-o usando Adicionar ou Remover Programas.

  2. Baixe o runtime de integração auto-hospedada em uma máquina local do Windows. Execute a instalação.

  3. Na página Bem-vindo à instalação do Microsoft Integration Runtime, selecione Avançar.

  4. Na página Contrato de Licença do Usuário Final, aceite os termos e o contrato de licença e selecione Avançar.

  5. Na página Pasta de Destino, selecione Avançar.

  6. Na página Pronto para instalar o Microsoft Integration Runtime, selecione Instalar.

  7. Na página Instalação do Microsoft Integration Runtime concluída, selecione Concluir.

  8. Na página Registrar Integration Runtime (auto-hospedado) , cole a chave que você salvou na seção anterior e selecione Registrar.

    Register the integration runtime

  9. Na página Novo nó do Integration Runtime (auto-hospedado) , selecione Concluir.

  10. Você verá a seguinte mensagem quando o Integration Runtime auto-hospedado for registrado com êxito:

    Registered successfully

  11. Na página Registrar Integration Runtime (auto-hospedado) , selecione Iniciar o Configuration Manager.

  12. Você verá a página a seguir quando o nó estiver conectado ao serviço de nuvem:

    Node is connected page

  13. Agora teste a conectividade com o seu banco de dados do SQL Server.

    Diagnostics tab

    a. Na página Gerenciador de Configurações, vá para a guia Diagnóstico.

    b. Selecione SqlServer para o tipo da fonte de dados.

    c. Insira o nome do servidor.

    d. Insira o nome do banco de dados.

    e. Selecione o modo de autenticação.

    f. Insira o nome de usuário.

    g. Insira a senha associada ao nome de usuário.

    h. Selecione Testar para confirmar que esse runtime de integração pode se conectar ao SQL Server. Você verá uma marca de seleção verde se a conexão tiver êxito. Você verá uma mensagem de erro se a conexão não tiver êxito. Corrija todos os problemas e verifique se o runtime de integração pode se conectar ao SQL Server.

    Observação

    Anote os valores de tipo de autenticação, servidor, banco de dados, usuário e senha. Você os usará posteriormente neste tutorial.

Criar serviços vinculados

Os serviços vinculados são criados em um data factory para vincular seus armazenamentos de dados e serviços de computação ao data factory. Nesta seção, você criará serviços vinculados para o banco de dados do SQL Server e o seu banco de dados no Banco de Dados SQL do Azure.

Criar um serviço vinculado do SQL Server

Nesta etapa, você vincula seu banco de dados do SQL Server ao data factory.

  1. Crie um arquivo JSON chamado SqlServerLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial (crie as pastas locais, caso elas ainda não existam) com o conteúdo a seguir. Selecione a seção correta com base na autenticação que você usa para se conectar ao SQL Server.

    Importante

    Selecione a seção correta com base na autenticação que você usa para se conectar ao SQL Server.

    Se você estiver usando uma autenticação do SQL, copie a seguinte definição de JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Se você estiver usando uma autenticação do Windows, copie a seguinte definição de JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Importante

    • Selecione a seção correta com base na autenticação que você usa para se conectar ao SQL Server.
    • Substitua o <nome do runtime de integração> pelo nome do seu runtime de integração.
    • Substitua <servername>, <databasename>, <username> e <password> por valores de seu banco de dados do SQL Server antes de salvar o arquivo.
    • Se precisar usar um caractere de barra (\) em sua conta de usuário e nome de servidor, use o caractere de escape (\). Um exemplo é mydomain\\myuser.
  2. No PowerShell, execute o cmdlet a seguir para alternar para a pasta C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureStorageLinkedService. No exemplo a seguir, você passa valores para os parâmetros ResourceGroupName e DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Veja o exemplo de saída:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Criar um serviço vinculado do Banco de Dados SQL

  1. Crie um arquivo JSON chamado AzureSQLDatabaseLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial com o conteúdo a seguir. (Crie a pasta ADF se ela não existir). Substitua <nome do servidor>, <nome do banco de dados>, <nome de usuário> e <senha> pelo nome do banco de dados do SQL Server, pelo nome do banco de dados, pelo nome de usuário e pela senha antes de salvar o arquivo.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. No PowerShell, execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Veja o exemplo de saída:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Criar conjuntos de dados

Nesta etapa, você cria conjuntos de dados para representar a fonte de dados, o destino de dados e o local para armazenar a marca d'água.

Criar um conjunto de dados de origem

  1. Crie um arquivo JSON chamado SourceDataset.json na mesma pasta, com o seguinte conteúdo:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    A atividade de Cópia no pipeline usa uma consulta SQL para carregar os dados em vez de carregar a tabela inteira.

  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Aqui está a amostra de saída do cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Criar um conjunto de dados de coletor

  1. Crie um arquivo JSON chamado SinkDataset.json na mesma pasta com o conteúdo a seguir. O elemento de tableName é definido pelo pipeline dinamicamente em runtime. A atividade ForEach no pipeline itera por meio de uma lista de nomes de tabela e passa o nome da tabela para esse conjunto de dados em cada iteração.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Aqui está a amostra de saída do cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um conjunto de dados para uma marca-d'água

Nesta etapa, você deve criar um conjunto de dados para armazenar um valor de marca d'água alta.

  1. Crie um arquivo JSON chamado WatermarkDataset.json na mesma pasta com o seguinte conteúdo:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Aqui está a amostra de saída do cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um pipeline

O pipeline usa uma lista de nomes de tabela como um parâmetro. A atividade ForEach itera na lista de nomes de tabela e executa as seguintes operações:

  1. Use a atividade de Pesquisa para recuperar o valor antigo de marca-d'água (o valor inicial ou o que foi usado na última iteração).

  2. Use a atividade de Pesquisa para recuperar o novo valor de marca-d'água (o valor máximo da coluna de marca-d'água na tabela de origem).

  3. Use a atividade de Cópia para copiar os dados entre esses dois valores de marca-d'água do banco de dados de origem para o banco de dados de destino.

  4. Use a atividade StoredProcedure para atualizar o valor antigo de marca-d'água a ser usado na primeira etapa da próxima iteração.

Criar o pipeline

  1. Crie um arquivo JSON chamado IncrementalCopyPipeline.json na mesma pasta com o seguinte conteúdo:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Execute o Set-AzDataFactoryV2Pipeline para criar o pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Veja o exemplo de saída:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Executar o pipeline

  1. Crie um arquivo de parâmetro chamado Parameters.json na mesma pasta com o seguinte conteúdo:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Execute o pipeline IncrementalCopyPipeline usando o cmdlet Invoke-AzDataFactoryV2Pipeline. Substitua os espaços reservados com seus próprios nomes de grupo de recursos e de data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitorar o Pipeline

  1. Entre no portal do Azure.

  2. Clique em Todos os serviços, pesquise com a palavra-chave Data factories e selecione Data factories.

  3. Procure seu data factory na lista de data factories e selecione-o para abrir a página Data factory.

  4. Na página Data factory, selecione Abrir no bloco Abrir Azure Data Factory Studio para iniciar a interface do usuário do Data Factory em uma guia separada.

  5. Na página inicial do Azure Data Factory, selecione Monitor no lado esquerdo.

    Screenshot shows the home page for Azure Data Factory.

  6. Você pode ver todas as execuções de pipeline e seus status. Observe que, no exemplo a seguir, o status da execução de pipeline é Com Êxito. Para verificar os parâmetros passados para o pipeline, selecione o link da coluna Parâmetros. Se houver um erro, você verá um link na coluna Erro.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  7. Ao selecionar o link na coluna Ações, você verá todas as execuções de atividade para o pipeline.

  8. Para voltar à exibição Execuções de Pipeline, selecione Todas as Execuções de Pipeline.

Revise os resultados

No SQL Server Management Studio, execute as seguintes consultas no banco de dados SQL de destino para verificar se os dados foram copiados das tabelas de origem para as tabelas de destino:

Consulta

select * from customer_table

Saída

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Consulta

select * from project_table

Saída

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Consulta

select * from watermarktable

Saída

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Observe que os valores de marca d'água para ambas as tabelas foram atualizados.

Adicionar mais dados às tabelas de origem

Execute a consulta a seguir feita no banco de dados do SQL Server de origem para atualizar uma linha existente na customer_table. Insera uma nova linha em project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Executar novamente o pipeline

  1. Agora, execute novamente o pipeline ao executar o seguinte comando do PowerShell:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Monitore as execuções de pipeline seguindo as instruções da seção Monitorar o pipeline. Quando o status do pipeline for Em Andamento, você verá outro link de ação em Ações para cancelar a execução do pipeline.

  3. Selecione Atualizar para atualizar a lista até que a execução do pipeline seja bem-sucedida.

  4. Opcionalmente, clique no link Exibir execuções de atividade em Ações para ver todas as execuções de atividade associadas a esta execução de pipeline.

Examine os resultados finais

No SQL Server Management Studio, execute as seguintes consultas no banco de dados de destino para verificar se os dados atualizados/novos foram copiados das tabelas de origem para as tabelas de destino.

Consulta

select * from customer_table

Saída

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Observe os novos valores de Name e LastModifytime para PersonID para o número 3.

Consulta

select * from project_table

Saída

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Observe que a entrada NewProject foi adicionada a project_table.

Consulta

select * from watermarktable

Saída

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Observe que os valores de marca d'água para ambas as tabelas foram atualizados.

Neste tutorial, você realizará os seguintes procedimentos:

  • Preparará os armazenamentos de dados de origem e destino.
  • Criar um data factory.
  • Criar um Integration Runtime (IR) auto-hospedado.
  • Instalar o Integration Runtime.
  • Criar serviços vinculados.
  • Criar os conjuntos de dados de origem, de coletor e de marca-d'água.
  • Crie, execute e monitore um pipeline.
  • Revise os resultados.
  • Adicionar ou atualizar dados nas tabelas de origem.
  • Executar novamente e monitorar o pipeline.
  • Examinar os resultados finais.

Avance para o tutorial a seguir para saber mais sobre como transformar dados usando um cluster Spark no Azure: