使用 Azure Data Factory 或 Azure Synapse Analytics 複製和轉換 Azure SQL 資料庫 中的數據

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費啟動新的試用版

本文概述如何在 Azure Data Factory 或 Azure Synapse 管線中使用「複製活動」,從 Azure SQL 資料庫 複製數據,並使用數據流轉換 Azure SQL 資料庫 中的數據。 若要深入了解,請閱讀 Azure Data FactoryAzure Synapse Analytics 的介紹文章。

支援的功能

下列功能支援此 Azure SQL 資料庫 連接器:

支援的功能 IR 受控私人端點
複製活動 (來源/接收) (1) (2)
對應資料流 (來源/接收) (1)
查閱活動 (1) (2)
GetMetadata 活動 (1) (2)
指令碼活動 (1) (2)
預存程序活動 (1) (2)

① Azure 整合執行階段 ② 自我裝載整合執行階段

針對複製活動,此 Azure SQL Database 連接器支援下列功能:

  • 搭配使用 SQL 驗證和 Microsoft Entra 應用程式權杖驗證與服務主體或 Azure 資源受控識別,來複製資料。
  • 作為來源時,使用 SQL 查詢或預存程序來擷取資料。 您也可以選擇從 Azure SQL 資料庫 來源平行複製,請參閱 SQL Database 的平行複製一節以取得詳細數據。
  • 作為接收,如果來源架構不存在,則自動建立目的地數據表;將數據附加至數據表,或在複製期間叫用具有自定義邏輯的預存程式。

如果您使用 Azure SQL 資料庫 無伺服器層,請注意當伺服器暫停時,活動執行會失敗,而不是等待自動繼續準備就緒。 您可以新增活動重試或鏈結其他活動,以確保伺服器在實際執行時處於實時狀態。

重要

如果您使用 Azure 整合執行時間複製數據,請設定 伺服器層級防火牆規則 ,讓 Azure 服務可以存取伺服器。 如果您使用自我裝載整合運行時間來複製數據,請設定防火牆以允許適當的IP範圍。 此範圍包含用來連線到 Azure SQL 資料庫 的電腦 IP。

開始使用

若要透過管線執行複製活動,您可以使用下列其中一個工具或 SDK:

使用UI建立 Azure SQL 資料庫 連結服務

使用下列步驟,在 Azure 入口網站 UI 中建立 Azure SQL 資料庫 鏈接服務。

  1. 前往 Azure Data Factory 或 Synapse 工作區的 [管理] 索引標籤,選取 [連結服務],然後按一下 [新增]:

  2. 搜尋 SQL,然後選取 Azure SQL 資料庫 連接器。

    選取 [Azure SQL 資料庫 連接器]。

  3. 設定服務詳細資料,測試連線,然後建立新的連結服務。

    Azure SQL 資料庫 鏈接服務的設定螢幕快照。

連接器設定詳細資料

下列各節提供屬性的相關詳細數據,這些屬性是用來定義 Azure SQL 資料庫 連接器專屬的 Azure Data Factory 或 Synapse 管線實體。

連結服務屬性

Azure SQL 資料庫 鏈接服務支持這些泛型屬性:

屬性 描述 必要
type type 屬性必須設定為 AzureSqlDatabase Yes
connectionString 指定連線至 connectionString 屬性之 Azure SQL 資料庫 實例所需的資訊。
您也可以在 Azure 金鑰保存庫 中放置密碼或服務主體金鑰。 如果這是 SQL 驗證,則會從連接字串中提取 password 組態。 如需詳細資訊,請參閱表格下方的 JSON 範例和在 Azure Key Vault 中儲存認證
Yes
azureCloudType 針對服務主體驗證,請指定 Microsoft Entra 應用程式註冊的 Azure 雲端環境類型。
允許的值為 AzurePublicAzureChinaAzureUsGovernmentAzureGermany。 預設會使用 Data Factory 或 Synapse 管線的雲端環境。
No
alwaysEncryptedSettings 指定 alwaysencryptedsettings 資訊,讓 Always Encrypted 能夠使用受控身分識別或服務主體來保護儲存在 SQL Server 的敏感性資料。 如需詳細資訊,請參閱表格下方的 JSON 範例和使用 Always Encrypted 一節。 如果未指定,則會停用預設的一律加密設定。 No
connectVia 用來連線到資料存放區的整合執行階段。 如果您的資料存放區位於專用網中,您可以使用 Azure 整合運行時間或自我裝載整合運行時間。 若未指定,則會使用預設 Azure Integration Runtime。 No

針對不同的驗證類型,請分別參閱下列各節特定的屬性、必要條件和 JSON 範例:

提示

如果您遇到錯誤,且其錯誤碼為 "UserErrorFailedToConnectToSqlServer",以及「資料庫的工作階段限制為 XXX 並已達到」訊息,則請將 Pooling=false 新增至您的連接字串並再試一次。 Pooling=false 也建議用於 SHIR(自我裝載整合運行時間) 類型連結服務設定。 共用和其他連接參數可以新增為連結服務建立表單之 [其他連接屬性 ] 區段中的新參數名稱和值。

SQL 驗證

若要使用 SQL 驗證類型,請指定上一節所述的一般屬性。

範例:使用 SQL 驗證

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

範例:Azure 金鑰保存庫 中的密碼

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;User ID=<username>@<servername>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                },
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

