Uppdatera eller sammanfoga poster i Azure SQL Database med Azure Functions

För närvarande stöder Azure Stream Analytics (ASA) endast infogning av (väntande) rader i SQL-utdata (Azure SQL Databases och Azure Synapse Analytics). I den här artikeln beskrivs lösningar för att aktivera UPDATE, UPSERT eller MERGE på SQL-databaser med Azure Functions som mellanliggande lager.

Alternativa alternativ till Azure Functions visas i slutet.

Krav

Att skriva data i en tabell kan vanligtvis göras på följande sätt:

Läge Motsvarande T-SQL-instruktion Behov
Lägga till INSERT Ingen
Replace MERGE (UPSERT) Unik nyckel
Samla MERGE (UPSERT) med sammansatt tilldelningsoperator (+=, -=...) Unik nyckel och ackumulator

För att illustrera skillnaderna kan du titta på vad som händer när du matar in följande två poster:

Arrival_Time Device_Id Measure_Value
10:00:00 A 1
10:05 A 20

I tilläggsläget infogar vi två poster. Motsvarande T-SQL-instruktion är:

INSERT INTO [target] VALUES (...);

Resulterar i:

Modified_Time Device_Id Measure_Value
10:00:00 A 1
10:05 A 20

I ersättningsläge får vi bara det sista värdet efter nyckel. Här använder vi Device_Id som nyckel. Motsvarande T-SQL-instruktion är:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulterar i:

Modified_Time Device_Key Measure_Value
10:05 A 20

Slutligen summerar vi i ackumulerat läge med en sammansatt tilldelningsoperator (+=).Value Här använder vi även Device_Id som nyckel:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulterar i:

Modified_Time Device_Key Measure_Value
10:05 A 21

För prestandaöverväganden stöder ASA SQL-databasutdatakorten för närvarande endast tilläggsläge internt. Dessa kort använder massinfogning för att maximera dataflödet och begränsa tillbakatrycket.

Den här artikeln visar hur du använder Azure Functions för att implementera ersätt- och ackumulerade lägen för ASA. När du använder en funktion som mellanliggande lager påverkar inte den potentiella skrivprestandan strömningsjobbet. I det här avseendet fungerar användning av Azure Functions bäst med Azure SQL. Med Synapse SQL kan växling från massuttryck till rad-för-rad-instruktioner skapa större prestandaproblem.

Azure Functions-utdata

I vårt jobb ersätter vi ASA SQL-utdata med ASA Azure Functions-utdata. Funktionerna UPDATE, UPSERT eller MERGE implementeras i funktionen.

Det finns för närvarande två alternativ för att komma åt en SQL Database i en funktion. Först är Azure SQL-utdatabindningen. Den är för närvarande begränsad till C# och erbjuder endast ersättningsläge. Det andra är att skapa en SQL-fråga som ska skickas via lämplig SQL-drivrutin (Microsoft.Data.SqlClient för .NET).

För båda följande exempel antar vi följande tabellschema. Bindningsalternativet kräver att en primärnyckel anges i måltabellen. Det är inte nödvändigt, men rekommenderas, när du använder en SQL-drivrutin.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

En funktion måste uppfylla följande förväntningar som ska användas som utdata från ASA:

  • Azure Stream Analytics förväntar sig HTTP-status 200 från Functions-appen för batchar som har bearbetats
  • När Azure Stream Analytics tar emot ett 413-undantag ("http Request Entity Too Large") från en Azure-funktion minskar storleken på de batchar som skickas till Azure-funktionen
  • Under testanslutningen skickar Stream Analytics en POST-begäran med en tom batch till Azure Functions och förväntar sig HTTP-status 20x tillbaka för att verifiera testet

Alternativ 1: Uppdatera efter nyckel med Azure Function SQL-bindning

Det här alternativet använder Azure Function SQL-utdatabindning. Det här tillägget kan ersätta ett objekt i en tabell utan att behöva skriva en SQL-instruktion. För närvarande stöder den inte sammansatta tilldelningsoperatorer (ackumuleringar).

Det här exemplet byggdes på:

För att bättre förstå bindningsmetoden rekommenderar vi att du följer den här självstudien.

Skapa först en httptrigger-standardfunktionsapp genom att följa den här självstudien. Följande information används:

  • Språk: C#
  • Körning: .NET 6 (under funktion/körning v4)
  • Mall: HTTP trigger

Installera bindningstillägget genom att köra följande kommando i en terminal i projektmappen:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

SqlConnectionString Lägg till objektet i Values avsnittet i och local.settings.jsonfyll i målserverns anslutningssträng:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ersätt hela funktionen (.cs fil i projektet) med följande kodfragment. Uppdatera namnområdet, klassnamnet och funktionsnamnet efter ditt eget:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Uppdatera måltabellens namn i bindningsavsnittet:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Device Uppdatera avsnittet klass och mappning så att det matchar ditt eget schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Du kan nu testa kabeldragningen mellan den lokala funktionen och databasen genom att felsöka (F5 i Visual Studio Code). SQL-databasen måste kunna nås från datorn. SSMS kan användas för att kontrollera anslutningen. Sedan kan ett verktyg som Postman användas för att utfärda POST-begäranden till den lokala slutpunkten. En begäran med en tom brödtext ska returnera http 204. En begäran med en faktisk nyttolast ska sparas i måltabellen (i läget ersätt/uppdatera). Här är ett exempel på nyttolast som motsvarar schemat som används i det här exemplet:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funktionen kan nu publiceras till Azure. En programinställning ska anges för SqlConnectionString. Azure SQL Server-brandväggen bör tillåta att Azure-tjänster i för livefunktionen når den.

