Partilhar via


Carregue dados incrementalmente de várias tabelas no SQL Server para o Banco de Dados SQL do Azure usando o PowerShell

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, você cria um Azure Data Factory com um pipeline que carrega dados delta de várias tabelas em um banco de dados do SQL Server para o Banco de Dados SQL do Azure.

Vai executar os seguintes passos neste tutorial:

  • Prepare os arquivos de dados de origem e de destino.
  • Criar uma fábrica de dados.
  • Criar um integration runtime autoalojado.
  • Instalar o integration runtime.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar, executar e monitorizar um pipeline.
  • Reveja os resultados.
  • Adicionou ou atualizou os dados nas tabelas de origem.
  • Voltou a executar e a monitorizar o pipeline.
  • Reviu os resultados finais.

Descrição geral

Eis os passos importantes para criar esta solução:

  1. Selecionar a coluna de limite de tamanho.

    Selecione uma coluna para cada tabela no armazenamento de dados de origem, que você pode identificar os registros novos ou atualizados para cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.

  2. Preparar um arquivo de dados para armazenar o valor de limite de tamanho.

    Neste tutorial, vai armazenar o valor de marca d'água numa base de dados SQL.

  3. Criar um pipeline com as seguintes atividades:

    1. Criar uma atividade ForEach que itera através de uma lista de nomes de tabelas de origem que é transmitida como um parâmetro para o pipeline. Para cada tabela de origem, este invoca as seguintes atividades para efetuar o carregamento de diferenças para essa tabela.

    2. Criar duas atividades de pesquisa. Utilize a primeira atividade Pesquisa para obter o último valor de limite de tamanho. Utilize a segunda para obter o valor de limite de tamanho novo. Estes valores de limite de tamanho são transmitidos para a atividade Copy.

    3. Crie uma atividade Copiar que copie linhas do armazenamento de dados de origem com o valor da coluna de marca d'água maior que o valor da marca d'água antiga e menor ou igual ao novo valor da marca d'água. Em seguida, copia os dados delta do arquivo de dados de origem para o armazenamento de Blobs do Azure como um ficheiro novo.

    4. Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.

    Eis o diagrama de nível elevado da solução:

    Carregar dados de forma incremental

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • SQL Server. Você usa um banco de dados do SQL Server como o armazenamento de dados de origem neste tutorial.
  • Base de Dados SQL do Azure. Você usa um banco de dados no Banco de Dados SQL do Azure como o armazenamento de dados do coletor. Se você não tiver um banco de dados SQL, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criar um.

Criar tabelas de origem na base de dados do SQL Server

  1. Abra o SQL Server Management Studio (SSMS) ou o Azure Data Studio e conecte-se ao seu banco de dados do SQL Server.

  2. No Gerenciador de Servidores (SSMS) ou no painel Conexões (Azure Data Studio), clique com o botão direito do mouse no banco de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL na base de dados para criar tabelas com o nome customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Criar tabelas de destino na Base de Dados SQL do Azure

  1. Abra o SQL Server Management Studio (SSMS) ou o Azure Data Studio e conecte-se ao seu banco de dados do SQL Server.

  2. No Gerenciador de Servidores (SSMS) ou no painel Conexões (Azure Data Studio), clique com o botão direito do mouse no banco de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL na base de dados para criar tabelas com o nome customer_table e project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Criar outra tabela no Banco de Dados SQL do Azure para armazenar o alto valor da marca d'água

  1. Execute o seguinte comando SQL em seu banco de dados para criar uma tabela nomeada watermarktable para armazenar o valor da marca d'água:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Inserir valores de marca d'água iniciais para ambas as tabelas de origem na tabela de marca d'água.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Criar um procedimento armazenado no Banco de Dados SQL do Azure

Execute o seguinte comando para criar um procedimento armazenado em seu banco de dados. Este procedimento armazenado atualiza o valor de limite de tamanho após cada execução de pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Criar tipos de dados e procedimentos armazenados adicionais no Banco de Dados SQL do Azure

Execute a consulta a seguir para criar dois procedimentos armazenados e dois tipos de dados em seu banco de dados. São utilizados para intercalar os dados das tabelas de origem nas tabelas de destino.

Para facilitar o início da jornada, usamos diretamente esses Procedimentos Armazenados passando os dados delta por meio de uma variável de tabela e, em seguida, mesclamos os mesmos no repositório de destino. Seja cauteloso, não está esperando que um "grande" número de linhas delta (mais de 100) seja armazenado na variável de tabela.

