Carregue dados incrementalmente do Banco de Dados SQL do Azure para o armazenamento de Blob do Azure usando o PowerShell
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Neste tutorial, você usa o Azure Data Factory para criar um pipeline que carrega dados delta de uma tabela no Banco de Dados SQL do Azure para o armazenamento de Blob do Azure.
Vai executar os seguintes passos neste tutorial:
- Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, de sink e de marca d'água.
- Criar um pipeline.
- Executar o pipeline.
- Monitorizar a execução do pipeline.
Descrição geral
Eis o diagrama de nível elevado da solução:
Eis os passos importantes para criar esta solução:
Selecionar a coluna de limite de tamanho. Selecione uma coluna no arquivo de dados de origem, que pode ser utilizada para dividir os registos novos ou atualizados para cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.
Preparar um arquivo de dados para armazenar o valor de limite de tamanho.
Neste tutorial, vai armazenar o valor de marca d'água numa base de dados SQL.Crie um pipeline com o seguinte fluxo de trabalho:
O pipeline nesta solução tem as seguintes atividades:
- Crie duas atividades de Pesquisa. Utilize a primeira atividade Pesquisa para obter o último valor de limite de tamanho. Utilize a segunda para obter o valor de limite de tamanho novo. Estes valores de limite de tamanho são transmitidos para a atividade Copy.
- Crie uma atividade Copiar que copie linhas do armazenamento de dados de origem com o valor da coluna de marca d'água maior que o valor da marca d'água antiga e menor ou igual ao novo valor da marca d'água. Em seguida, copia os dados delta do arquivo de dados de origem para um armazenamento de Blobs como um ficheiro novo.
- Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.
Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
Nota
Recomendamos que utilize o módulo Azure Az do PowerShell para interagir com o Azure. Para começar, consulte Instalar o Azure PowerShell. Para saber como migrar para o módulo do Az PowerShell, veja Migrar o Azure PowerShell do AzureRM para o Az.
- Base de Dados SQL do Azure. Vai utilizar a base de dados como o arquivo de dados de origem. Se você não tiver um banco de dados no Banco de Dados SQL do Azure, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criar um.
- Armazenamento do Azure. Vai utilizar o armazenamento de blobs como arquivo de dados de sink. Se não tiver uma conta de armazenamento, veja Criar uma conta de armazenamento para seguir os passos para criar uma. Crie um contentor com o nome adftutorial.
- Azure PowerShell. Siga as instruções em Install and configure Azure PowerShell (Instalar e configurar o Azure PowerShell).
Criar uma tabela de origem de dados na base de dados SQL
Abra o SQL Server Management Studio. No Explorador de Servidores, clique com botão direito do rato na base de dados e escolha Nova Consulta.
Execute o seguinte comando SQL na base de dados SQL para criar uma tabela com o nome
data_source_table
e armazenar o valor de limite de tamaho:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
Neste tutorial, vai utilizar LastModifytime como a coluna de limite de tamanho. Os dados no arquivo da origem de dados são apresentados na tabela seguinte:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Criar outra tabela na base de dados SQL para armazenar o valor de limite superior de tamanho
Execute o comando SQL seguinte na base de dados SQL para criar uma tabela com o nome
watermarktable
e armazenar o valor de limite de tamanho:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Defina o valor predefinido do limite superior de tamanho com o nome da tabela do arquivo de dados de origem. Neste tutorial, o nome da tabela é data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Reveja os dados na tabela
watermarktable
.Select * from watermarktable
Saída:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Criar um procedimento armazenado na base de dados SQL
Execute o comando seguinte para criar um procedimento armazenado na base de dados SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Criar uma fábrica de dados
Defina uma variável para o nome do grupo de recursos que vai utilizar nos comandos do PowerShell mais tarde. Copie o texto do comando seguinte para o PowerShell, especifique um nome para o Grupo de recursos do Azure com aspas duplas e execute o comando. Um exemplo é
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Se o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável
$resourceGroupName
e execute novamente o comando.Defina uma variável para a localização da fábrica de dados.
$location = "East US"
Para criar o grupo de recursos do Azure, execute o comando abaixo:
New-AzResourceGroup $resourceGroupName $location
Se o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável
$resourceGroupName
e execute novamente o comando.Defina uma variável para o nome da fábrica de dados.
Importante
Atualize o nome da fábrica de dados para que seja globalmente exclusivo. Por exemplo, ADFTutorialFactorySP1127.
$dataFactoryName = "ADFIncCopyTutorialFactory";
Para criar o data factory, execute o seguinte cmdlet Set-AzDataFactoryV2 :
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
Tenha em conta os seguintes pontos:
O nome da fábrica de dados tem de ser globalmente exclusivo. Se receber o erro seguinte, altere o nome e tente novamente:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
Para criar instâncias do Data Factory, a conta de utilizador que utiliza para iniciar sessão no Azure tem de ser membro das funções contribuidor ou proprietário ou administrador da subscrição do Azure.
Para obter uma lista de regiões do Azure em que o Data Factory está atualmente disponível, selecione as regiões que lhe interessam na página seguinte e, em seguida, expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento, Banco de Dados SQL, Instância Gerenciada SQL do Azure e assim por diante) e os cálculos (Azure HDInsight, etc.) usados pela fábrica de dados podem estar em outras regiões.
Criar serviços ligados
Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta seção, você cria serviços vinculados à sua conta de armazenamento e ao Banco de dados SQL.
Criar um serviço ligado ao Armazenamento
Crie um ficheiro JSON com o nome AzureStorageLinkedService.json na pasta C:\ADF com o conteúdo seguinte. (Crie a pasta ADF se ela ainda não existir.) Substitua
<accountName>
e<accountKey>
pelo nome e chave da sua conta de armazenamento antes de salvar o arquivo.{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }
No PowerShell, mude para a pasta ADF.
Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureStorageLinkedService. No exemplo seguinte, vai transmitir os valores para os parâmetros ResourceGroupName e DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
Segue-se o resultado do exemplo:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
Criar um serviço ligado à Base de Dados SQL
Crie um ficheiro JSON com o nome AzureSQLDatabaseLinkedService.json na pasta C:\ADF com o conteúdo seguinte. (Crie a pasta ADF se ela ainda não existir.) Substitua <your-server-name> e <your-database-name> pelo nome do servidor e do banco de dados antes de salvar o arquivo. Você também deve configurar o SQL Server do Azure para conceder acesso à identidade gerenciada do data factory.
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
No PowerShell, mude para a pasta ADF.
Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Segue-se o resultado do exemplo:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService ProvisioningState :
Criar conjuntos de dados
Neste passo, vai criar conjuntos de dados para representar os dados de origem e sink.
Criar um conjunto de dados de origem
Crie um ficheiro JSON com o nome SourceDataset.json na mesma pasta com o seguinte conteúdo:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Neste tutorial, vai utilizar o nome de tabela data_source_table. Substitua-o se utilizar uma tabela com um nome diferente.
Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Eis a saída de exemplo do cmdlet:
DatasetName : SourceDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Criar um conjunto de dados de sink
Crie um ficheiro JSON com o nome SinkDataset.json na mesma pasta com o seguinte conteúdo:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incrementalcopy", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
Importante
Este trecho pressupõe que você tenha um contêiner de blob nomeado
adftutorial
em seu armazenamento de blob. Crie o contentor se ainda não existir ou defina-o como o nome de um contentor existente. A pasta de saídaincrementalcopy
é criada automaticamente se não existir no contentor. Neste tutorial, o nome de ficheiro é gerado dinamicamente através da expressão@CONCAT('Incremental-', pipeline().RunId, '.txt')
.Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Eis a saída de exemplo do cmdlet:
DatasetName : SinkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
Criar um conjunto de dados para uma marca d'água
Neste passo, vai criar um conjunto de dados para armazenar um valor de limite superior de tamanho.
Crie um ficheiro JSON com o nome WatermarkDataset.json na mesma pasta com o seguinte conteúdo:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Eis a saída de exemplo do cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Criar um pipeline
Neste tutorial, vai criar um pipeline com duas atividades de Pesquisa, uma atividade de Cópia e uma atividade StoredProcedure encadeadas num pipeline.
Crie um ficheiro JSON IncrementalCopyPipeline.json na mesma pasta com o seguinte conteúdo:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
Execute o cmdlet Set-AzDataFactoryV2Pipeline para criar o pipeline IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Segue-se o resultado do exemplo:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity} Parameters :
Executar o pipeline
Execute o pipeline IncrementalCopyPipeline usando o cmdlet Invoke-AzDataFactoryV2Pipeline . Substitua os marcadores de posição pelos nomes do seu grupo de recursos e da sua fábrica de dados.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Verifique o status do pipeline executando o cmdlet Get-AzDataFactoryV2ActivityRun até ver todas as atividades sendo executadas com êxito. Substitua os marcadores de posição pela sua hora apropriada para os parâmetros RunStartedAfter e RunStartedBefore. Neste tutorial, vai utilizar -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Segue-se o resultado do exemplo:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:42:50 AM DurationInMs : 7777 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:43:07 AM DurationInMs : 25437 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:10 AM ActivityRunEnd : 9/14/2017 7:43:29 AM DurationInMs : 19769 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:32 AM ActivityRunEnd : 9/14/2017 7:43:47 AM DurationInMs : 14467 Status : Succeeded Error : {errorCode, message, failureType, target}
Rever os resultados
No armazenamento de blobs (arquivo sink), vê que os dados foram copiados para o ficheiro definido em SinkDataset. Neste tutorial, o nome de ficheiro é
Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt
. Ao abrir o ficheiro, poderá ver que os registos no ficheiro são iguais aos dados na base de dados SQL.1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
Verifique o valor mais recente do
watermarktable
. Verá que o valor de limite de tamanho foi atualizado.Select * from watermarktable
Segue-se o resultado do exemplo:
TableName WatermarkValue data_source_table 2017-09-05 8:06:00.000
Inserir dados no arquivo de dados de origem para verificar o carregamento de dados delta
Insira novos dados na base de dados SQL (arquivo de dados de origem).
INSERT INTO data_source_table VALUES (6, 'newdata','9/6/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Os dados atualizados na base de dados SQL são:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000 6 | newdata | 2017-09-06 02:23:00.000 7 | newdata | 2017-09-07 09:01:00.000
Execute o pipeline IncrementalCopyPipeline novamente usando o cmdlet Invoke-AzDataFactoryV2Pipeline . Substitua os marcadores de posição pelos nomes do seu grupo de recursos e da sua fábrica de dados.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Verifique o status do pipeline executando o cmdlet Get-AzDataFactoryV2ActivityRun até ver todas as atividades sendo executadas com êxito. Substitua os marcadores de posição pela sua hora apropriada para os parâmetros RunStartedAfter e RunStartedBefore. Neste tutorial, vai utilizar -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Segue-se o resultado do exemplo:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:58 AM DurationInMs : 31758 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:52 AM DurationInMs : 25497 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:00 AM ActivityRunEnd : 9/14/2017 8:53:20 AM DurationInMs : 20194 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:23 AM ActivityRunEnd : 9/14/2017 8:53:41 AM DurationInMs : 18502 Status : Succeeded Error : {errorCode, message, failureType, target}
No armazenamento de blobs, verá que outro ficheiro foi criado. Neste tutorial, o novo nome de ficheiro é
Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt
. Abra esse ficheiro e verá duas linhas de registos no mesmo.Verifique o valor mais recente do
watermarktable
. Verá que o valor de marca d’água foi atualizado.Select * from watermarktable
saída de exemplo:
TableName WatermarkValue data_source_table 2017-09-07 09:01:00.000
Conteúdos relacionados
Neste tutorial, executou os passos seguintes:
- Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, de sink e de marca d'água.
- Criar um pipeline.
- Executar o pipeline.
- Monitorizar a execução do pipeline.
Neste tutorial, o pipeline copiou dados de uma única tabela no Banco de Dados SQL do Azure para o armazenamento de Blob. Avance para o tutorial a seguir para aprender a copiar dados de várias tabelas em um banco de dados do SQL Server para o Banco de dados SQL.