Aktualizace nebo sloučení záznamů v Azure SQL Database pomocí Azure Functions

V současné době Azure Stream Analytics (ASA) podporuje pouze vkládání (připojování) řádků do výstupů SQL (Azure SQL Databases a Azure Synapse Analytics). Tento článek popisuje alternativní řešení povolení UPDATE, UPSERT nebo MERGE v databázích SQL pomocí Azure Functions jako zprostředkující vrstvy.

Na konci se zobrazí alternativní možnosti Azure Functions.

Požadavek

Data do tabulky můžete zapisovat pomocí jednoho z následujících režimů:

Režim Ekvivalentní příkaz T-SQL Požadavky
Připojit INSERT Nic
Nahradit MERGE (UPSERT) Jedinečný klíč
Hromadit MERGE (UPSERT) s operátorem složeného přiřazení(+=,-=...) Jedinečný klíč a akumulátor

Pokud chcete znázornit rozdíly, zvažte, co se stane při ingestování následujících dvou záznamů:

Čas příjezdu Device_Id Hodnota_Měření
10:00 A 1
10:05 A 20

V režimu připojení vložíte dva záznamy. Ekvivalentní příkaz T-SQL je:

INSERT INTO [target] VALUES (...);

Výsledkem je:

Čas_Upravení Device_Id Hodnota_Měření
10:00 A 1
10:05 A 20

V režimu nahrazení získáte pouze poslední hodnotu podle klíče. Tady jako klíč použijete Device_Id. Ekvivalentní příkaz T-SQL je:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Výsledkem je:

Čas_Upravení Klíč_zařízení Hodnota_Měření
10:05 A 20

Nakonec v režimu kumulování sečtete Value pomocí operátoru složeného přiřazení (+=). Tady také použijete Device_Id jako klíč:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Výsledkem je:

Čas_Upravení Klíč_zařízení Hodnota_Měření
10:05 A 21

Pro výkonové úvahy momentálně výstupní adaptéry databáze ASA SQL nativně podporují pouze režim připojení. Tyto adaptéry používají hromadné vkládání k maximalizaci propustnosti a omezení zpětného tlaku.

V tomto článku se dozvíte, jak pomocí azure Functions implementovat režimy nahrazení a kumulování pro ASA. Pokud používáte funkci jako zprostředkující vrstvu, potenciální výkon zápisu nemá vliv na úlohu streamování. V tomto ohledu funguje použití Azure Functions nejlépe s Azure SQL. V případě Synapse SQL může přechod z hromadných příkazů na příkazy po řádcích způsobit vyšší problémy s výkonem.

Azure Functions výstup

V této úloze nahradíte výstup z ASA SQL výstupem z ASA Azure Functions. Funkce implementuje funkce UPDATE, UPSERT nebo MERGE.

V současné době máte přístup ke službě SQL Database ve funkci pomocí dvou možností. První možností je výstupní vazba Azure SQL. V současné době je omezený na jazyk C# a nabízí pouze režim nahrazení. Druhou možností je vytvořit dotaz SQL, který se odešle přes příslušný ovladač SQL (Microsoft. Data.SqlClient pro .NET).

Obě následující ukázky předpokládají následující schéma tabulky. Možnost vazby vyžaduje , aby byl v cílové tabulce nastaven primární klíč . Není to nutné, ale doporučuje se při použití ovladače SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Pokud chcete funkci použít jako výstup z ASA, musí tato funkce splňovat následující očekávání:

  • Azure Stream Analytics očekává stav HTTP 200 z aplikace Functions pro dávky, které úspěšně zpracovává.
  • Když Azure Stream Analytics obdrží výjimku 413 ("http Request Entity Too Large") z Azure Functions, zmenšuje velikost dávek, které odesílá do Azure Function.
  • Během testovacího připojení odešle Stream Analytics POST požadavek s prázdnou dávkou k Azure Functions a očekává zpět stav HTTP 20x pro ověření testu.

Možnost 1: Aktualizace podle klíče pomocí vazby SQL pro Azure Functions

Tato možnost používá SQL výstupní vazbu pro Azure Function. Toto rozšíření může nahradit objekt v tabulce, aniž byste museli psát příkaz SQL. V tuto chvíli nepodporuje operátory složeného přiřazení (akumulace).

Tato ukázka byla postavena na:

Pokud chcete lépe porozumět přístupu vazby, postupujte podle tohoto návodu.

Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Použijte následující informace:

  • Jazyk: C#
  • Runtime: .NET 6 (v části function/runtime v4)
  • Šablona: HTTP trigger

Nainstalujte rozšíření vazby spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Přidejte položku SqlConnectionString do oddílu Values vašeho local.settings.json a vyplňte připojovací řetězec cílového serveru:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aktualizujte název cílové tabulky v oddílu vazby:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Aktualizujte blok třídy a mapování tak, aby odpovídaly vlastnímu schématu:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru Visual Studio Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS můžete použít ke kontrole připojení. Potom odešlete požadavky POST do místního koncového bodu. Požadavek s prázdným textem by měl vrátit HTTP 204. Požadavek opatřený skutečným obsahem by měl být uložen v cílové tabulce (v režimu nahrazení nebo aktualizace). Zde je vzorová datová část, která odpovídá schématu použitému v tomto příkladu:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkce je teď možné publikovat do Azure. Nastavte nastavení aplikace pro SqlConnectionString. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.

Pak můžete funkci definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.

Možnost 2: Sloučení se složeným přiřazením (kumulování) prostřednictvím vlastního dotazu SQL

Poznámka:

Po restartování a obnovení může ASA znovu odeslat výstupní události, které už vygeneroval. Toto chování může způsobit selhání logiky akumulace (zdvojnásobení jednotlivých hodnot). Pokud chcete tomuto problému zabránit, vypište stejná data v tabulce pomocí nativního výstupu ASA SQL. Tuto řídicí tabulku můžete použít k detekci problémů a opětovné synchronizaci akumulace v případě potřeby.

Tato možnost používá Microsoft.Data.SqlClient. Tato knihovna umožňuje odesílat všechny dotazy SQL do služby SQL Database.

Tato ukázka byla vytvořena na základě:

Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:

  • Jazyk: C#
  • Runtime: .NET 6 (v části function/runtime v4)
  • Šablona: HTTP trigger

Nainstalujte knihovnu SqlClient spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Položku SqlConnectionString přidejte do oddílu Values vašeho local.settings.json, vyplněním připojovacího řetězce cílového serveru:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

sqltext Aktualizujte oddíl sestavení příkazu tak, aby odpovídal vašemu vlastnímu schématu (všimněte si, jak se dosahuje akumulace prostřednictvím operátoru += při aktualizaci):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru VS Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS můžete použít ke kontrole připojení. Potom odešlete požadavky POST do místního koncového bodu. Požadavek s prázdným textem by měl vrátit HTTP 204. Požadavek se skutečným obsahem by měl být uložen v cílové tabulce (v režimu shromažďování nebo sloučení). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString. Brána firewall Azure SQL Serveru by měla umožňovat služby Azure, aby se k ní dostala živá funkce.

Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.

Alternativy

Mimo Azure Functions může dosáhnout očekávaného výsledku více metod. Tato část popisuje některé z těchto metod.

Následné zpracování v cílové databázi SQL

Úloha na pozadí funguje, jakmile se data vloží do databáze prostřednictvím standardních výstupů ASA SQL.

Pro Azure SQL použijte INSTEAD OFDML triggery k zachycení příkazů INSERT, které vydává ASA.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

V případě Synapse SQL může ASA vložit do pracovní tabulky. Opakovaný úkol pak může podle potřeby transformovat data do zprostředkující tabulky. Nakonec se data přesunou do produkční tabulky.

Předběžné zpracování ve službě Azure Cosmos DB

Azure Cosmos DB nativně podporuje UPSERT. Tady je možné pouze připojit nebo nahradit. V Azure Cosmos DB musíte spravovat akumulace na straně klienta.

Pokud se požadavky shodují, můžete cílovou databázi SQL nahradit Azure Cosmos DB instancí. Tato změna vyžaduje důležitou změnu v celkové architektuře řešení.

Pro Synapse SQL můžete jako zprostředkující vrstvu použít Azure Cosmos DB prostřednictvím Azure Synapse Link pro Azure Cosmos DB. Pomocí Azure Synapse Link vytvořte analytické úložiště . Toto úložiště dat pak můžete dotazovat přímo ve službě Synapse SQL.

Porovnání alternativ

Každý přístup nabízí různé návrhy hodnot a schopnosti.

Typ Možnost Režimy Azure SQL Database Azure Synapse Analytics
Následné zpracování
Spouštěče Nahradit, Kumulovat + Není k dispozici, triggery nejsou dostupné v Synapse SQL
Příprava Nahradit, Kumulovat + +
Předběžné zpracování
Azure Functions Nahradit, Kumulovat + - (výkon po řádcích)
Nahrazení služby Azure Cosmos DB Nahradit Není relevantní Není relevantní
Azure Cosmos DB Azure Synapse Link Nahradit Není relevantní +

Získání podpory

Pokud potřebujete další pomoc, vyzkoušejte stránku s dotazy microsoftu Q&A pro Azure Stream Analytics.

Další kroky