Se você precisar mesclar um grande número de linhas delta no repositório de destino, sugerimos que você use a atividade de cópia para copiar todos os dados delta em uma tabela temporária de "preparação" no repositório de destino primeiro e, em seguida, crie seu próprio procedimento armazenado sem usar a variável de tabela para mesclá-los da tabela de "preparação" para a tabela "final".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Siga as instruções em Instalar e Configurar o Azure PowerShell para instalar os módulos mais recentes do Azure PowerShell.

Criar uma fábrica de dados

  1. Defina uma variável para o nome do grupo de recursos que vai utilizar nos comandos do PowerShell mais tarde. Copie o texto do comando seguinte para o PowerShell, especifique um nome para o Grupo de recursos do Azure com aspas duplas e execute o comando. Um exemplo é "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Se o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável $resourceGroupName e execute novamente o comando.

  2. Defina uma variável para a localização da fábrica de dados.

    $location = "East US"
    
  3. Para criar o grupo de recursos do Azure, execute o comando abaixo:

    New-AzResourceGroup $resourceGroupName $location
    

    Se o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável $resourceGroupName e execute novamente o comando.

  4. Defina uma variável para o nome da fábrica de dados.

    Importante

    Atualize o nome da fábrica de dados para que seja globalmente exclusivo. Um exemplo é ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Para criar o data factory, execute o seguinte cmdlet Set-AzDataFactoryV2 :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Tenha em conta os seguintes pontos:

  • O nome da fábrica de dados tem de ser globalmente exclusivo. Se receber o erro seguinte, altere o nome e tente novamente:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Para criar instâncias do Data Factory, a conta de utilizador que utiliza para iniciar sessão no Azure tem de ser membro das funções contribuidor ou proprietário ou administrador da subscrição do Azure.

  • Para obter uma lista de regiões do Azure em que o Data Factory está atualmente disponível, selecione as regiões que lhe interessam na página seguinte e, em seguida, expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, Banco de Dados SQL, Instância Gerenciada do SQL e assim por diante) e os cálculos (Azure HDInsight, etc.) usados pelo data factory podem estar em outras regiões.

Criar um integration runtime autoalojado

Nesta secção, vai criar um integration runtime autoalojado e vai associá-lo a um computador no local com a base de dados do SQL Server. O tempo de execução de integração auto-hospedado é o componente que copia dados do SQL Server em sua máquina para o Banco de Dados SQL do Azure.

  1. Crie uma variável para o nome do integration runtime. Utilize um nome exclusivo e tome nota do mesmo. Vai utilizá-lo mais tarde no tutorial.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Criar um integration runtime autoalojado.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Segue-se o resultado do exemplo:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Para obter o estado do integration runtime criado, execute o comando seguinte. Confirme que o valor da propriedade Estado está definido como NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Segue-se o resultado do exemplo:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Para obter as chaves de autenticação utilizadas para registar o integration runtime autoalojado no serviço Azure Data Factory na cloud, execute o comando seguinte:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Segue-se o resultado do exemplo:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Copie uma das chaves (exclua as aspas) utilizadas para registar o integration runtime autoalojado que instalar no computador nos passos seguintes.

Instalar a ferramenta de tempo de execução de integração

  1. Se já tiver o integration runtime no seu computador, desinstale-o utilizando Adicionar ou Remover Programas.

  2. Transfira o integration runtime autoalojado num computador windows local. Execute a instalação.

  3. Na página Bem-vindo à Configuração do Microsoft Integration Runtime , selecioneSeguinte.

  4. Na página Contrato de Licença do Utilizador Final, aceite os termos e o contrato de licença e selecione Seguinte.

  5. Na página Pasta de Destino, selecione Seguinte.

  6. Na página Pronto para instalar o Microsoft Integration Runtime, selecione Instalar.

  7. Na página Configuração do Microsoft Integration Runtime Concluída, selecione Concluir.

  8. Na página Registar o Integration Runtime (Autoalojado), cole a chave que guardou na secção anterior e selecione Registar.

    Registar o integration runtime

  9. Na página Novo Nó de Tempo de Execução de Integração (Auto-hospedado), selecione Concluir.

  10. Quando o integration runtime autoalojado for registado com êxito, verá a mensagem seguinte:

    Registado com êxito

  11. Na página Registar Integration Runtime (Autoalojado), selecione Configuration Manager.

  12. Quando o nó for ligado ao serviço cloud, verá a página seguinte:

    Página o nó está ligado

  13. Agora, teste a conectividade à base de dados do SQL Server.

    Separador Diagnóstico

    a. Na página do Configuration Manager, vá para o separador Diagnósticos.

    b. Selecione SqlServer para o tipo de origem de dados.

    c. Introduza o nome do servidor.

    d. Introduza o nome da base de dados.

    e. Selecione o modo de autenticação.

    f. Introduza o nome de utilizador.

    g. Introduza a palavra-passe associada ao nome de utilizador.

    h. Selecione Testar para confirmar que o integration runtime se consegue ligar ao SQL Server. Se a ligação for bem-sucedida, verá uma marca de verificação verde. Se a ligação não for bem-sucedida, verá uma mensagem de erro. Corrija os problemas e confirme que o runtime de integração se consegue ligar ao SQL Server.

    Nota

    Tome nota dos valores para o tipo de autenticação, servidor, base de dados, utilizador e palavra-passe. Vai utilizá-los mais tarde no tutorial.

