Carregue dados incrementalmente do Banco de Dados SQL do Azure para o armazenamento de Blob do Azure usando o portal do Azure
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Neste tutorial, você cria um Azure Data Factory com um pipeline que carrega dados delta de uma tabela no Banco de Dados SQL do Azure para o armazenamento de Blob do Azure.
Vai executar os seguintes passos neste tutorial:
- Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, de sink e de marca d'água.
- Criar um pipeline.
- Executar o pipeline.
- Monitorizar a execução do pipeline.
- Rever resultados
- Adicionar mais dados à origem.
- Executar o pipeline novamente.
- Monitorizar a segunda execução do pipeline
- Rever os resultados da segunda execução
Descrição geral
Eis o diagrama de nível elevado da solução:
Eis os passos importantes para criar esta solução:
Selecionar a coluna de limite de tamanho. Selecione uma coluna no arquivo de dados de origem, que pode ser utilizada para dividir os registos novos ou atualizados para cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.
Preparar um arquivo de dados para armazenar o valor de limite de tamanho. Neste tutorial, vai armazenar o valor de marca d'água numa base de dados SQL.
Crie um pipeline com o seguinte fluxo de trabalho:
O pipeline nesta solução tem as seguintes atividades:
- Crie duas atividades de Pesquisa. Utilize a primeira atividade Pesquisa para obter o último valor de limite de tamanho. Utilize a segunda para obter o valor de limite de tamanho novo. Estes valores de limite de tamanho são transmitidos para a atividade Copy.
- Criar uma atividade Cópia que copia linhas do arquivo de dados de origem com o valor da coluna de limite de tamanho superior ao valor de limite de tamanho antigo e inferior ao valor novo. Em seguida, copia os dados delta do arquivo de dados de origem para um armazenamento de Blobs como um ficheiro novo.
- Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.
Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
- Base de Dados SQL do Azure. Vai utilizar a base de dados como o arquivo de dados de origem. Se você não tiver um banco de dados no Banco de Dados SQL do Azure, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criar um.
- Armazenamento do Azure. Vai utilizar o armazenamento de blobs como arquivo de dados de sink. Se não tiver uma conta de armazenamento, veja Criar uma conta de armazenamento para seguir os passos para criar uma. Crie um contentor com o nome adftutorial.
Criar uma tabela de origem de dados na base de dados SQL
Abra o SQL Server Management Studio. No Explorador de Servidores, clique com botão direito do rato na base de dados e escolha Nova Consulta.
Execute o seguinte comando SQL na base de dados SQL para criar uma tabela com o nome
data_source_table
e armazenar o valor de limite de tamaho:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
Neste tutorial, vai utilizar LastModifytime como a coluna de limite de tamanho. Os dados no arquivo da origem de dados são apresentados na tabela seguinte:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Criar outra tabela na base de dados SQL para armazenar o valor de limite superior de tamanho
Execute o comando SQL seguinte na base de dados SQL para criar uma tabela com o nome
watermarktable
e armazenar o valor de limite de tamanho:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Defina o valor predefinido do limite superior de tamanho com o nome da tabela do arquivo de dados de origem. Neste tutorial, o nome da tabela é data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Reveja os dados na tabela
watermarktable
.Select * from watermarktable
Saída:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Criar um procedimento armazenado na base de dados SQL
Execute o comando seguinte para criar um procedimento armazenado na base de dados SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Criar uma fábrica de dados
Abra o browser Microsoft Edge ou Google Chrome. Atualmente, a IU do Data Factory é suportada apenas nos browsers Microsoft Edge e Google Chrome.
No menu à esquerda, selecione Criar um recurso>Integration>Data Factory:
Na página Nova fábrica de dados, introduza ADFIncCopyTutorialDF no nome.
O nome do Azure Data Factory deve ser globalmente exclusivo. Se vir um ponto de exclamação vermelho com o seguinte erro, altere o nome da fábrica de dados (por exemplo, oseunomeADFIncCopyTutorialDF) e tente criá-la novamente. Veja o artigo Data Factory – Naming Rules (Data Factory – Regras de Nomenclatura) para obter as regras de nomenclatura dos artefactos do Data Factory.
O nome da fábrica de dados "ADFIncCopyTutorialDF" não está disponível
Selecione a sua subscrição do Azure na qual pretende criar a fábrica de dados.
No Grupo de Recursos, siga um destes passos:
Selecione Utilizar existente e selecione um grupo de recursos já existente na lista pendente.
Selecione Criar novo e introduza o nome de um grupo de recursos.
Para saber mais sobre os grupos de recursos, veja Utilizar grupos de recursos para gerir os recursos do Azure.
Selecione V2 para a versão.
Selecione a localização da fábrica de dados. Só aparecem na lista pendente as localizações que são suportadas. Os armazenamentos de dados (Armazenamento do Azure, Banco de Dados SQL do Azure, Instância Gerenciada SQL do Azure e assim por diante) e os cálculos (HDInsight, etc.) usados pelo data factory podem estar em outras regiões.
Clique em Criar.
Depois de concluída a criação, vai ver a página Data Factory, conforme mostrado na imagem.
Selecione Abrir no bloco Abrir o Estúdio do Azure Data Factory para iniciar a interface do usuário (UI) do Azure Data Factory em uma guia separada.
Criar um pipeline
Neste tutorial, vai criar um pipeline com duas atividades de Pesquisa, uma atividade de Cópia e uma atividade StoredProcedure encadeadas num pipeline.
Na página inicial da interface do usuário do Data Factory, clique no bloco Orquestrar .
No painel Geral, em Propriedades, especifique IncrementalCopyPipeline para Name. Em seguida, feche o painel clicando no ícone Propriedades no canto superior direito.
Vamos adicionar a primeira atividade Lookup para obter o valor de limite de tamanho antigo. Na caixa de ferramentas Atividades, expanda Geral e arraste e largue a atividade Lookup na superfície de desenho do pipeline. Altere o nome da atividade para LookupOldWaterMarkActivity.
Mude para o separador Definições e clique em + Novo em Conjunto de Dados de Origem. Neste passo, vai criar um conjunto de dados para representar os dados em watermarktable. Esta tabela contém o limite de tamanho antigo que foi utilizado na operação de cópia anterior.
Na janela Novo Conjunto de Dados, selecione Banco de Dados SQL do Azure e clique em Continuar. Você verá uma nova janela aberta para o conjunto de dados.
Na janela Definir propriedades do conjunto de dados, insira WatermarkDataset para Name.
Para Serviço Vinculado, selecione Novo e siga as seguintes etapas:
Introduza AzureSqlDatabaseLinkedService em Nome.
Selecione o servidor para Nome do servidor.
Selecione o nome do banco de dados na lista suspensa.
Introduza o seu Nome de utilizador & Palavra-passe.
Para testar a conexão com o banco de dados SQL, clique em Testar conexão.
Clique em Concluir.
Confirme se AzureSqlDatabaseLinkedService está selecionado para Serviço vinculado.
Selecione Concluir.
Na guia Conexão, selecione [dbo].[ watermarktable] para Table. Se quiser pré-visualizar os dados na tabela, clique em Pré-visualizar dados.
Clique no separador do pipeline, na parte superior, ou clique no nome do pipeline na vista de árvore, do lado esquerdo, para mudar para o editor do pipeline. Na janela de propriedades da atividade Lookup, confirme que WatermarkDataset está selecionado no campo Conjunto de Dados de Origem.
Na caixa de ferramentas Atividades, expanda Geral e arraste e largue outra atividade Lookup na superfície de desenho do pipeline e defina o nome como LookupNewWaterMarkActivity no separador Geral da janela de propriedades. Esta atividade Lookup obtém o valor de limite de tamanho antigo da tabela com a origem de dados que vai ser copiada para o destino.
Na janela de propriedades da segunda atividade Lookup, mude para o separador Definições e clique em Novo. Crie um conjunto de dados que aponte para a tabela de origem que contém o valor de limite de tamanho novo (valor máximo de LastModifyTime).
Na janela Novo Conjunto de Dados, selecione Banco de Dados SQL do Azure e clique em Continuar.
Na janela Definir propriedades, digite SourceDataset para Name. Selecione AzureSqlDatabaseLinkedService em Serviço ligado.
Selecione [dbo].[data_source_table] em Tabela. Vai especificar uma consulta neste conjunto de dados mais adiante no tutorial. A consulta tem precedência sobre a tabela a que especificar neste passo.
Selecione Concluir.
Clique no separador do pipeline, na parte superior, ou clique no nome do pipeline na vista de árvore, do lado esquerdo, para mudar para o editor do pipeline. Na janela de propriedades da atividade Lookup, confirme que SourceDataset está selecionado no campo Conjunto de Dados de Origem.
Selecione Consulta no campo Utilizar Consulta e introduza a consulta seguinte; só está a selecionar o valor máximo de LastModifytime de data_ source_table. Por favor, certifique-se de que também verificou apenas a primeira linha.
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Na caixa de ferramentas Atividades, expanda Mover & Transformar, arraste e solte a atividade Copiar da caixa de ferramentas Atividades e defina o nome como IncrementalCopyActivity.
Ligue ambas as atividades Lookup à atividade Copy ao arrastar o botão verde associado às atividades Lookup para a atividade Copy. Largue o botão do rato quando vir que a cor do limite da atividade Copy muda para azul.
Selecione a atividade Copy e confirme que vê as propriedades da atividade na janela Propriedades.
Mude para o separador Origem, na janela Propriedades, e siga os passos abaixo:
Selecione SourceDataset no campo Conjunto de Dados de Origem.
Selecione Consulta no campo Utilizar Consulta.
Introduza a seguinte consulta SQL no campo Consulta.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Mude para o separador Sink e clique em + Novo no campo Conjunto de Dados de Sink.
Neste tutorial, o arquivo de dados de sink é do tipo Armazenamento de Blobs do Azure. Portanto, selecione Armazenamento de Blob do Azure e clique em Continuar na janela Novo Conjunto de Dados.
Na janela Selecionar formato, selecione o tipo de formato dos dados e clique em Continuar.
Na janela Definir propriedades, digite SinkDataset para Name. Em Serviço Vinculado, selecione + Novo. Neste passo, vai criar uma ligação (serviço ligado) para o Armazenamento de Blobs do Azure.
Na janela Novo Serviço Vinculado (Armazenamento de Blob do Azure), execute as seguintes etapas:
- Introduza AzureStorageLinkedService em Nome.
- Selecione a sua conta de Armazenamento do Azure em Nome da conta de armazenamento.
- Testar Conexão e clique em Concluir.
Na janela Definir Propriedades, confirme se AzureStorageLinkedService está selecionado para Serviço vinculado. Em seguida, selecione Concluir.
Vá para a guia Conexão de SinkDataset e execute as seguintes etapas:
- Para o campo Caminho do arquivo, insira adftutorial/incrementalcopy. adftutorial é o nome do contentor de blobs e incrementalcopy é o nome da pasta. Este fragmento parte do princípio de que tem um contentor de blobs denominado adftutorial no armazenamento de blobs. Crie o contentor se ainda não existir ou defina-o como o nome de um contentor existente. O Azure Data Factory cria automaticamente a pasta de saída incrementalcopy, se não existir. Também pode utilizar o botão Procurar do Caminho do ficheiro para navegar para uma pasta num contentor de blobs.
- Para a parte Arquivo do campo Caminho do arquivo, selecione Adicionar conteúdo dinâmico [Alt+P] e insira
@CONCAT('Incremental-', pipeline().RunId, '.txt')
na janela aberta. Em seguida, selecione Concluir. O nome do ficheiro é gerado dinamicamente através da expressão. Cada execução de pipeline tem um ID exclusivo. A atividade Copy utiliza o ID de execução para gerar o nome do ficheiro.
Clique no separador do pipeline, na parte superior, ou clique no nome do pipeline na vista de árvore, do lado esquerdo, para mudar para o editor do pipeline.
Na caixa de ferramentas Atividades, expanda Geral e arraste e largue a atividade Stored Procedure da caixa de ferramentas Atividades na superfície de desenho do pipeline. Ligue a saída verde (Êxito) da atividade Copy à atividade Stored Procedure.
Selecione Atividade de Procedimento Armazenado no estruturador do pipeline e altere o nome para StoredProceduretoWriteWatermarkActivity.
Alterne para a guia Conta SQL e selecione AzureSqlDatabaseLinkedService para serviço vinculado.
Mude para o separador Procedimento Armazenado e siga os passos abaixo:
Em Nome do procedimento armazenado, selecione usp_write_watermark.
Para especificar valores para os parâmetros do procedimento armazenado, clique em Importar parâmetro e introduza os seguintes valores para os parâmetros:
Nome Tipo valor LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Para validar as definições do pipeline, clique em Validar, na barra de ferramentas. Confirme que não há erros de validação. Para fechar a janela Relatório de Validação de Pipeline, clique em >>.
Selecione o botão Publicar Tudo para publicar entidades (serviços ligados, conjuntos de dados e pipelines) no serviço Azure Data Factory. Aguarde até ver uma mensagem a indicar que a publicação foi bem-sucedida.
Acionar uma execução de pipeline
Clique em Adicionar gatilho na barra de ferramentas e clique em Gatilho agora.
Na janela Executar Pipeline, selecione Concluir.
Monitorizar a execução do pipeline.
Mude para o separador Monitorizar, no lado esquerdo. Você vê o status da execução do pipeline acionada por um gatilho manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da execução e executar novamente o pipeline.
Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE. Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE. Selecione Todas as execuções de pipeline na parte superior para voltar à visualização Execuções de pipeline. Para atualizar a vista, selecione Atualizar.
Rever os resultados
Utilize ferramentas como o Explorador de Armazenamento do Azure para ligar à sua Conta de Armazenamento do Azure. Verifique se é criado um ficheiro de saída na pasta incrementalcopy do contentor adftutorial.
Abra o ficheiro de saída e repare que todos os dados são copiados de data_source_table para o ficheiro de blob.
1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
Verifique o valor mais recente do
watermarktable
. Verá que o valor de limite de tamanho foi atualizado.Select * from watermarktable
A saída é:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-05 8:06:00.000 |
Adicionar mais dados à origem
Insira novos dados em seu banco de dados (armazenamento da fonte de dados).
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Os dados atualizados na sua base de dados são:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Acionar outra execução de pipeline
Alterne para a guia Editar . Clique no pipeline na visualização em árvore se ele não estiver aberto no designer.
Clique em Adicionar gatilho na barra de ferramentas e clique em Gatilho agora.
Monitorizar a segunda execução do pipeline
Mude para o separador Monitorizar, no lado esquerdo. Você vê o status da execução do pipeline acionada por um gatilho manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da atividade e executar novamente o pipeline.
Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE. Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE. Selecione Todas as execuções de pipeline na parte superior para voltar à visualização Execuções de pipeline. Para atualizar a vista, selecione Atualizar.
Verificar a segunda saída
No armazenamento de blobs, verá que outro ficheiro foi criado. Neste tutorial, o novo nome de ficheiro é
Incremental-<GUID>.txt
. Abra esse ficheiro e verá duas linhas de registos no mesmo.6,newdata,2017-09-06 02:23:00.0000000 7,newdata,2017-09-07 09:01:00.0000000
Verifique o valor mais recente do
watermarktable
. Verá que o valor de marca d’água foi atualizado.Select * from watermarktable
Saída de exemplo:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-07 09:01:00.000 |
Conteúdos relacionados
Neste tutorial, executou os passos seguintes:
- Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, de sink e de marca d'água.
- Criar um pipeline.
- Executar o pipeline.
- Monitorizar a execução do pipeline.
- Rever resultados
- Adicionar mais dados à origem.
- Executar o pipeline novamente.
- Monitorizar a segunda execução do pipeline
- Rever os resultados da segunda execução
Neste tutorial, o pipeline copiou dados de uma única tabela no Banco de dados SQL para o armazenamento de Blob. Avance para o tutorial a seguir para aprender a copiar dados de várias tabelas em um banco de dados do SQL Server para o Banco de dados SQL.