Aktualizace nebo sloučení záznamů ve službě Azure SQL Database pomocí Azure Functions

Azure Stream Analytics (ASA) v současné době podporuje vkládání (připojování) řádků pouze do výstupů SQL (Azure SQL Database a Azure Synapse Analytics). Tento článek popisuje alternativní řešení pro povolení funkce UPDATE, UPSERT nebo MERGE v databázích SQL s Azure Functions jako zprostředkující vrstvou.

Na konci se zobrazí alternativní možnosti azure Functions.

Požadavek

Zápis dat do tabulky lze obecně provádět následujícím způsobem:

Režim Ekvivalentní příkaz T-SQL Požadavky
Připojit INSERT Nic
Nahradit MERGE (UPSERT) Jedinečný klíč
Hromadit MERGE (UPSERT) s operátorem složeného přiřazení (+=,-=...) Jedinečný klíč a akumulátor

Pokud chcete znázornit rozdíly, podívejte se, co se stane při ingestování následujících dvou záznamů:

Arrival_Time Device_Id Measure_Value
10:00 A 0
10:05 A 20

V režimu připojení vložíme dva záznamy. Ekvivalentní příkaz T-SQL je:

INSERT INTO [target] VALUES (...);

Výsledkem je:

Modified_Time Device_Id Measure_Value
10:00 A 0
10:05 A 20

V režimu nahrazení získáme pouze poslední hodnotu podle klíče. Tady jako klíč používáme Device_Id. Ekvivalentní příkaz T-SQL je:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Výsledkem je:

Modified_Time Device_Key Measure_Value
10:05 A 20

Nakonec v režimu kumulování sečteme Value operátorem složeného přiřazení (+=). Tady také jako klíč používáme Device_Id:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Výsledkem je:

Modified_Time Device_Key Measure_Value
10:05 A 21

Pro důležité informace o výkonu podporují výstupní adaptéry databáze ASA SQL aktuálně nativně pouze režim připojení. Tyto adaptéry používají hromadné vkládání k maximalizaci propustnosti a omezení zpětného tlaku.

V tomto článku se dozvíte, jak pomocí azure Functions implementovat režimy nahrazení a kumulování pro ASA. Pokud funkci použijete jako zprostředkující vrstvu, potenciální výkon zápisu neovlivní úlohu streamování. V tomto ohledu funguje použití Azure Functions nejlépe s Azure SQL. V případě Synapse SQL může přechod z hromadných příkazů na příkazy po řádcích způsobit vyšší problémy s výkonem.

Výstup služby Azure Functions

V naší úloze nahradíme výstup ASA SQL výstupem ASA Azure Functions. Funkce UPDATE, UPSERT nebo MERGE jsou ve funkci implementovány.

V současné době existují dvě možnosti pro přístup ke službě SQL Database ve funkci. Nejprve je výstupní vazba Azure SQL. V současné době je omezený na jazyk C# a nabízí pouze režim nahrazení. Druhý je vytvoření dotazu SQL, který se odešle přes příslušný ovladač SQL (Microsoft.Data.SqlClient pro .NET).

U obou následujících ukázek předpokládáme následující schéma tabulky. Možnost vazby vyžaduje , aby byl v cílové tabulce nastaven primární klíč . Není to nutné, ale doporučuje se při použití ovladače SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Funkce musí splňovat následující očekávání, která se mají použít jako výstup z ASA:

  • Azure Stream Analytics očekává stav HTTP 200 z aplikace Functions pro dávky, které byly úspěšně zpracovány.
  • Když Azure Stream Analytics obdrží výjimku 413 (http Request Entity Too Large) z funkce Azure, zmenšuje velikost dávek, které odesílá do funkce Azure Functions.
  • Během testovacího připojení Stream Analytics odešle požadavek POST s prázdnou dávkou do služby Azure Functions a očekává, že stav HTTP 20x zpět ověří test.

Možnost 1: Aktualizace podle klíče pomocí vazby SQL funkce Azure Functions

Tato možnost používá výstupní vazbu SQL funkce Azure Functions. Toto rozšíření může nahradit objekt v tabulce, aniž byste museli psát příkaz SQL. V tuto chvíli nepodporuje operátory složeného přiřazení (akumulace).

Tato ukázka byla postavena na:

Pokud chcete lépe porozumět přístupu k vazbě, doporučujeme postupovat podle tohoto kurzu.

Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:

  • Jazyk: C#
  • Runtime: .NET 6 (v části function/runtime v4)
  • Šablony: HTTP trigger

Nainstalujte rozšíření vazby spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