Criar serviços ligados

Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta seção, você cria serviços vinculados ao seu banco de dados do SQL Server e ao seu banco de dados no Banco de Dados SQL do Azure.

Criar o serviço ligado do SQL Server

Nesta etapa, você vincula seu banco de dados do SQL Server ao data factory.

  1. Crie um arquivo JSON chamado SqlServerLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial (crie as pastas locais se elas ainda não existirem) com o seguinte conteúdo. Selecione a secção certa com base na autenticação que utiliza para se ligar ao SQL Server.

    Importante

    Selecione a secção certa com base na autenticação que utiliza para se ligar ao SQL Server.

    Se utilizar a autenticação do SQL, copie a seguinte definição JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Se utilizar a autenticação Windows, copie a seguinte definição JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Importante

    • Selecione a secção certa com base na autenticação que utiliza para se ligar ao SQL Server.
    • Substitua <o nome> do tempo de execução de integração pelo nome do seu tempo de execução de integração.
    • Substitua <servername>, <databasename>, <username> e <password> por valores do banco de dados do SQL Server antes de salvar o arquivo.
    • Se precisar de utilizar um caráter de barra invertida (\) no nome da sua conta de utilizador ou no nome do seu servidor, utilize o caráter de escape (\). Um exemplo é mydomain\\myuser.
  2. No PowerShell, execute o cmdlet a seguir para alternar para a pasta C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureStorageLinkedService. No exemplo seguinte, vai transmitir os valores para os parâmetros ResourceGroupName e DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Segue-se o resultado do exemplo:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Criar o serviço vinculado do Banco de dados SQL

  1. Crie um arquivo JSON chamado AzureSQLDatabaseLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial com o seguinte conteúdo. (Crie a pasta ADF se ela ainda não existir.) Substitua <servername>, <database name>, <user name> e <password> pelo nome do banco de dados do SQL Server, nome do banco de dados, nome de usuário e senha antes de salvar o arquivo.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. No PowerShell, execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Segue-se o resultado do exemplo:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Criar conjuntos de dados

Neste passo, vai criar conjuntos de dados para representar a origem de dados, o destino de dados e o local para armazenar o limite de tamanho.

Criar um conjunto de dados de origem

  1. Crie um ficheiro JSON com o nome SourceDataset.json na mesma pasta com o seguinte conteúdo:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    A atividade Cópia no pipeline utiliza uma consulta SQL para carregar os dados em vez de carregar a tabela inteira.

  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Criar um conjunto de dados de sink

  1. Crie um arquivo JSON chamado SinkDataset.json na mesma pasta com o seguinte conteúdo. O elemento tableName é definido pelo pipeline dinamicamente durante a execução. A atividade ForEach no pipeline itera através de uma lista de nomes de tabelas e transmite o nome da tabela para este conjunto de dados em cada iteração.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um conjunto de dados para uma marca d'água

Neste passo, vai criar um conjunto de dados para armazenar um valor de limite superior de tamanho.

  1. Crie um arquivo JSON chamado WatermarkDataset.json na mesma pasta com o seguinte conteúdo:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um pipeline

