使用 PowerShell 使用變更追蹤資訊,以累加方式將資料從 Azure SQL Database 載入到 Azure Blob 儲存體
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
在本教學課程中,您會建立一個 Azure Data Factory 並讓其具有管線,以根據 Azure SQL Database 來源資料庫中的變更追蹤資訊,將差異資料載入到 Azure Blob 儲存體。
您會在本教學課程中執行下列步驟:
- 準備來源資料存放區
- 建立資料處理站。
- 建立連結的服務。
- 建立來源、接收和變更追蹤資料集。
- 建立、執行和監視完整複製管線
- 新增或更新來源資料表中的資料
- 建立、執行和監視累加複製管線
注意
建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱安裝 Azure PowerShell (部分機器翻譯)。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az。
概觀
在資料整合解決方案中,我們經常會見到在初次載入資料之後,再以累加方式載入資料的情形。 在某些情況下,您的來源資料存放區於某一期間內所變更的資料,可以輕鬆地到加以切割 (例如,LastModifyTime、CreationTime)。 但也有某些情況是,您並無法明確地識別自上次處理資料後所產生的差異資料。 Azure SQL Database 和 SQL Server 等資料存放區所支援的變更追蹤技術,可供您用來識別差異資料。 本教學課程說明如何搭配使用 Azure Data Factory 與 SQL 變更追蹤技術,以透過累加方式從 Azure SQL Database 將差異資料載入至 Azure Blob 儲存體。 如需更為具體的 SQL 變更追蹤技術資訊,請參閱 SQL Server 中的變更追蹤。
端對端工作流程
以下是使用變更追蹤技術,以累加方式載入資料的典型端對端工作流程步驟。
注意
Azure SQL Database 和 SQL Server 都支援變更追蹤技術。 本教學課程會使用 Azure SQL Database 作為來源資料存放區。 您也可以使用 SQL Server 執行個體。
- 初始載入歷史資料 (執行一次):
- 在 Azure SQL Database 的來源資料庫中啟用變更追蹤技術。
- 取得資料庫中的 SYS_CHANGE_VERSION 初始值作為基準,以擷取變更的資料。
- 將完整資料從來源資料庫載入到 Azure Blob 儲存體。
- 依排程累加載入差異資料 (在初始載入資料後定期執行):
- 取得 SYS_CHANGE_VERSION 的舊值和新值。
- 藉由將 sys.change_tracking_tables 中有所變更資料列 (介於兩個 SYS_CHANGE_VERSION 值之間) 的主索引鍵,聯結到來源資料表中的資料來載入差異資料,然後將差異資料移動到目的地。
- 針對要在下一次載入的差異更新其 SYS_CHANGE_VERSION。
高階解決方案
在本教學課程中,您會建立兩個管線以執行下列兩項作業:
初始載入:您會使用複製活動建立管線,以將整個資料從來源資料存放區 (Azure SQL Database) 複製到目的地資料存放區 (Azure Blob 儲存體)。
累加載入:您會使用下列活動建立管線,然後定期執行該管線。
- 建立兩個查閱活動,從 Azure SQL Database 取得舊的和新的 SYS_CHANGE_VERSION,並將其傳遞給複製活動。
- 建立一個複製活動,將兩個 SYS_CHANGE_VERSION 值之間所插入/更新/刪除的資料,從 Azure SQL Database 複製到 Azure Blob 儲存體。
- 建立一個預存程序活動,以更新下一次管線執行的 SYS_CHANGE_VERSION 值。
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
- Azure PowerShell。 依照如何安裝和設定 Azure PowerShell中的指示,安裝最新的 Azure PowerShell 模組。
- Azure SQL Database。 您需要使用資料庫作為來源資料存放區。 如果您在 Azure SQL Database 中沒有資料庫,請參閱在 Azure SQL Database 中建立資料庫一文,按照步驟建立資料庫。
- Azure 儲存體帳戶。 您需要使用 Blob 儲存體作為接收資料存放區。 如果您沒有 Azure 儲存體帳戶,請參閱建立儲存體帳戶一文,按照步驟來建立帳戶。 建立名為 adftutorial 的容器。
在資料庫中建立資料來源資料表
啟動 SQL Server Management Studio,然後連線至 SQL Database。
在伺服器總管中,以滑鼠右鍵按一下您的資料庫,然後選擇 [新增查詢]。
對資料庫執行下列 SQL 命令,以建立名為
data_source_table
的資料表作為資料來源存放區。create table data_source_table ( PersonID int NOT NULL, Name varchar(255), Age int PRIMARY KEY (PersonID) ); INSERT INTO data_source_table (PersonID, Name, Age) VALUES (1, 'aaaa', 21), (2, 'bbbb', 24), (3, 'cccc', 20), (4, 'dddd', 26), (5, 'eeee', 22);
執行下列 SQL 查詢,在您的資料庫和來源資料表 (data_source_table) 啟用變更追蹤機制:
注意
- 將 <your database name> 替換為具有 data_source_table 的資料庫名稱。
- 在目前的範例中,有變更的資料會保留兩天。 如果您每隔三天以上才會載入變更的資料,則裡面可能不會包含某些已變更的資料。 您必須將 CHANGE_RETENTION 的值變更為更大的數字。 或者,請確保您用來載入已變更資料的期間是在兩天內。 如需詳細資訊,請參閱啟用資料庫的變更追蹤
ALTER DATABASE <your database name> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) ALTER TABLE data_source_table ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
執行下列查詢,建立新資料表並使用預設值儲存 ChangeTracking_version:
create table table_store_ChangeTracking_version ( TableName varchar(255), SYS_CHANGE_VERSION BIGINT, ); DECLARE @ChangeTracking_version BIGINT SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION(); INSERT INTO table_store_ChangeTracking_version VALUES ('data_source_table', @ChangeTracking_version)
注意
如果在您啟用 SQL Database 的變更追蹤後,資料並未變更,變更追蹤版本的值會是 0。
執行下列查詢,在您的資料庫中建立預存程序。 該管線會叫用此預存程序,以更新您在上一個步驟建立的資料表變更追蹤版本。
CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50) AS BEGIN UPDATE table_store_ChangeTracking_version SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion WHERE [TableName] = @TableName END
Azure PowerShell
依照如何安裝和設定 Azure PowerShell中的指示,安裝最新的 Azure PowerShell 模組。
建立資料處理站
定義資源群組名稱的變數,以便稍後在 PowerShell 命令中使用。 將下列命令文字複製到 PowerShell,以雙引號指定 Azure 資源群組的名稱,然後執行命令。 例如:
"adfrg"
。$resourceGroupName = "ADFTutorialResourceGroup";
如果資源群組已經存在,您可能不想覆寫它。 將不同的值指派給
$resourceGroupName
變數,然後執行一次命令定義 Data Factory 位置的變數:
$location = "East US"
若要建立 Azure 資源群組,請執行下列命令:
New-AzResourceGroup $resourceGroupName $location
如果資源群組已經存在,您可能不想覆寫它。 將不同的值指派給
$resourceGroupName
變數,然後執行一次命令。定義 Data Factory 名稱的變數。
重要
將資料處理站名稱更新為全域唯一的。
$dataFactoryName = "IncCopyChgTrackingDF";
若要建立資料處理站,請執行下列 Set-AzDataFactoryV2 Cmdlet:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
請注意下列幾點:
Azure Data Factory 的名稱在全域必須是唯一的。 如果發生下列錯誤,請變更名稱,並再試一次。
The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
若要建立 Data Factory 執行個體,您用來登入 Azure 的使用者帳戶必須為參與者或擁有者角色,或是 Azure 訂用帳戶的管理員。
如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory]:依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database 等) 和計算 (HDInsight 等) 可位於其他區域。
建立連結服務
您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本節中,您會對 Azure 儲存體帳戶和 Azure SQL Database 中的資料庫建立連結服務。
建立 Azure 儲存體連結服務。
在此步驟中,您會將您的 Azure 儲存體帳戶連結到 Data Factory。
在 C:\ADFTutorials\IncCopyChangeTrackingTutorial 資料夾中,使用下列內容建立名為 AzureStorageLinkedService.json 的 JSON 檔案:(如果資料夾尚不存在,請加以建立)。 儲存檔案之前,以您的 Azure 儲存體帳戶名稱和金鑰取代
<accountName>
、<accountKey>
。{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }
在 Azure PowerShell 中,切換至 C:\ADFTutorials\IncCopyChangeTrackingTutorial 資料夾。
執行 Set-AzDataFactoryV2LinkedService Cmdlet 來建立連結服務:AzureStorageLinkedService。 在下列範例中,您會傳遞 ResourceGroupName 和 DataFactoryName 參數的值。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
以下是範例輸出:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
建立 Azure SQL Database 連結服務。
在此步驟中,您要將資料庫連結至資料處理站。
在 C:\ADFTutorials\IncCopyChangeTrackingTutorial 資料夾中,使用下列內容建立名為 AzureSQLDatabaseLinkedService.json 的 JSON 檔案:在儲存盤案之前,將 your-server-name> 和 <your-database-name> 取代<為您的伺服器和資料庫名稱。 您也必須設定 Azure SQL Server,以 授與數據處理站受控識別的存取權。
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
在 Azure PowerShell 中,執行 Set-AzDataFactoryV2LinkedService Cmdlet 來建立連結服務:AzureSQLDatabaseLinkedService。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
以下是範例輸出:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
建立資料集
在此步驟中,您會建立資料集以代表資料來源和資料目的地。 以及用來儲存 SYS_CHANGE_VERSION 的位置。
建立來源資料集
在此步驟中,您會建立資料集來代表來源資料。
在相同的資料夾中,使用下列內容建立名為 SourceDataset.json 的 JSON 檔案:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集:SourceDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
以下是此 Cmdlet 的範例輸出:
DatasetName : SourceDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
建立接收資料集
在此步驟中,您會建立資料集來代表從來源資料存放區複製的資料。
在相同的資料夾中,使用下列內容建立名為 SinkDataset.json 的 JSON 檔案:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incchgtracking", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
您會在 Azure Blob 儲存體中建立 adftutorial 容器,以滿足其中一項必要條件。 建立容器 (若不存在),或設為現有容器的名稱。 在本教學課程中,您會使用下列運算式來動態產生輸出檔案名稱:@CONCAT(('Incremental-', pipeline().RunId, '.txt')。
執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集:SinkDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
以下是此 Cmdlet 的範例輸出:
DatasetName : SinkDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
建立變更追蹤資料集
在此步驟中,您會建立資料集來儲存變更追蹤版本。
在相同的資料夾中,使用下列內容建立名為 ChangeTrackingDataset.json 的 JSON 檔案:
{ "name": " ChangeTrackingDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "table_store_ChangeTracking_version" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
您必須建立 table_store_ChangeTracking_version 資料表以滿足其中一項必要條件。
執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集:ChangeTrackingDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
以下是此 Cmdlet 的範例輸出:
DatasetName : ChangeTrackingDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
建立完整複本的管線
在此步驟中,您會使用複製活動建立管線,將整個資料從來源資料存放區 (Azure SQL Database) 複製到目的地資料存放區 (Azure Blob 儲存體)。
在相同的資料夾中,使用下列內容建立 JSON 檔案 FullCopyPipeline.json:
{ "name": "FullCopyPipeline", "properties": { "activities": [{ "name": "FullCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource" }, "sink": { "type": "BlobSink" } }, "inputs": [{ "referenceName": "SourceDataset", "type": "DatasetReference" }], "outputs": [{ "referenceName": "SinkDataset", "type": "DatasetReference" }] }] } }
執行 Set-AzDataFactoryV2Pipeline Cmdlet 以建立管線:FullCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
以下是範例輸出:
PipelineName : FullCopyPipeline ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Activities : {FullCopyActivity} Parameters :
執行完整的複製管線
使用 Invoke-AzDataFactoryV2Pipeline Cmdlet 執行管線:FullCopyPipeline。
Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName
監視完整的複製管線
登入 Azure 入口網站。
按一下 [所有服務],以
data factories
關鍵字進行搜尋,然後選取 [資料處理站]。在 Data Factory 清單中搜尋您的 Data Factory,然後加以選取以啟動 [Data Factory] 頁面。
在資料管理站頁面上,按一下 [監視及管理] 圖格。
資料整合應用程式會在不同的索引標籤中啟動。您可以看到所有管線執行及其狀態。 請注意,在下列範例中,管線執行狀態是 [成功]。 按一下 [參數] 資料行中的連結,即可檢查傳遞到管線的參數。 如果有錯誤,您就會在 [錯誤] 資料行中看到連結。 按一下 [動作] 資料行中的連結。
當您按一下 [動作] 資料行中的連結時,您會看到下列頁面,其中顯示管線的所有活動執行。
若要切換回 [管線執行] 檢視,請按一下 [管線],如下圖所示。
檢閱結果
您會在 adftutorial
容器的 incchgtracking
資料夾中看到名為 incremental-<GUID>.txt
的檔案。
檔案應該有您資料庫中的資料:
1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22
新增更多資料至來源資料表
對您的資料庫執行下列查詢,以新增資料列並加以更新。
INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');
UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1
建立差異複本的管線
在此步驟中,您會建立具有下列活動的管線,並定期執行此管線。 查閱活動會從 Azure SQL Database 取得舊的和新的 SYS_CHANGE_VERSION,並將它傳遞給複製活動。 複製活動會將兩個 SYS_CHANGE_VERSION 值之間所插入/更新/刪除的資料,從 Azure SQL Database 複製到 Azure Blob 儲存體。 預存程序活動會更新下一次管線執行的 SYS_CHANGE_VERSION 值。
在相同的資料夾中,使用下列內容建立 JSON 檔案 IncrementalCopyPipeline.json:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupLastChangeTrackingVersionActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from table_store_ChangeTracking_version" }, "dataset": { "referenceName": "ChangeTrackingDataset", "type": "DatasetReference" } } }, { "name": "LookupCurrentChangeTrackingVersionActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupLastChangeTrackingVersionActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupCurrentChangeTrackingVersionActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoUpdateChangeTrackingActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "Update_ChangeTracking_Version", "storedProcedureParameters": { "CurrentTrackingVersion": { "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}", "type": "INT64" }, "TableName": { "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}", "type": "String" } } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
執行 Set-AzDataFactoryV2Pipeline Cmdlet 以建立管線:FullCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
以下是範例輸出:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Activities : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity} Parameters :
執行累加複製管線
使用 Invoke-AzDataFactoryV2Pipeline Cmdlet 來執行管線:IncrementalCopyPipeline。
Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName
監視累加複製管線
在資料整合應用程式中,重新整理 [管線執行] 檢視。 確認您有在清單中看到 IncrementalCopyPipeline。 按一下 [動作] 資料行中的連結。
當您按一下 [動作] 資料行中的連結時,您會看到下列頁面,其中顯示管線的所有活動執行。
若要切換回 [管線執行] 檢視,請按一下 [管線],如下圖所示。
檢閱結果
您會在 adftutorial
容器的 incchgtracking
資料夾中看到第二個檔案。
此檔案應該只會有您資料庫中的差異資料。 具有 U
的記錄是資料庫中更新的資料列,具有 I
的記錄則是一個新增的資料列。
1,update,10,2,U
6,new,50,1,I
前三個資料行是 data_source_table 中已變更的資料。 最後兩個資料行是變更追蹤系統資料表中的中繼資料。 第四個資料行是每個已變更資料列的 SYS_CHANGE_VERSION。 第五個資料行是作業:U=更新,I=插入。 如需變更追蹤資訊的詳細資料,請參閱 CHANGETABLE。
==================================================================
PersonID Name Age SYS_CHANGE_VERSION SYS_CHANGE_OPERATION
==================================================================
1 update 10 2 U
6 new 50 1 I
相關內容
進入下列教學課程,深入了解如何只根據其 LastModifiedDate 複製全新和變更檔案: