Azure Functionsを使用してAzure SQL Database内のレコードを更新またはマージする

現在、Azure Stream Analytics (ASA) では、SQL 出力 (Azure SQL Databases および Azure Synapse Analytics) への行の挿入 (追加) のみがサポートされています。 この記事では、Azure Functionsを中間レイヤーとして使用して、SQL データベースで UPDATEUPSERT、または MERGE を有効にする回避策について説明します。

Azure Functionsの代替オプションは、最後に表示されます。

要件

次のいずれかのモードを使用して、テーブルにデータを書き込むことができます。

モード 同等の T-SQL ステートメント 必要条件
Append INSERT なし
置き換える MERGE (UPSERT) 一意キー
Accumulate MERGE (UPSERT) を複合代入演算子 (+=-=...) とともに使用する 一意キーとアキュムレータ

違いを示すために、次の 2 つのレコードを取り込むとどうなるかを検討してください。

到着時刻 Device_Id 測定値
10:00 A 1
10:05 A 20

追加モードでは、2 つのレコードを挿入します。 同等の T-SQL ステートメントは次のとおりです。

INSERT INTO [target] VALUES (...);

結果:

Modified_Time デバイスID 測定値
10:00 A 1
10:05 A 20

置換モードでは、キーごとに最後の値のみを取得します。 ここでは、 キーとしてDevice_Idを使用します。 同等の T-SQL ステートメントは次のとおりです。

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

結果:

Modified_Time デバイス_キー 測定値
10:05 A 20

最後に、累積モードでは、複合代入演算子 (Value) で+=を合計します。 キーとしてDevice_Idも使用します。

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

結果:

Modified_Time デバイス_キー 測定値
10:05 A 21

パフォーマンスに関する考慮事項のため、ASA SQL データベースの出力アダプターは、現在、追加モードのみをネイティブでサポートしています。 これらのアダプターでは、一括挿入を使用してスループットを最大化し、負荷を軽減します。

この記事では、Azure Functions を使用して ASA の置換モードと累積モードを実装する方法について説明します。 中間レイヤーとして関数を使用する場合、潜在的な書き込みパフォーマンスはストリーミング ジョブに影響しません。 この点で、Azure Functions の使用は Azure SQL に最適です。 Synapse SQL では、バルクから行単位のステートメントに切り替えると、より大きなパフォーマンス上の問題が発生する可能性があります。

Azure Functions 出力

このジョブでは、ASA SQL 出力を ASA Azure Functions 出力に置き換えます。 この関数は、UPDATE、UPSERT、または MERGE 機能を実装します。

現時点では、2 つのオプションを使用して関数内の SQL Database にアクセスできます。 最初のオプションは、Azure SQL出力バインドです。 現在は C# に制限されており、置換モードのみを提供します。 2 つ目のオプションは、適切な SQL ドライバー (Microsoft.Data.SqlClient for .NET) 経由で送信するための SQL クエリを作成することです。

次のサンプルはどちらも、次のテーブル スキーマを前提としています。 バインド オプションを使用するには、対象テーブルに主キーを設定する必要があります。 SQL ドライバーを使用する場合、必須ではありませんが推奨されます。

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

ASA からの出力として関数を使用するには、この関数が次の要件を満たしている必要があります。

  • Azure Stream Analyticsは、正常に処理されるバッチに対して Functions アプリから HTTP 状態 200 を受け取ります。
  • Azure Stream AnalyticsがAzure関数から 413 ("http Request Entity Too Large") 例外を受け取ると、Azure関数に送信されるバッチのサイズが小さくなります。
  • テスト接続中、Stream Analytics は空のバッチを含む POST 要求をAzure Functionsに送信し、テストを検証するために HTTP 状態 20x が返される必要があります。

オプション 1: Azure 関数 SQL Binding を使用してキーで更新する

このオプションでは、Azure 関数 SQL 出力バインドを使用します。 この拡張機能は、SQL ステートメントを記述しなくても、テーブル内のオブジェクトを置き換えることができます。 現時点では、複合代入演算子 (accumulations) はサポートされていません。

このサンプルは、次のものに基づいて構築されました。

バインディングの方法をより深く理解するには、こちらのチュートリアルに従ってください。

まず、このチュートリアルに従って、既定の HttpTrigger 関数アプリを作成します。 以下の情報を使用します:

  • 言語: C#
  • ランタイム: .NET 6 (関数/ランタイム v4 以降)
  • テンプレート: HTTP trigger

プロジェクト フォルダーにあるターミナルで次のコマンドを実行して、バインド拡張機能をインストールします。

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

SqlConnectionStringValues セクションに local.settings.json 項目を追加し、宛先サーバーの接続文字列を入力します。

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

次のコード スニペットを使って、関数 (プロジェクト内の .cs ファイル) 全体を置き換えます。 名前空間、クラス名、関数名を独自の名前で更新します。

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

バインド セクションで、宛先テーブルの名前を更新します。

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

独自のスキーマに一致するように、Device クラスとマッピング セクションを更新します。

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

これで、デバッグ (Visual Studio Code で F5 キーを押します) によってローカル関数とデータベース間の接続をテストすることができます。 SQL データベースには、コンピューターからアクセスできる必要があります。 SSMS を使用して接続を確認できます。 次に、POST 要求をローカル エンドポイントに送信します。 本文が空の要求は HTTP 204 を返す必要があります。 実際のペイロードを含む要求は、宛先テーブル (置換/更新モード) で保持する必要があります。 このサンプルで使用されるスキーマに対応したサンプルのペイロードを次に示します。

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

これで、関数を Azure に発行できます。 SqlConnectionStringします。 Azure SQL Server のファイアウォールでは、ライブ機能が到達できるように、Azure サービスを許可する必要があります。

その後、ASA ジョブの出力として関数を定義し、それを使用してレコードを挿入する代わりに置き換えることができます。

オプション 2: カスタム SQL クエリを使用して複合割り当て (蓄積) をマージする

注意

再起動と復旧時に、ASA によって既に出力された出力イベントが再送信される場合があります。 この動作により、累積ロジックが失敗する可能性があります (個々の値が 2 倍になります)。 この問題を回避するには、ネイティブ ASA SQL 出力を使用してテーブルに同じデータを出力します。 この制御テーブルを使用すると、問題を検出し、必要に応じて蓄積を再同期できます。

このオプションは、Microsoft.Data.SqlClient を使用します。 このライブラリを使用すると、SQL クエリを SQL Database に送信できます。

このサンプルは、次のものに基づいて構築されました。

まず、このチュートリアルに従って、既定の HttpTrigger 関数アプリを作成します。 次の情報が使用されます。

  • 言語: C#
  • ランタイム: .NET 6 (関数/ランタイム v4の下)
  • テンプレート: HTTP trigger

プロジェクト フォルダーにあるターミナルで次のコマンドを実行して、SqlClient ライブラリをインストールします。

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

SqlConnectionStringValues セクションに local.settings.json 項目を追加し、宛先サーバーの接続文字列を入力します。

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

次のコード スニペットを使って、関数 (プロジェクト内の .cs ファイル) 全体を置き換えます。 名前空間、クラス名、関数名を独自に更新します。

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

独自のスキーマに一致するように、sqltext コマンドの構築セクションを更新します (更新時に += 演算子を使用して累積がどのように達成されるかに注意してください)。

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

デバッグによってローカル関数とデータベース間の接続をテストできるようになりました (VS Code で F5 キーを押します)。 SQL データベースには、コンピューターからアクセスできる必要があります。 SSMS を使用して接続を確認できます。 次に、POST 要求をローカル エンドポイントに送信します。 本文が空の要求は HTTP 204 を返す必要があります。 実際のペイロードを含む要求は、(累積/マージ モードで) 宛先テーブルに保存する必要があります。 このサンプルで使用されるスキーマに対応したサンプルのペイロードを次に示します。

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

これで、関数を Azure に発行することができますアプリケーションの設定は、SqlConnectionString に設定される必要があります。 Azure SQL Server のファイアウォールでは、ライブ機能が到達できるように、Azure サービスを許可する必要があります。

その後、関数を ASA ジョブの出力として定義し、レコードを挿入する代わりに、レコードを置き換えるために使用できます。

選択肢

Azure Functions以外では、複数の方法で期待される結果を得ることができます。 このセクションでは、これらのメソッドの一部について説明します。

ターゲット SQL Database での後処理

バックグラウンド タスクは、標準の ASA SQL 出力を介してデータベースにデータが挿入されると動作します。

Azure SQLの場合は、INSTEAD OFDML トリガーを使用して、ASA が発行する INSERT コマンドをインターセプトします。

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Synapse SQL の場合、ASA はステージング テーブルに挿入できます。 その後、定期的なタスクによって、必要に応じて中間テーブルにデータを変換できます。 最後に、データは実稼働テーブルに 移動されます

Azure Cosmos DB での前処理

Azure Cosmos DB は、ネイティブで UPSERT をサポートします。 ここでは、追加または置換のみが可能です。 Azure Cosmos DBでクライアント側の累積を管理する必要があります。

要件が一致する場合は、ターゲット SQL データベースを Azure Cosmos DB インスタンスに置き換えることができます。 この変更には、ソリューション アーキテクチャ全体で重要な変更が必要です。

Synapse SQL の場合は、Azure Synapse Link for Azure Cosmos DB を介して、Azure Cosmos DBを中間レイヤーとして使用できます。 Azure Synapse Linkを使用して、分析ストアを作成します。 その後、Synapse SQL でこのデータ ストアに直接クエリを実行できます。

代替手段の比較

各アプローチには、異なる価値提案と機能が用意されています。

タイプ オプション モード Azure SQL Database Azure Synapse Analytics
後処理
トリガー 置換、累積 + 該当なし。Synapse SQL では、トリガーを使用できません
ステージング 置換、累積 + +
前処理
Azure Functions 置換、累積 + - (行ごとのパフォーマンス)
Azure Cosmos DB 置換 置き換える 該当なし 該当なし
Azure Cosmos DB Azure Synapse Link 置き換える 該当なし +

サポートを受ける

詳細については、 Azure Stream Analytics の Microsoft Q&A 質問ページをお試しください。

次のステップ