分享方式:


使用 Azure Functions 更新或合併 Azure SQL Database 中的記錄

目前,Azure 串流分析 (ASA) 僅支援 SQL 輸出 (Azure SQL DatabasesAzure Synapse Analytics) 的插入 (附加) 資料列。 本文將討論使用 Azure Functions 作為媒介層,以啟用 SQL 資料庫上的 UPDATE、UPSERT 或 MERGE 的因應措施。

Azure Functions 的替代選項會在文末敘述。

需求

您通常可透過下列方式,在資料表中寫入資料:

[模式] 對等的 T-SQL 陳述式 需求
附加 INSERT
Replace MERGE (UPSERT) 唯一索引鍵
累積 MERGE (UPSERT) 與複合指派運算子 (+=-=...) 唯一索引鍵和累加器

為說明差異,請查看內嵌下列兩筆記錄時所發生的情況:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

附加模式中,我們會插入兩筆記錄。 對等的 T-SQL 陳述式為:

INSERT INTO [target] VALUES (...);

產生的結果:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

取代模式中,我們只會依索引鍵取得最後一個值。 此處我們將會使用 Device_Id 作為索引鍵。對等的 T-SQL 陳述式為:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

產生的結果:

Modified_Time Device_Key Measure_Value
10:05 A 20

最後,在累加模式中,我們會使用複合指派運算子 (+=) 來加總 Value。 此處我們也會使用 Device_Id 作為索引鍵:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

產生的結果:

Modified_Time Device_Key Measure_Value
10:05 A 21

針對效能考量,ASA SQL 資料庫輸出配接器目前僅支援原生附加模式。 這些介面卡會使用大量插入,將輸送量最大化並限制回堵情形。

本文說明如何使用 Azure Functions,來實作 ASA 的取代和累加模式。 當您使用函式作為媒介層,潛在的寫入效能不會影響串流作業。 因此 Azure Functions 最適合與 Azure SQL 搭配使用。 使用 Synapse SQL,從大量切換至逐列陳述句可能會產生更大的效能問題。

Azure Functions 輸出

在作業中,我們會將 ASA SQL 輸出取代為 ASA Azure Functions 輸出。 UPDATE、UPSERT 或 MERGE 功能將會在函式中實作。

目前有兩個選項可存取函式中的 SQL Database。 第一個是 Azure SQL 輸出繫結。 其目前僅限於 C#,且僅提供取代模式。 第二個是撰寫 SQL 查詢,並透過適當的 SQL 驅動程式 (Microsoft.Data.SqlClient for .NET) 提交。

針對下列兩個範例,我們將假設下列資料表結構描述。 繫結選項需要在目標資料表上設定主索引鍵。 當您使用 SQL 驅動程式時,此為非必要但建議的選項。

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

函式必須符合下列預期,才能作為 ASA 的輸出

  • Azure 串流分析會針對已成功處理的批次,預期來自 Functions 應用程式是 HTTP 狀態 200
  • 當 Azure 串流分析從 Azure 函式收到 413 (http 要求實體太大) 例外狀況時,會縮減傳送至 Azure Functions 的批次大小
  • 在測試連線期間,串流分析會將具有空批次的 POST 要求傳送至 Azure Functions,並預期傳回的 HTTP 狀態為 20x 以驗證測試

選項 1:依具有 Azure 函式 SQL 繫結的索引鍵更新

此選項會使用 Azure 函式 SQL 輸出繫結。 此延伸模組可以取代資料表中的物件,而不需要寫入 SQL 陳述式。 目前不支援複合指派運算子 (累加)。

此範例建置於:

若要深入了解繫結方法,建議您遵循本教學課程

首先,遵循本教學課程,建立預設的 HttpTrigger 函式應用程式。 使用下列資訊:

  • 語言:C#
  • 執行階段:.NET 6 (在函式/執行階段 v4 下)
  • 範本:HTTP trigger

在位於專案資料夾的終端中執行下列命令,以安裝繫結延伸模組:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

local.settings.jsonValues 區段中新增 SqlConnectionString 項目,填入目的地伺服器的連接字串:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

將整個函式 (專案中的 .cs 檔案) 取代為下列程式碼片段。 自行更新命名空間、類別名稱和函式名稱:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

更新繫結區段中的目的地資料表名稱:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

更新 Device 類別和對應區段,以符合自身的結構描述:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

