培训
认证
Microsoft Certified: Azure Data Engineer Associate - Certifications
演示如何了解使用多种 Azure 服务在 Microsoft Azure 上实现和管理数据工程工作负荷的常见数据工程任务。
你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于: Azure 数据工厂
Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
本教程将使用 Azure 数据工厂创建一个管道,用于将 Azure SQL 数据库的表中的增量数据加载到 Azure Blob 存储。
在本教程中执行以下步骤:
下面是高级解决方案示意图:
下面是创建此解决方案所要执行的重要步骤:
选择水印列。 在源数据存储中选择一个列,该列可用于将每个运行的新记录或已更新记录切片。 通常,在创建或更新行时,此选定列中的数据(例如 last_modify_time 或 ID)会不断递增。 此列中的最大值用作水印。
准备用于存储水印值的数据存储。
本教程在 SQL 数据库中存储水印值。
创建采用以下工作流的管道:
此解决方案中的管道具有以下活动:
如果没有 Azure 订阅,请在开始之前创建一个免费帐户。
备注
建议使用 Azure Az PowerShell 模块与 Azure 交互。 若要开始,请参阅安装 Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
打开 SQL Server Management Studio。 在“服务器资源管理器”中,右键单击数据库,然后选择“新建查询”。
针对 SQL 数据库运行以下 SQL 命令,创建名为 data_source_table
的表作为数据源存储:
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
本教程使用 LastModifytime 作为水印列。 下表显示了数据源存储中的数据:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
针对 SQL 数据库运行以下 SQL 命令,创建名为 watermarktable
的表,用于存储水印值:
create table watermarktable
(
TableName varchar(255),
WatermarkValue datetime,
);
使用源数据存储的表名设置高水印的默认值。 在本教程中,表名为 data_source_table。
INSERT INTO watermarktable
VALUES ('data_source_table','1/1/2010 12:00:00 AM')
查看 watermarktable
表中的数据。
Select * from watermarktable
输出:
TableName | WatermarkValue
---------- | --------------
data_source_table | 2010-01-01 00:00:00.000
运行以下命令,在 SQL 数据库中创建存储过程:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
为资源组名称定义一个变量,稍后会在 PowerShell 命令中使用该变量。 将以下命令文本复制到 PowerShell,在双引号中指定 Azure 资源组的名称,然后运行命令。 示例为 "adfrg"
。
$resourceGroupName = "ADFTutorialResourceGroup";
如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName
变量分配另一个值,然后再次运行命令
定义一个用于数据工厂位置的变量。
$location = "East US"
若要创建 Azure 资源组,请运行以下命令:
New-AzResourceGroup $resourceGroupName $location
如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName
变量分配另一个值,然后再次运行命令
定义一个用于数据工厂名称的变量。
重要
更新数据工厂名称,使之全局唯一。 例如 ADFTutorialFactorySP1127。
$dataFactoryName = "ADFIncCopyTutorialFactory";
要创建数据工厂,请运行以下 Set-AzDataFactoryV2 cmdlet:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
请注意以下几点:
数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者或所有者角色,或者是 Azure 订阅的管理员。
若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、SQL 数据库、Azure SQL 托管实例等)和计算资源(Azure HDInsight 等)可以位于其他区域中。
可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分中,创建到存储帐户和 SQL 数据库的链接服务。
在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)将 <accountName>
和 <accountKey>
替换为存储帐户的名称和密钥,然后保存文件。
{
"name": "AzureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
}
}
}
在 PowerShell 中切换到 ADF 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureStorageLinkedService。 在以下示例中,传递 ResourceGroupName 和 DataFactoryName 参数的值:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureStorageLinkedService
ResourceGroupName : <resourceGroupName>
DataFactoryName : <dataFactoryName>
Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)在保存文件之前,将 <your-server-name> 和 <your-database-name> 替换为你的服务器和数据库的名称。 此外,必须配置 Azure SQL Server,以便为数据工厂的托管标识授予访问权限。
{
"name": "AzureSqlDatabaseLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
},
"authenticationType": "ManagedIdentity",
"annotations": []
}
}
在 PowerShell 中切换到 ADF 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureSQLDatabaseLinkedService。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureSQLDatabaseLinkedService
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
ProvisioningState :
在此步骤中,创建表示源和接收器数据的数据集。
在同一文件夹中,创建包含以下内容的名为 SourceDataset.json 的 JSON 文件:
{
"name": "SourceDataset",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "data_source_table"
},
"linkedServiceName": {
"referenceName": "AzureSQLDatabaseLinkedService",
"type": "LinkedServiceReference"
}
}
}
本教程使用表名 data_source_table。 如果使用其他名称的表,请替换名称。
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SourceDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SourceDataset
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
Structure :
Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
在同一文件夹中,创建包含以下内容的名为 SinkDataset.json 的 JSON 文件:
{
"name": "SinkDataset",
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": "adftutorial/incrementalcopy",
"fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
"format": {
"type": "TextFormat"
}
},
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
重要
此代码片段假设 Blob 存储中有一个名为 adftutorial
的 Blob 容器。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 会自动创建输出文件夹 incrementalcopy
(如果容器中不存在)。 在本教程中,文件名是使用表达式 @CONCAT('Incremental-', pipeline().RunId, '.txt')
动态生成的。
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SinkDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SinkDataset
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
Structure :
Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
在此步骤中,创建用于存储高水印值的数据集。
在同一文件夹中,创建包含以下内容的名为 WatermarkDataset.json 的 JSON 文件:
{
"name": " WatermarkDataset ",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "watermarktable"
},
"linkedServiceName": {
"referenceName": "AzureSQLDatabaseLinkedService",
"type": "LinkedServiceReference"
}
}
}
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 WatermarkDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : WatermarkDataset
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
Structure :
Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
本教程创建包含两个 Lookup 活动、一个 Copy 活动和一个 StoredProcedure 活动的管道,这些活动链接在一个管道中。
在同一文件夹中,创建包含以下内容的 JSON 文件 IncrementalCopyPipeline.json:
{
"name": "IncrementalCopyPipeline",
"properties": {
"activities": [
{
"name": "LookupOldWaterMarkActivity",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select * from watermarktable"
},
"dataset": {
"referenceName": "WatermarkDataset",
"type": "DatasetReference"
}
}
},
{
"name": "LookupNewWaterMarkActivity",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
},
"dataset": {
"referenceName": "SourceDataset",
"type": "DatasetReference"
}
}
},
{
"name": "IncrementalCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
},
"sink": {
"type": "BlobSink"
}
},
"dependsOn": [
{
"activity": "LookupNewWaterMarkActivity",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "LookupOldWaterMarkActivity",
"dependencyConditions": [
"Succeeded"
]
}
],
"inputs": [
{
"referenceName": "SourceDataset",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "SinkDataset",
"type": "DatasetReference"
}
]
},
{
"name": "StoredProceduretoWriteWatermarkActivity",
"type": "SqlServerStoredProcedure",
"typeProperties": {
"storedProcedureName": "usp_write_watermark",
"storedProcedureParameters": {
"LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
"TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
}
},
"linkedServiceName": {
"referenceName": "AzureSQLDatabaseLinkedService",
"type": "LinkedServiceReference"
},
"dependsOn": [
{
"activity": "IncrementalCopyActivity",
"dependencyConditions": [
"Succeeded"
]
}
]
}
]
}
}
运行 Set-AzDataFactoryV2Pipeline cmdlet 以创建管道 IncrementalCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
下面是示例输出:
PipelineName : IncrementalCopyPipeline
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
Parameters :
使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfter 和 RunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14" 和 -RunStartedBefore "2017/09/15" 。
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
下面是示例输出:
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : LookupNewWaterMarkActivity
PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName : IncrementalCopyPipeline
Input : {source, dataset}
Output : {NewWatermarkvalue}
LinkedServiceName :
ActivityRunStart : 9/14/2017 7:42:42 AM
ActivityRunEnd : 9/14/2017 7:42:50 AM
DurationInMs : 7777
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : LookupOldWaterMarkActivity
PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName : IncrementalCopyPipeline
Input : {source, dataset}
Output : {TableName, WatermarkValue}
LinkedServiceName :
ActivityRunStart : 9/14/2017 7:42:42 AM
ActivityRunEnd : 9/14/2017 7:43:07 AM
DurationInMs : 25437
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : IncrementalCopyActivity
PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName : IncrementalCopyPipeline
Input : {source, sink}
Output : {dataRead, dataWritten, rowsCopied, copyDuration...}
LinkedServiceName :
ActivityRunStart : 9/14/2017 7:43:10 AM
ActivityRunEnd : 9/14/2017 7:43:29 AM
DurationInMs : 19769
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : StoredProceduretoWriteWatermarkActivity
PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName : IncrementalCopyPipeline
Input : {storedProcedureName, storedProcedureParameters}
Output : {}
LinkedServiceName :
ActivityRunStart : 9/14/2017 7:43:32 AM
ActivityRunEnd : 9/14/2017 7:43:47 AM
DurationInMs : 14467
Status : Succeeded
Error : {errorCode, message, failureType, target}
在 Blob 存储(接收器存储)中,可看到数据已复制到 SinkDataset 中定义的文件。 在当前的教程中,文件名为 Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt
。 打开该文件,可以看到其中与 SQL 数据库中的数据相同的记录。
1,aaaa,2017-09-01 00:56:00.0000000
2,bbbb,2017-09-02 05:23:00.0000000
3,cccc,2017-09-03 02:36:00.0000000
4,dddd,2017-09-04 03:21:00.0000000
5,eeee,2017-09-05 08:06:00.0000000
在 watermarktable
中查看最新值。 可看到水印值已更新。
Select * from watermarktable
下面是示例输出:
TableName | WatermarkValue |
---|---|
data_source_table | 2017-09-05 8:06:00.000 |
在 SQL 数据库(数据源存储)中插入新数据。
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
SQL 数据库中的更新数据为:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
通过使用 Invoke-AzDataFactoryV2Pipeline cmdlet 再次运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfter 和 RunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14" 和 -RunStartedBefore "2017/09/15" 。
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
下面是示例输出:
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : LookupNewWaterMarkActivity
PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName : IncrementalCopyPipeline
Input : {source, dataset}
Output : {NewWatermarkvalue}
LinkedServiceName :
ActivityRunStart : 9/14/2017 8:52:26 AM
ActivityRunEnd : 9/14/2017 8:52:58 AM
DurationInMs : 31758
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : LookupOldWaterMarkActivity
PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName : IncrementalCopyPipeline
Input : {source, dataset}
Output : {TableName, WatermarkValue}
LinkedServiceName :
ActivityRunStart : 9/14/2017 8:52:26 AM
ActivityRunEnd : 9/14/2017 8:52:52 AM
DurationInMs : 25497
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : IncrementalCopyActivity
PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName : IncrementalCopyPipeline
Input : {source, sink}
Output : {dataRead, dataWritten, rowsCopied, copyDuration...}
LinkedServiceName :
ActivityRunStart : 9/14/2017 8:53:00 AM
ActivityRunEnd : 9/14/2017 8:53:20 AM
DurationInMs : 20194
Status : Succeeded
Error : {errorCode, message, failureType, target}
ResourceGroupName : ADF
DataFactoryName : incrementalloadingADF
ActivityName : StoredProceduretoWriteWatermarkActivity
PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName : IncrementalCopyPipeline
Input : {storedProcedureName, storedProcedureParameters}
Output : {}
LinkedServiceName :
ActivityRunStart : 9/14/2017 8:53:23 AM
ActivityRunEnd : 9/14/2017 8:53:41 AM
DurationInMs : 18502
Status : Succeeded
Error : {errorCode, message, failureType, target}
在 Blob 存储中,可以看到另一文件已创建。 在本教程中,新文件名为 Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt
。 打开该文件,会看到其中包含两行记录。
在 watermarktable
中查看最新值。 可看到水印值已再次更新。
Select * from watermarktable
示例输出:
TableName | WatermarkValue |
---|---|
data_source_table | 2017-09-07 09:01:00.000 |
已在本教程中执行了以下步骤:
在本教程中,管道将数据从 Azure SQL 数据库中的单个表复制到 Blob 存储。 转到下面的教程,了解如何将数据从 SQL Server 数据库中的多个表复制到 SQL 数据库。
培训
认证
Microsoft Certified: Azure Data Engineer Associate - Certifications
演示如何了解使用多种 Azure 服务在 Microsoft Azure 上实现和管理数据工程工作负荷的常见数据工程任务。