Funktionen kan sedan definieras som utdata i ASA-jobbet och användas för att ersätta poster i stället för att infoga dem.

Alternativ 2: Sammanfoga med sammansatt tilldelning (ackumuleras) via en anpassad SQL-fråga

Kommentar

Vid omstart och återställning kan ASA skicka utdatahändelser som redan har genererats. Det här är ett förväntat beteende som kan leda till att ackumuleringslogik misslyckas (dubblering av enskilda värden). För att förhindra detta rekommenderar vi att du matar ut samma data i en tabell via inbyggda ASA SQL-utdata. Den här kontrolltabellen kan sedan användas för att identifiera problem och synkronisera ackumuleringen igen vid behov.

Det här alternativet använder Microsoft.Data.SqlClient. Med det här biblioteket kan vi utfärda sql-frågor till en SQL Database.

Det här exemplet byggdes på:

  • Azure Functions-körningsversion 4
  • .NET 6.0
  • Microsoft.Data.SqlClient 4.0.0

Skapa först en httptrigger-standardfunktionsapp genom att följa den här självstudien. Följande information används:

  • Språk: C#
  • Körning: .NET 6 (under funktion/körning v4)
  • Mall: HTTP trigger

Installera SqlClient-biblioteket genom att köra följande kommando i en terminal i projektmappen:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

SqlConnectionString Lägg till objektet i Values avsnittet i och local.settings.jsonfyll i målserverns anslutningssträng:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ersätt hela funktionen (.cs fil i projektet) med följande kodfragment. Uppdatera namnområdet, klassnamnet och funktionsnamnet efter ditt eget:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Uppdatera kommandobyggnadsavsnittet sqltext så att det matchar ditt eget schema (observera hur ackumulering uppnås via operatorn vid += uppdatering):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Nu kan du testa kabeldragningen mellan den lokala funktionen och databasen genom att felsöka (F5 i VS Code). SQL-databasen måste kunna nås från datorn. SSMS kan användas för att kontrollera anslutningen. Sedan kan ett verktyg som Postman användas för att utfärda POST-begäranden till den lokala slutpunkten. En begäran med en tom brödtext ska returnera http 204. En begäran med en faktisk nyttolast ska sparas i måltabellen (i ackumulerat/sammanslagningsläge). Här är ett exempel på nyttolast som motsvarar schemat som används i det här exemplet:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funktionen kan nu publiceras till Azure. En programinställning ska anges för SqlConnectionString. Azure SQL Server-brandväggen bör tillåta att Azure-tjänster i för livefunktionen når den.

Funktionen kan sedan definieras som utdata i ASA-jobbet och användas för att ersätta poster i stället för att infoga dem.

Alternativ

Utanför Azure Functions finns det flera sätt att uppnå det förväntade resultatet. Det här avsnittet innehåller några av dem.

Efterbearbetning i sql-måldatabasen

En bakgrundsaktivitet fungerar när data infogas i databasen via standard-ASA SQL-utdata.

För Azure SQL INSTEAD OFkan DML-utlösare användas för att fånga upp INSERT-kommandon som utfärdats av ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

För Synapse SQL kan ASA infogas i en mellanlagringstabell. En återkommande uppgift kan sedan omvandla data efter behov till en mellanliggande tabell. Slutligen flyttas data till produktionstabellen.

Förbearbetning i Azure Cosmos DB

Azure Cosmos DB stöder UPSERT internt. Här är det bara möjligt att lägga till/ersätta. Ackumuleringar måste hanteras på klientsidan i Azure Cosmos DB.

Om kraven matchar är ett alternativ att ersätta SQL-måldatabasen med en Azure Cosmos DB-instans. Detta kräver en viktig förändring i den övergripande lösningsarkitekturen.

För Synapse SQL kan Azure Cosmos DB användas som mellanliggande lager via Azure Synapse Link för Azure Cosmos DB. Azure Synapse Link kan användas för att skapa ett analysarkiv. Det här datalagret kan sedan frågas direkt i Synapse SQL.

Jämförelse av alternativen

Varje metod erbjuder olika värdeförslag och funktioner:

Typ Alternativ Lägen Azure SQL Database Azure Synapse Analytics
Efterbearbetning
Utlösare Ersätt, ackumulera + Utlösare är inte tillgängliga i Synapse SQL
Mellanlagring Ersätt, ackumulera + +
Förbearbetning
Azure Functions Ersätt, ackumulera + – (prestanda rad för rad)
Azure Cosmos DB-ersättning Replace Saknas Saknas
Azure Cosmos DB Azure Synapse Link Replace Ej tillämpligt +

Få support

Om du vill ha mer hjälp kan du prova vår frågesida för Microsoft Q&A för Azure Stream Analytics.

Nästa steg