SqlConnectionString Přidejte položku do oddíluValues, vyplňte local.settings.jsonpřipojovací řetězec cílového serveru:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aktualizujte název cílové tabulky v oddílu vazby:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Device Aktualizujte oddíl třídy a mapování tak, aby odpovídaly vlastnímu schématu:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru Visual Studio Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS se dá použít ke kontrole připojení. Potom můžete použít nástroj, jako je Postman , k vydávání požadavků POST na místní koncový bod. Požadavek s prázdným textem by měl vrátit http 204. Požadavek se skutečnou datovou částí by měl být zachován v cílové tabulce (v režimu nahrazení nebo aktualizace). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.

Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.

Možnost 2: Sloučení se složeným přiřazením (kumulování) prostřednictvím vlastního dotazu SQL

Poznámka:

Po restartování a obnovení může ASA znovu odeslat výstupní události, které už byly vygenerovány. Jedná se o očekávané chování, které může způsobit selhání logiky akumulace (zdvojnásobení jednotlivých hodnot). Pokud tomu chcete zabránit, doporučujeme vytvořit výstup stejných dat v tabulce prostřednictvím nativního výstupu ASA SQL. Tuto řídicí tabulku pak můžete použít k detekci problémů a opětovné synchronizaci akumulace v případě potřeby.

Tato možnost používá Microsoft.Data.SqlClient. Tato knihovna nám umožňuje vydávat dotazy SQL do služby SQL Database.

Tato ukázka byla postavena na:

Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:

  • Jazyk: C#
  • Runtime: .NET 6 (v části function/runtime v4)
  • Šablony: HTTP trigger

Nainstalujte knihovnu SqlClient spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

SqlConnectionString Přidejte položku do oddíluValues, vyplňte local.settings.jsonpřipojovací řetězec cílového serveru:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

sqltext Aktualizujte oddíl sestavení příkazu tak, aby odpovídal vašemu vlastnímu schématu (všimněte si, jak se dosahuje akumulace prostřednictvím operátoru += při aktualizaci):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru VS Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS se dá použít ke kontrole připojení. Potom můžete použít nástroj, jako je Postman , k vydávání požadavků POST na místní koncový bod. Požadavek s prázdným textem by měl vrátit http 204. Požadavek se skutečnou datovou částí by měl být zachován v cílové tabulce (v režimu kumulování nebo sloučení). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.

Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.

Alternativy

Mimo Azure Functions existuje několik způsobů, jak dosáhnout očekávaného výsledku. Tato část obsahuje některé z nich.

Následné zpracování v cílové databázi SQL

Úloha na pozadí funguje, jakmile se data vloží do databáze prostřednictvím standardních výstupů ASA SQL.

Pro Azure SQL INSTEAD OFje možné triggery DML použít k zachycení příkazů INSERT vydaných službou ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

V případě Synapse SQL může ASA vložit do pracovní tabulky. Opakovaný úkol pak může podle potřeby transformovat data do zprostředkující tabulky. Nakonec se data přesunou do produkční tabulky.

Předběžné zpracování ve službě Azure Cosmos DB

Azure Cosmos DB nativně podporuje UPSERT. Tady je možné pouze připojit nebo nahradit. Akumulace musí být spravovaná na straně klienta ve službě Azure Cosmos DB.

Pokud se požadavky shodují, je možné nahradit cílovou databázi SQL instancí služby Azure Cosmos DB. To vyžaduje důležitou změnu v celkové architektuře řešení.

Pro Synapse SQL je možné službu Azure Cosmos DB použít jako zprostředkující vrstvu prostřednictvím Azure Synapse Linku pro Azure Cosmos DB. Azure Synapse Link se dá použít k vytvoření analytického úložiště. Toto úložiště dat se pak dá dotazovat přímo ve službě Synapse SQL.

Porovnání alternativ

Každý přístup nabízí jinou hodnotu a možnosti:

Typ Možnost Režimy Azure SQL Database Azure Synapse Analytics
Následné zpracování
Aktivační události Nahradit, Kumulovat + Není k dispozici, triggery nejsou dostupné v Synapse SQL
Příprava Nahradit, Kumulovat + +
Předběžné zpracování
Azure Functions Nahradit, Kumulovat + - (výkon řádků po řádech)
Nahrazení služby Azure Cosmos DB Nahradit N/A
Azure Cosmos DB Azure Synapse Link Nahradit +

Získání podpory

Pokud potřebujete další pomoc, vyzkoušejte naši stránku pro otázky Microsoftu pro Azure Stream Analytics.

Další kroky