範例:使用Always Encrypted

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
        },
        "alwaysEncryptedSettings": {
            "alwaysEncryptedAkvAuthType": "ServicePrincipal",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<service principal key>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

服務主體驗證

若要使用服務主體驗證,除了上一節所述的泛型屬性之外,還指定下列屬性:

屬性 描述 必要
servicePrincipalId 指定應用程式的用戶端識別碼。 Yes
servicePrincipalKey 指定應用程式的金鑰。 將此欄位標示為 SecureString 以將其安全地儲存,或參考 Azure Key Vault 中儲存的祕密 Yes
tenant 指定您的應用程式所在租用戶的資訊,例如網域名稱或租用戶識別碼。 將滑鼠游標暫留在 Azure 入口網站右上角,即可加以擷取。 Yes

此外,請依照下列步驟操作:

  1. 從 Azure 入口網站建立 Microsoft Entra 應用程式。 請記下應用程式名稱,以及下列可定義連結服務的值:

    • Application ID
    • 應用程式金鑰
    • 租用戶識別碼
  2. 如果您尚未這麼做,請在 Azure 入口網站 為您的伺服器布建 Microsoft Entra 系統管理員。 Microsoft Entra 系統管理員必須是 Microsoft Entra 使用者或 Microsoft Entra 群組,但不能是服務主體。 此步驟可讓下一個步驟使用 Microsoft Entra 身分識別來建立服務主體的自主資料庫使用者。

  3. 針對服務主體,建立自主資料庫使用者。 連線 使用 SQL Server Management Studio 之類的工具來複製數據的資料庫,以及至少具有 ALTER ANY USER 許可權的 Microsoft Entra 身分識別。 執行下列 T-SQL:

    CREATE USER [your application name] FROM EXTERNAL PROVIDER;
    
  4. 如同您通常對 SQL 使用者或其他使用者所做的一樣,授與服務主體所需的許可權。 執行下列程式碼。 如需更多選項,請參閱此文件

    ALTER ROLE [role name] ADD MEMBER [your application name];
    
  5. 在 Azure Data Factory 或 Synapse 工作區中設定 Azure SQL 資料庫 鏈接服務。

使用服務主體驗證的連結服務範例

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;Connection Timeout=30",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<service principal key>"
            },
            "tenant": "<tenant info, e.g. microsoft.onmicrosoft.com>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

系統指派的受控身分識別驗證

數據處理站或 Synapse 工作區可以與 Azure 資源 系統指派的受控識別相關聯,以在向 Azure 中的其他資源進行驗證時,代表服務。 您可以使用此受控識別來進行 Azure SQL Database 分析驗證。 指定的處理站或 Synapse 工作區可以使用此身分識別,從資料庫或數據庫存取和複製數據。

若要使用系統指派的受控識別驗證,請指定上一節所述的一般屬性,並依照下列步驟操作。

  1. 如果您尚未這麼做,請在 Azure 入口網站 上為您的伺服器布建 Microsoft Entra 系統管理員。 Microsoft Entra 系統管理員可以是 Microsoft Entra 使用者或 Microsoft Entra 群組。 如果您將受控識別授與群組管理員角色,請略過步驟 3 和 4。 系統管理員具有資料庫的完整存取權。

  2. 建立受控識別的自主資料庫使用者 。 連線 至您想要使用 SQL Server Management Studio 等工具來複製數據的資料庫,以及至少具有 ALTER ANY USER 許可權的 Microsoft Entra 身分識別。 執行下列 T-SQL:

    CREATE USER [your_resource_name] FROM EXTERNAL PROVIDER;
    
  3. 如同您通常對 SQL 使用者和其他人員所做的一樣,授與受控識別所需的許可權。 執行下列程式碼。 如需更多選項,請參閱此文件

    ALTER ROLE [role name] ADD MEMBER [your_resource_name];
    
  4. 設定 Azure SQL 資料庫 連結服務。

範例

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;Connection Timeout=30"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

使用者指派的受控身分識別驗證

數據處理站或 Synapse 工作區可以與 使用者指派的受控識別 相關聯,以在向 Azure 中的其他資源進行驗證時,代表服務。 您可以使用此受控識別來進行 Azure SQL Database 分析驗證。 指定的處理站或 Synapse 工作區可以使用此身分識別,從資料庫或數據庫存取和複製數據。

若要使用使用者指派的受控識別驗證,除了上一節所述的一般屬性外,請指定下列屬性:

屬性 描述 必要
credentials 將使用者指派的受控身分識別指定為認證物件。 Yes

此外,請依照下列步驟操作:

  1. 如果您尚未這麼做,請在 Azure 入口網站 上為您的伺服器布建 Microsoft Entra 系統管理員。 Microsoft Entra 系統管理員可以是 Microsoft Entra 使用者或 Microsoft Entra 群組。 如果您以使用者指派的受控識別授與群組系統管理員角色,請略過步驟 3。 系統管理員具有資料庫的完整存取權。

  2. 為使用者指派的受控識別建立自主資料庫使用者。 連線 使用 SQL Server Management Studio 之類的工具來複製數據的資料庫,以及至少具有 ALTER ANY USER 許可權的 Microsoft Entra 身分識別。 執行下列 T-SQL:

    CREATE USER [your_resource_name] FROM EXTERNAL PROVIDER;
    
  3. 依照您平常為 SQL 使用者和其他人所進行的操作一樣,建立一或多個使用者指派的受控識別,並將所需的權限授與使用者指派的受控識別。 執行下列程式碼。 如需更多選項,請參閱此文件

    ALTER ROLE [role name] ADD MEMBER [your_resource_name];
    
  4. 將一或多個使用者指派的受控識別指派給 Data Factory,並為每個使用者指派的受控識別建立認證

  5. 設定 Azure SQL 資料庫 連結服務。

範例:

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;Connection Timeout=30",
            "credential": {
                "referenceName": "credential1",
                "type": "CredentialReference"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

資料集屬性

如需可用來定義資料集之區段和屬性的完整清單,請參閱 數據集

Azure SQL 資料庫 數據集支援下列屬性:

屬性 描述 必要
type 數據集的 type 屬性必須設定為 AzureSqlTable Yes
schema 結構描述的名稱。 否 (來源);是 (接收)
table 資料表/檢視的名稱。 否 (來源);是 (接收)
tableName 具有結構描述的資料表/檢視名稱。 支援此屬性是基於回溯相容性。 對於新的工作負載,請使用 schematable 否 (來源);是 (接收)

數據集屬性範例

{
    "name": "AzureSQLDbDataset",
    "properties":
    {
        "type": "AzureSqlTable",
        "linkedServiceName": {
            "referenceName": "<Azure SQL Database linked service name>",
            "type": "LinkedServiceReference"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "typeProperties": {
            "schema": "<schema_name>",
            "table": "<table_name>"
        }
    }
}

複製活動屬性

如需可用來定義活動的區段和屬性的完整清單,請參閱 管線。 本節提供 Azure SQL 資料庫 來源和接收所支持的屬性清單。

Azure SQL 資料庫 作為來源

提示

若要使用數據分割有效率地從 Azure 載入資料 SQL 資料庫 數據,請從 SQL 資料庫的平行複製深入瞭解。

若要從 Azure SQL 資料庫 複製資料,複製活動來源區段中支援下列屬性:

屬性 描述 必要
type 複製活動來源的 type 屬性必須設定為 AzureSqlSource。 回溯兼容性仍支援 「SqlSource」 類型。 Yes
sqlReaderQuery 此屬性使用自訂 SQL 查詢來讀取資料。 例如 select * from MyTable No
sqlReaderStoredProcedureName 從源數據表讀取數據的預存程式名稱。 最後一個 SQL 陳述式必須是預存程序中的 SELECT 陳述式。 No
storedProcedureParameters 預存程序的參數。
允許的值為名稱或值組。 參數的名稱和大小寫必須符合預存程序參數的名稱和大小寫。
No
isolationLevel 指定 SQL 來源的異動鎖定行為。 允許的值為:ReadCommittedReadUncommittedRepeatableReadSerializableSnapshot。 如果未指定,則會使用資料庫的預設隔離等級。 如需詳細資訊,請參閱這篇文件 No
partitionOptions 指定用來從 Azure SQL 資料庫 載入資料的數據分割選項。
允許的值為:None (預設值)、PhysicalPartitionsOfTableDynamicRange
啟用分割區選項時(也就是,不是 None),平行處理原則從 Azure SQL 資料庫 同時載入數據的程度是由複製活動的設定所parallelCopies控制。
No
partitionSettings 指定資料分割的設定群組。
當分割選項不是 None 時套用。
No
partitionSettings 下方:
partitionColumnName 整數類型或 date/datetime 類型 (intsmallintbigintdatesmalldatetimedatetimedatetime2datetimeoffset) 指定來源資料行的名稱,供平行複製的範圍分割使用。 如果未指定,則會自動偵測數據表的索引或主鍵,並當做數據分割數據行使用。
當分割選項是 DynamicRange 時套用。 如果您使用查詢來取出來源資料,請在 WHERE 子句中加上 ?AdfDynamicRangePartitionCondition 。 如需範例,請參閱從 SQL 資料庫平行複製一節。
No
partitionUpperBound 分割區範圍分割的分割區資料行最大值。 這個值用於決定分割區的跨距,而不是用於篩選資料表中的資料列。 資料表或查詢結果中的所有資料列都會進行分割和複製。 如果未指定,複製活動會自動偵測該值。
當分割選項是 DynamicRange 時套用。 如需範例,請參閱從 SQL 資料庫平行複製一節。
No
partitionLowerBound 分割區範圍分割的分割區資料行最小值。 這個值用於決定分割區的跨距,而不是用於篩選資料表中的資料列。 資料表或查詢結果中的所有資料列都會進行分割和複製。 如果未指定,複製活動會自動偵測該值。
當分割選項是 DynamicRange 時套用。 如需範例,請參閱從 SQL 資料庫平行複製一節。
No

請注意下列幾點

  • 如果AzureSqlSource 指定 sqlReaderQuery,複製活動會針對 Azure SQL 資料庫 來源執行此查詢,以取得數據。 如果預存程序接受參數,您也可以藉由指定 sqlReaderStoredProcedureNamestoredProcedureParameters 來指定預存程序。
  • 在來源中使用預存程序來擷取資料時,請注意,如果您的預存程序設計為在傳入不同的參數值時傳回不同的結構描述,在從 UI 匯入結構描述,或使用自動資料表建立將資料複製到 SQL 資料庫時,您可能遇到失敗,或看到非預期的結果。

SQL 查詢範例

"activities":[
    {
        "name": "CopyFromAzureSQLDatabase",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Azure SQL Database input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureSqlSource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

預存程序範例

"activities":[
    {
        "name": "CopyFromAzureSQLDatabase",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Azure SQL Database input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureSqlSource",
                "sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
                "storedProcedureParameters": {
                    "stringData": { "value": "str3" },
                    "identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

預存程序定義

CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
    @stringData varchar(20),
    @identifier int
)
AS
SET NOCOUNT ON;
BEGIN
     select *
     from dbo.UnitTestSrcTable
     where dbo.UnitTestSrcTable.stringData != stringData
    and dbo.UnitTestSrcTable.identifier != identifier
END
GO

Azure SQL 資料庫 作為接收

提示

深入瞭解最佳做法中支援的寫入行為、組態和最佳做法,以將數據載入 Azure SQL 資料庫

若要將資料複製到 Azure SQL 資料庫,複製活動接收區段中支援下列屬性:

屬性 描述 必要
type 複製活動接收的類型屬性必須設定為 AzureSqlSink。 回溯兼容性仍支援 「SqlSink」 類型。 Yes
preCopyScript 指定要執行複製活動的 SQL 查詢,再將數據寫入 Azure SQL 資料庫。 每一複製回合只會叫用此查詢一次。 使用此屬性來清除預先載入的資料。 No
tableOption 指定是否要根據來源結構描述,自動建立接收資料表 (如果不存在)。
當接收指定預存程序時,不支援自動建立資料表。
允許的值包為:none (預設) 或 autoCreate
No
sqlWriterStoredProcedureName 定義如何將來源資料套用到目標資料表的預存程序名稱。
此預存程序將會依批次叫用。 針對只執行一次且與來源資料無關的作業 (例如刪除或截斷),請使用 preCopyScript 屬性。
請參閱叫用 SQL 接收器中的預存程序的範例。
No
storedProcedureTableTypeParameterName 預存程序中指定資料表類型的參數名稱。 No
sqlWriterTableType 在預存程序中使用的資料表類型名稱。 複製活動可讓正在移動的資料可用於此資料表類型的暫存資料表。 然後,預存程序程式碼可以合併正在複製的資料與現有的資料。 No
storedProcedureParameters 預存程序的參數。
允許的值為:名稱和值組。 參數的名稱和大小寫必須符合預存程序參數的名稱和大小寫。
No
writeBatchSize 對於每個批次要插入 SQL 資料表中的資料列數。
允許的值是 整數 (數據列數目)。 根據預設,服務會依據資料列大小動態決定適當的批次大小。
No
writeBatchTimeout 插入、upsert 和預存程式作業在逾時之前完成的等候時間。
允許的值為時間範圍。 例如,“00:30:00” 為 30 分鐘。 如果未指定任何值,則逾時預設為 “00:30:00”。
No
disableMetricsCollection 此服務會收集 Azure SQL 資料庫 DTU 等計量,以取得複製效能優化和建議,這引進了額外的主要數據庫存取權。 如果您擔心此行為,請指定 true 將其關閉。 否 (預設值為 false)
 maxConcurrentConnections 在活動執行期間建立至資料存放區的同時連線上限。 僅在想要限制並行連線時,才需要指定值。  否
WriteBehavior 指定複製活動的寫入行為,以將數據載入 Azure SQL 資料庫。
允許的值為 InsertUpsert。 根據預設,服務會使用 Insert 載入資料。
No
upsertSettings 指定寫入行為的設定群組。
當 WriteBehavior 選項為 Upsert 時套用。
No
upsertSettings 下方:
useTempDB 指定是否要使用全域臨時表或實體數據表作為 upsert 的臨時表。
根據預設,服務會使用全域暫存資料表作為過度資料表。 值為 true
No
interimSchemaName 如果使用實體資料表,請指定建立過渡資料表的過渡結構描述。 注意:使用者必須具有建立和刪除資料表的權限。 根據預設,過渡資料表會與接收資料表共用相同的結構描述。
當 useTempDB 選項為 False 套用。
No
金鑰 指定唯一資料列識別的資料行名稱。 您可以使用單一索引鍵或一系列索引鍵。 如果未指定,則會使用主索引鍵。 No

範例 1:附加資料

"activities":[
    {
        "name": "CopyToAzureSQLDatabase",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Azure SQL Database output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureSqlSink",
                "tableOption": "autoCreate",
                "writeBatchSize": 100000
            }
        }
    }
]

範例 2:在複製期間叫用預存程序

從從 SQL 接收叫用預存程式深入瞭解。

"activities":[
    {
        "name": "CopyToAzureSQLDatabase",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Azure SQL Database output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureSqlSink",
                "sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
                "storedProcedureTableTypeParameterName": "MyTable",
                "sqlWriterTableType": "MyTableType",
                "storedProcedureParameters": {
                    "identifier": { "value": "1", "type": "Int" },
                    "stringData": { "value": "str1" }
                }
            }
        }
    }
]

範例 3:Upsert 資料

"activities":[
    {
        "name": "CopyToAzureSQLDatabase",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Azure SQL Database output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureSqlSink",
                "tableOption": "autoCreate",
                "writeBehavior": "upsert",
                "upsertSettings": {
                    "useTempDB": true,
                    "keys": [
                        "<column name>"
                    ]
                },
            }
        }
    }
]

從 SQL 資料庫平行複製

複製活動中 Azure SQL 資料庫 連接器提供內建的數據分割,以平行複製數據。 您可以在複製活動的 [來源] 索引標籤上找到資料分割選項。

數據分割選項的螢幕快照

當您啟用分割複製時,複製活動會針對 Azure SQL 資料庫 來源執行平行查詢,以依分割區載入數據。 平行程度由複製活動的 parallelCopies 設定所控制。 例如,如果您設定parallelCopies為 4,服務會根據指定的分割區選項和設定,同時產生並執行四個查詢,而每個查詢都會從 Azure SQL 資料庫 擷取部分數據。

建議您啟用與數據分割的平行複製,特別是當您從 Azure SQL 資料庫 載入大量數據時。 以下針對各種情節的建議設定。 將資料複製到以檔案為基礎的資料存放區時,建議分成多個檔案來寫入資料夾 (僅指定資料夾名稱),這樣效能會比寫入單一檔案更好。

案例 建議的設定
使用實體分割區從大型資料表完整載入。 分割選項:資料表的實體分割區。

在執行期間,服務會自動偵測實體分割區,並依分割區複製資料。

若要檢查您的資料表是否有實體分割區,您可以參考此查詢
從大型資料表完整載入,不含實體分割區,同時在資料分割時包含整數或日期時間資料行。 分割選項:動態範圍分割。
分割資料行 (選用):指定用來分割資料的資料行。 如果未指定,則會使用索引或主索引鍵資料行。
分割區上限分割區下限 (選用):指定是否要決定分割區跨距。 這不適用於篩選資料表中的資料列,資料表中的所有資料列都會分割並複製。 如果未指定,複製活動會自動偵測值。

例如,如果您的分割區資料行「識別碼」具有範圍 1 到 100 之間的值,而您將下限設定為 20、上限設定為 80,且平行複製為 4,則服務會分別依 4 個分割區擷取資料 - 範圍中的識別碼分別為 <=20、[21, 50]、[51, 80] 和 >=81。
使用自訂查詢載入大量資料,不使用實體分割區,同時包含整數或日期/日期時間資料行用於資料分割。 分割選項:動態範圍分割。
查詢SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
分割資料行:指定用來分割資料的資料行。
分割區上限分割區下限 (選用):指定是否要決定分割區跨距。 這不適用於篩選資料表中的資料列,查詢結果中的所有資料列都會分割並複製。 如果未指定,複製活動會自動偵測該值。

在執行期間,服務會?AdfRangePartitionColumnName以每個分割區的實際數據行名稱和值範圍取代,並傳送至 Azure SQL 資料庫。
例如,如果您的分割區資料行「識別碼」具有範圍 1 到 100 之間的值,而您將下限設定為 20、上限設定為 80,且平行複製為 4,則服務會分別依 4 個分割區擷取資料 - 範圍中的識別碼分別為 <=20、[21, 50]、[51, 80] 和 >=81。

以下是不同案例的更多範例查詢:
1.查詢整個資料表:
SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition
2.來自具有資料行選取範圍和其他 where 子句篩選的資料表查詢:
SELECT <column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
3.使用子查詢進行查詢:
SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
4.在子查詢中使用分割區進行查詢:
SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition) AS T

使用分割區選項載入資料的最佳做法:

  1. 選擇獨特的資料行作為分割資料行 (例如主索引鍵或唯一索引鍵) 以避免資料扭曲。
  2. 如果資料表有內建分割區,請使用分割選項「資料表的實體分割區」,以獲得更佳的效能。
  3. 如果您使用 Azure Integration Runtime 來複製資料,您可以設定較大的「資料整合單位 (DIU)」(>4) 來利用更多運算資源。 檢查該處適用的案例。
  4. 複製平行處理原則的程度」會控制分割區數目,將此數目設定過大有時會損害效能,建議將此數目設定為 (DIU 或自我裝載 IR 節點數目) * (2 到 4)。

範例:使用實體分割區從大型資料表完整載入

"source": {
    "type": "AzureSqlSource",
    "partitionOption": "PhysicalPartitionsOfTable"
}

範例:使用動態範圍分割進行查詢

"source": {
    "type": "AzureSqlSource",
    "query": "SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "<partition_column_name>",
        "partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
        "partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
    }
}

用來檢查實體分割區的範例查詢

SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id 
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id 
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id 
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id 
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id 
WHERE s.name='[your schema]' AND t.name = '[your table name]'

如果資料表具有實體分割區,您會看到 “HasPartition” 顯示為 “yes”,如下所示。

Sql 查詢結果

將數據載入 Azure SQL 資料庫 的最佳做法

將資料複製到 Azure SQL 資料庫 時,可能需要不同的寫入行為:

  • 附加:我的來源資料只有新的記錄。
  • Upsert:我的來源資料同時有插入和更新。
  • 覆寫:我每次都想要重載整個維度數據表。
  • 使用自訂邏輯寫入:在最終插入目的地資料表之前,我還需要額外的處理。

請參閱有關如何在服務和最佳做法中設定的個別章節。

附加資料

附加數據是這個 Azure SQL 資料庫 接收連接器的預設行為。 服務會執行大量插入,以有效率地寫入數據表。 您可以在複製活動中據以設定來源和接收器。

Upsert 資料

複製活動現在支援將資料原生載入資料庫暫存資料表,然後在索引鍵存在時更新接收資料表中的資料,若無則插入新的資料。 若要深入瞭解複製活動中的 upsert 設定,請參閱 Azure SQL 資料庫 作為接收

覆寫整個資料表

您可以在複製活動接收中設定 preCopyScript 屬性。 在此情況下,針對執行的每個複製活動,服務會先執行指令碼。 然後服務會執行複本以插入資料。 例如,若要以最新的資料覆寫整個資料表,可以指定先刪除所有記錄,再從來源大量載入新資料的指令碼。

使用自訂邏輯寫入資料

使用自訂邏輯寫入資料的步驟類似於更新插入資料一節中所述的步驟。 當您需要在最後插入源數據至目的地數據表之前套用額外的處理時,您可以載入至臨時表,然後叫用預存程式活動,或在複製活動接收中叫用預存程式以套用數據,或使用對應數據流。

從 SQL 接收器叫用預存程序

當您將資料複製到 Azure SQL 資料庫 時,您也可以在源數據表的每個批次上設定及叫用具有額外參數的使用者指定預存程式。 預存程序功能使用資料表值參數

當內建的複製機制無法滿足需求時,您可以使用預存程序。 例如,當您想要在最終插入來源資料至目的地資料表之前,套用額外的處理。 額外處理的一些範例包括:合併資料行、查閱其他的值,以及插入多個資料表中。

下列範例示範如何使用預存程式,在 Azure SQL 資料庫 中的數據表中執行 upsert。 假設輸入資料和接收器 Marketing 資料表各有三個資料行:ProfileIDStateCategory。 根據 ProfileID 資料行執行更新插入,然後僅套用至名為 "ProductA" 的特定類別。

  1. 在資料庫中,使用與 sqlWriterTableType 相同的名稱來定義資料表類型。 資料表類型的結構描述會與輸入資料所傳回的結構描述相同。

    CREATE TYPE [dbo].[MarketingType] AS TABLE(
        [ProfileID] [varchar](256) NOT NULL,
        [State] [varchar](256) NOT NULL,
        [Category] [varchar](256) NOT NULL
    )
    
  2. 在資料庫中,使用與 sqlWriterStoredProcedureName 相同的名稱來定義預存程序。 它會處理來自指定來源的輸入資料,並合併至輸出資料表。 預存程序中資料表類型的參數名稱會與資料集中定義的 tableName 相同。

    CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
    AS
    BEGIN
    MERGE [dbo].[Marketing] AS target
    USING @Marketing AS source
    ON (target.ProfileID = source.ProfileID and target.Category = @category)
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT MATCHED THEN
        INSERT (ProfileID, State, Category)
        VALUES (source.ProfileID, source.State, source.Category);
    END
    
  3. 在您的 Azure Data Factory 或 Synapse 管線中,定義 複製活動中的 SQL 接收 區段,如下所示:

    "sink": {
        "type": "AzureSqlSink",
        "sqlWriterStoredProcedureName": "spOverwriteMarketing",
        "storedProcedureTableTypeParameterName": "Marketing",
        "sqlWriterTableType": "MarketingType",
        "storedProcedureParameters": {
            "category": {
                "value": "ProductA"
            }
        }
    }
    

使用預存程式將數據寫入 Azure SQL 資料庫 時,接收會將源數據分割成小型批次,然後執行插入,因此預存程式中的額外查詢可以多次執行。 如果您在將數據寫入 Azure SQL 資料庫 之前,先執行複製活動的查詢,則不建議將它新增至預存程式,請在 [預先複製腳本] 方塊中新增它。

對應資料流程屬性

在對應數據流中轉換數據時,您可以從 Azure SQL 資料庫 讀取和寫入數據表。 如需詳細資訊,請參閱對應資料流程中的來源轉換接收轉換

來源轉換

azure SQL 資料庫 的特定 設定 可在來源轉換的 [來源選項] 索引卷標中取得。

輸入: 選取您是否將來源指向數據表(對等的 Select * from <table-name>),或輸入自定義 SQL 查詢。

查詢:如果您在輸入字段中選取 [查詢],請輸入來源的 SQL 查詢。 此設定會覆寫您在資料集中選擇的任何數據表。 此處不支援 Order By 子句,但您可以設定完整的 SELECT FROM 語句。 您也可使用使用者定義的資料表函數。 select * from udfGetData() 是 SQL 中傳回數據表的 UDF。 此查詢會產生可在數據流中使用的源數據表。 使用查詢也是減少測試或查閱數據列的絕佳方式。

提示

對應資料流程 [查詢] 模式不支援 SQL 的通用資料表運算式 (CTE),因為使用此模式的必要條件是查詢可用於 SQL 查詢 FROM 子句,但 CTE 不適用。 若要使用 CTE,您必須使用下列查詢來建立預存程序:

CREATE PROC CTESP @query nvarchar(max)
AS
BEGIN
EXECUTE sp_executesql @query;
END

然後在對應資料流程的來源轉換中使用 [預存程序] 模式,並依照範例 with CTE as (select 'test' as a) select * from CTE 設定 @query。 接著您便可以正常使用 CTE。

預存程式:如果您想要從源資料庫執行的預存程式產生投影和源數據,請選擇此選項。 您可以輸入架構、程式名稱和參數,或按兩下 [重新整理] 來要求服務探索架構和程式名稱。 然後,您可以按兩下 [匯入] 以使用 表單 @paraName匯入所有程序參數。

預存程序

  • SQL 範例: Select * from MyTable where customerId > 1000 and customerId < 2000
  • 參數化 SQL 範例: "select * from {$tablename} where orderyear > {$year}"

批次大小:輸入批次大小,將大型數據區塊化為讀取。

隔離等級:對應數據流中 SQL 來源的預設值為未認可。 您可以將這裡的隔離等級變更為下列其中一個值:

  • 讀取認可
  • 讀取未認可
  • 可重複讀取
  • 可序列化
  • 沒有忽略隔離等級

隔離等級

啟用累加擷取:使用此選項可告訴ADF只處理自上次執行管線後變更的數據列。若要使用架構漂移啟用累加擷取,請選擇以累加/浮浮浮浮水印數據行為基礎的數據表,而不是針對原生異動數據擷取啟用的數據表。

累加數據行:使用累加擷取功能時,您必須選擇您想要在源數據表中做為浮浮水印的日期/時間或數值數據行。

啟用原生異動數據擷取(預覽):使用此選項告訴 ADF 只會處理自上次執行管線以來由 SQL 異動數據擷取技術取的差異數據。 使用此選項時,會自動載入差異資料 (包括資料列插入、更新和刪除),而不需要任何累加資料行。 您必須在 ADF 中使用此選項之前,先在 Azure SQL DB 上啟用異動數據擷取 。 如需 ADF 中此選項的詳細資訊,請參閱原生異動資料擷取

從頭開始讀取:使用累加擷取設定此選項會指示ADF在第一次執行管線時讀取所有數據列,並開啟累加擷取。

接收轉換

在接收轉換的 [設定] 索引標籤中,可以使用 Azure SQL 資料庫 特定的 設定

Update 方法:決定您的資料庫目的地所允許的作業。 預設僅允許插入。 若要更新、upsert 或刪除資料列,必須使用 alter-row 轉換來標記這些動作的資料列。 對於更新、更新插入和刪除,必須設定索引鍵資料行,以決定要改變哪一個資料列。

索引鍵資料行

您在這裡挑選做為索引鍵的數據行名稱,將由服務用來作為後續更新、更新、刪除的一部分。 因此,您必須挑選存在於接收器對應中的資料行。 如果您想要不要將值寫入此索引鍵數據行,請按兩下 [略過寫入索引鍵數據行]。

您可以將這裡用來更新目標 Azure SQL 資料庫 資料表的索引鍵資料行參數化。 如果您有多個複合索引鍵的數據行,請按兩下 [自定義表達式],您就可以使用數據流 表達式語言來新增動態內容,其中包含具有複合索引鍵數據行名稱的字串陣列。

數據表動作: 決定在寫入之前,是否要重新建立或移除目的地數據表中的所有數據列。

  • 無:資料表不會執行任何動作。
  • 重新建立:資料表會遭到捨棄並重新建立。 如果要動態建立新的資料表,則為必要。
  • 截斷:系統將會移除目標資料表中的所有資料列。

批次大小:控制每個貯體中寫入的數據列數目。 較大的批次大小會改善壓縮和記憶體優化,但會導致在快取資料時發生記憶體例外狀況的風險。

使用 TempDB: 根據預設,服務會使用全域臨時表將數據儲存為載入程式的一部分。 或者,您可以取消核取 [使用 TempDB] 選項,並改為要求服務將暫存數據表儲存在位於用於此接收之資料庫中的用戶資料庫中。

使用暫存資料庫

前置和後置 SQL 腳本:輸入將在接收資料庫寫入接收資料庫之前(前置處理)和之後執行的多行 SQL 腳本。

顯示接收設定的螢幕快照,其中包含 SQL 前置和後置處理腳本。

提示

  1. 建議將含有多個命令的單一批次指令碼分成多個批次。
  2. 只有傳回簡單更新計數的資料定義語言 (DDL) 和資料操作語言 (DML) 陳述式可以當作批次的一部份來執行。 若要深入了解,請參閱執行批次作業

錯誤數據列處理

寫入 Azure SQL DB 時,某些數據列可能會因為目的地所設定的條件約束而失敗。 常見錯誤包括:

  • 字串或二進位數據將會在數據表中截斷
  • 無法將 NULL 值插入資料行
  • INSERT 語句與 CHECK 條件約束衝突

根據預設,數據流執行會在它取得的第一個錯誤時失敗。 您可以選擇在 錯誤 時繼續,讓數據流完成,即使個別數據列有錯誤也一樣。 服務會提供不同的選項,讓您處理這些錯誤數據列。

交易認可: 選擇您的數據是以單一交易或批次寫入。 單一交易會提供更差的效能,但在交易完成之前,其他人不會看到寫入的數據。

輸出拒絕的數據:如果已啟用,您可以將錯誤數據列輸出至 Azure Blob 儲存體 中的 csv 檔案,或您選擇的 Azure Data Lake 儲存體 Gen2 帳戶。 這會寫入錯誤數據列,其中包含三個額外的數據行:INSERT 或 UPDATE 等 SQL 作業、數據流錯誤碼,以及數據列上的錯誤訊息。

回報錯誤成功: 如果啟用,即使找到錯誤數據列,數據流也會標示為成功。

錯誤數據列處理

Azure SQL 資料庫 的數據類型對應

從 Azure SQL 資料庫 複製資料或將數據複製到 Azure SQL 資料庫 時,會使用下列從 Azure SQL 資料庫 數據類型對應到 Azure Data Factory 過渡期數據類型。 Synapse 管線功能會使用相同的對應,此功能會直接實作 Azure Data Factory。 若要瞭解複製活動如何將來源架構和數據類型對應至接收,請參閱 架構和數據類型對應

Azure SQL 資料庫 資料類型 Data Factory 過渡期數據類型
BIGINT Int64
BINARY Byte[]
bit Boolean
char String, Char[]
date Datetime
Datetime Datetime
datetime2 Datetime
Datetimeoffset DateTimeOffset
Decimal Decimal
FILESTREAM attribute (varbinary(max)) Byte[]
Float Double
image Byte[]
int Int32
money Decimal
NCHAR String, Char[]
ntext String, Char[]
NUMERIC Decimal
NVARCHAR String, Char[]
real Single
rowversion Byte[]
smalldatetime Datetime
SMALLINT Int16
SMALLMONEY Decimal
sql_variant Object
text String, Char[]
time TimeSpan
timestamp Byte[]
TINYINT Byte
UNIQUEIDENTIFIER Guid
varbinary Byte[]
varchar String, Char[]
xml String

注意

針對對應至 Decimal 過渡期類型的資料類型,複製活動目前支援最多 28 個有效位數。 如果您有精確度大於 28 的數據,請考慮在 SQL 查詢中轉換成字串。

查閱活動屬性

若要了解屬性的詳細資料,請參閱查閱活動

GetMetadata 活動屬性

若要了解關於屬性的詳細資料,請參閱 GetMetadata 活動

使用 Always Encrypted

當您使用 Always Encrypted 從 Azure SQL 資料庫 複製數據時,請遵循下列步驟:

  1. 資料行主要金鑰 (CMK) 儲存在 Azure Key Vault 中。 深入了解如何使用 Azure Key Vault 設定 Always Encrypted

  2. 請務必存取儲存數據行主要密鑰 (CMK)金鑰保存庫。 針對必要權限,請參閱這篇文章

  3. 建立連結服務以連線到您的 SQL 資料庫,並使用受控身分識別或服務主體啟用 'Always Encrypted' 函式。

注意

Azure SQL 資料庫 Always Encrypted 支援下列案例:

  1. 來源或接收器資料存放區都使用受控識別或服務主體作為金鑰提供者驗證類型。
  2. 來源和接收資料存放區都會使用受控身分識別作為金鑰提供者驗證類型。
  3. 來源和接收資料存放區都使用與金鑰提供者驗證類型相同的服務主體。

注意

目前,只有對應數據流中的來源轉換才支援 Azure SQL 資料庫 Always Encrypted

原生異動資料擷取

Azure Data Factory 可支援適用於 SQL Server、Azure SQL DB 和 Azure SQL MI 的原生異動資料擷取功能。 ADF 對應資料流程可以自動偵測及擷取異動的資料,包括在 SQL 存放區中的資料列插入、更新和刪除。 若使用者沒有任何對應資料流程的程式碼經驗,可以將資料庫當作目的地存放區來附加,輕鬆地達成從 SQL 存放區複寫資料的情境。 此外,使用者也可以在兩者之間撰寫任何的資料轉換邏輯,以從 SQL 存放區達到累加式 ETL 的情境。

請確定管線和活動名稱保持不變,如此 ADF 便可以為您記錄檢查點,以便自動取得上次執行的變更資料。 如果您變更管線名稱或活動名稱,檢查點便會重設,這會導致您在下次執行時得從頭開始,或是取得從現在開始的變更。 如果您想要變更管線名稱或活動名稱,但仍想讓檢查點自動從上次的執行中取得變更的資料時,請在資料流程活動中使用您自己的檢查點索引鍵來達到此目的。

偵錯管線時,此功能的運作方式相同。 請注意,當您在偵錯執行期間重新整理瀏覽器時,將會重設檢查點。 在您對偵錯執行的管線結果感到滿意之後,您可以繼續發佈並觸發管線。 當您第一次觸發已發佈的管線時,會自動從頭重新開始,或是取得從現在開始的變更。

在監視區段中,您隨時有機會重新執行管線。 重新執行時,一律會從所選管線執行的前一個檢查點擷取變更的資料。

範例 1:

當您直接將「參照到啟用 SQL CDC 資料集的來源轉換」與「參照到對應資料流程中資料庫的接收轉換」鏈結起來時,SQL 來源上的變更會自動套用至目標資料庫,如此一來,您就可以輕鬆地達成在資料庫之間複寫資料的情境。 您可以在接收轉換中使用更新方法,以選取要在目標資料庫上允許插入、允許更新,還是要允許刪除。 對應資料流程中的範例指令碼如下所示。

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:true,
	keys:['id'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

範例 2:

如果您想要啟用 ETL 情境,而不是透過 SQL CDC 在資料庫之間進行資料複寫,可以在對應資料流程中使用運算式,包括 isInsert(1)、isUpdate(1),以及 isDelete(1) 來區分不同作業類型的資料列。 以下是用來對應資料流程的其中一個範例指令碼,若衍生出來的資料行值為 1 表示插入的資料列,2 表示更新的資料列,3 表示下游轉換已刪除的資料列,以處理差異資料。

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> sink1

已知的限制:

如需複製活動支援做為來源和接收的數據存放區清單,請參閱 支援的數據存放區和格式