您現在可以透過偵錯作業 (Visual Studio Code 的 F5),測試本機函式和資料庫之間的連接。 您的機器必須可連接到 SQL 資料庫。 SSMS可用來檢查連線能力。 然後,將 POST 要求傳送至本機端點。 具有空白本文的要求應該會傳回 HTTP 204。 具有實際承載的要求應該保存在目的地資料表中 (在取代/更新模式中)。 下列是對應至此範例所用結構描述的範例乘載:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

函式現在可以發佈至 Azure。 應為 SqlConnectionString 設定應用程式設定。 Azure SQL Server 防火牆應該允許 Azure 服務,讓即時函式可與其連線。

函式接著可以定義為 ASA 作業中的輸出,並用來取代記錄,而不是插入記錄。

選項 2:透過自訂 SQL 查詢,與複合指派 (累加) 進行合併

注意

重新開機和復原時,ASA 可能會重新傳送已發出的輸出事件。 此預期行為可能會導致累加邏輯失敗 (將個別值加倍)。 若要避免此情況,建議您透過原生 ASA SQL Output,在資料表中輸出相同的資料。 然後,您可以使用此控制資料表來偵測問題,並在必要時重新同步處理累加作業。

此選項會使用 Microsoft.Data.SqlClient。 此程式庫可讓我們發出 SQL Database 的任何 SQL 查詢。

此範例建置於:

首先,遵循本教學課程,建立預設的 HttpTrigger 函式應用程式。 使用下列資訊:

  • 語言:C#
  • 執行階段:.NET 6 (在函式/執行階段 v4 下)
  • 範本:HTTP trigger

在位於專案資料夾的終端中執行下列命令,以安裝 SqlClient 程式庫:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

local.settings.jsonValues 區段中新增 SqlConnectionString 項目,填入目的地伺服器的連接字串:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

將整個函式 (專案中的 .cs 檔案) 取代為下列程式碼片段。 自行更新命名空間、類別名稱和函式名稱:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

更新 sqltext 命令組建區塊,以符合自身的結構描述 (請留意系統如何透過更新中的 += 運算子完成累加作業):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

您現在可以透過偵錯作業 (VS Code 的 F5),測試本機函式和資料庫之間的連接。 您的機器必須可連接到 SQL 資料庫。 SSMS可用來檢查連線能力。 然後,將 POST 要求發行至本機端點。 具有空白本文的要求應該會傳回 HTTP 204。 具有實際承載的要求應該保存在目的地資料表中 (在累加/合併模式中)。 下列是對應至此範例所用結構描述的範例乘載:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

函式現在可以發佈至 Azure。 應為 SqlConnectionString 設定應用程式設定。 Azure SQL Server 防火牆應該允許 Azure 服務,讓即時函式可與其連線。

函式接著可以定義為 ASA 作業中的輸出,並用來取代記錄,而不是插入記錄。

替代項目

除了 Azure Functions 之外,仍有多種方式可以達成預期的結果。 本節提供其中一些。

在目標 SQL Database 中進行後置處理

當資料透過標準 ASA SQL 輸出插入資料庫時,系統便會開始執行背景工作。

針對 Azure SQL,INSTEAD OFDML 觸發程序可用來攔截 ASA 發出的 INSERT 命令:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

針對 Synapse SQL,ASA 可以插入暫存表格。 週期性工作接著可以視需要將資料轉換成中繼資料表。 最後,資料會移至生產資料表。

Azure Cosmos DB 中的前置處理

Azure Cosmos DB 支援原生 UPSERT。 此處僅適用附加/取代模式。 累加作業必須是 Azure Cosmos DB 中的受控用戶端。

如果符合需求,則該選項便可將目標 SQL 資料庫取代為 Azure Cosmos DB 執行個體。 若要執行此作業,則需在整體解決方案結構中進行重要變更。

針對 Synapse SQL,Azure Cosmos DB 可以透過 Azure Cosmos DB 的 Azure Synapse Link 作為媒介層。 Azure Synapse Link 可用來建立分析存放區。 您便可在 Synapse SQL 中直接查詢此資料存放區。

替代方案的比較

每個方法都會提供不同的價值主張和功能:

類型 選項 模式 Azure SQL Database Azure Synapse Analytics
後處理
觸發程序 取代、累加 + N/A,Synapse SQL 中無法使用觸發程序
預備 取代、累加 + +
前處理
Azure Functions 取代、累加 + - (逐列效能)
Azure Cosmos DB 取代 Replace N/A N/A
Azure Cosmos DB Azure Synapse Link Replace N/A +

取得支援

如需進一步的協助,請嘗試 Azure 串流分析的 Microsoft 問與答頁面

下一步