O pipeline aceita uma lista de nomes de tabela como parâmetro. A atividade ForEach itera através da lista de nomes de tabela e executa as seguintes operações:

  1. Use a atividade Pesquisa para recuperar o valor da marca d'água antiga (o valor inicial ou o que foi usado na última iteração).

  2. Use a atividade Pesquisa para recuperar o novo valor da marca d'água (o valor máximo da coluna marca d'água na tabela de origem).

  3. Use a atividade Copiar para copiar dados entre esses dois valores de marca d'água do banco de dados de origem para o banco de dados de destino.

  4. Use a atividade StoredProcedure para atualizar o valor da marca d'água antiga a ser usada na primeira etapa da próxima iteração.

Criar o pipeline

  1. Crie um arquivo JSON chamado IncrementalCopyPipeline.json na mesma pasta com o seguinte conteúdo:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Pipeline para criar o pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Segue-se o resultado do exemplo:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Executar o pipeline

  1. Crie um arquivo de parâmetro chamado Parameters.json na mesma pasta com o seguinte conteúdo:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Execute o pipeline IncrementalCopyPipeline usando o cmdlet Invoke-AzDataFactoryV2Pipeline . Substitua os marcadores de posição pelos nomes do seu grupo de recursos e da sua fábrica de dados.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitorizar o pipeline

  1. Inicie sessão no portal do Azure.

  2. Clique em Todos os serviços, pesquise com a palavra-chave Fábricas de dados e selecione Fábricas de dados.

  3. Pesquise pela sua fábrica de dados na lista de fábricas de dados e selecione-a para abrir a página Fábrica de dados.

  4. Na página Data factory, selecione Abrir no bloco Open Azure Data Factory Studio para iniciar o Azure Data Factory em uma guia separada.

  5. Na home page do Azure Data Factory, selecione Monitor no lado esquerdo.

    A captura de tela mostra a home page do Azure Data Factory.

  6. Pode ver todas as execuções de pipelines e os respetivos estados. Note que no seguinte exemplo, o estado da execução do pipeline é Com Êxito. Para verificar os parâmetros transmitidos para o pipeline, selecione a ligação na coluna Parâmetros. Se tiver ocorrido um erro, pode ver uma ligação na coluna Erro.

    A captura de tela mostra que o pipeline é executado para uma fábrica de dados, incluindo seu pipeline.

  7. Ao selecionar o link na coluna Ações , você verá todas as atividades executadas para o pipeline.

  8. Para voltar ao modo de exibição Execuções de Pipeline, selecione Todas as Execuções de Pipeline.

Rever os resultados

No SQL Server Management Studio, execute as seguintes consultas na base de dados SQL de destino para verificar que os dados foram copiados das tabelas de origem para as tabelas de destino.

Consulta

select * from customer_table

Saída

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Consulta

select * from project_table

Saída

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Consulta

select * from watermarktable

Saída

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.

Adicione mais dados às tabelas de origem

Execute a seguinte consulta na base de dados do SQL Server de origem para atualizar uma linha existente em customer_table. Insira uma linha nova em project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Volte a executar o pipeline

  1. Agora, execute novamente o pipeline executando o seguinte comando do PowerShell:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Siga as instruções na secção Monitorizar o pipeline para monitorizar as execuções do pipeline. Quando o status do pipeline estiver Em andamento, você verá outro link de ação em Ações para cancelar a execução do pipeline.

  3. Selecione Atualizar para atualizar a lista até a execução do pipeline ter êxito.

  4. Opcionalmente, selecione a ligação Ver Execuções de Atividades, em Ações, para ver todas a execuções de atividades associadas a esta execução de pipeline.

Rever os resultados finais

No SQL Server Management Studio, execute as seguintes consultas na base de dados de destino para verificar que os dados atualizados/novos foram copiados a partir de tabelas de origem para tabelas de destino.

Consulta

select * from customer_table

Saída

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Repare nos valores novos de Name e LastModifytime para PersonID relativamente ao número 3.

Consulta

select * from project_table

Saída

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Repare que a entrada NewProject foi adicionada a project_table.

Consulta

select * from watermarktable

Saída

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.

Neste tutorial, executou os passos seguintes:

  • Prepare os arquivos de dados de origem e de destino.
  • Criar uma fábrica de dados.
  • Criou um integration runtime autoalojado (IR).
  • Instalar o integration runtime.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar, executar e monitorizar um pipeline.
  • Reveja os resultados.
  • Adicionou ou atualizou os dados nas tabelas de origem.
  • Voltou a executar e a monitorizar o pipeline.
  • Reviu os resultados finais.

Avance para o tutorial seguinte para saber como transformar dados através de um cluster do Spark no Azure: