Delen via


Records in Azure SQL Database bijwerken of samenvoegen met Azure Functions

Op dit moment ondersteunt Azure Stream Analytics (ASA) alleen het invoegen (toevoegen) van rijen aan SQL-uitvoer (Azure SQL Databases en Azure Synapse Analytics). In dit artikel worden tijdelijke oplossingen besproken voor het inschakelen van UPDATE, UPSERT of MERGE op SQL-databases, met Azure Functions als intermediaire laag.

Alternatieve opties voor Azure Functions worden aan het einde weergegeven.

Vereiste

Het schrijven van gegevens in een tabel kan over het algemeen op de volgende manier worden uitgevoerd:

Modus Equivalente T-SQL-instructie Vereisten
Toevoegen INSERT Geen
Replace SAMENVOEGEN (UPSERT) Unieke sleutel
Accumuleren MERGE (UPSERT) met samengestelde toewijzingsoperator (+=, -=...) Unieke sleutel en accumulator

Bekijk wat er gebeurt wanneer u de volgende twee records opneemt om de verschillen te illustreren:

Arrival_Time Device_Id Measure_Value
10:00:00 V 1
10:05 A 20

In de toevoegmodus voegen we twee records in. De equivalente T-SQL-instructie is:

INSERT INTO [target] VALUES (...);

Resulteert in:

Modified_Time Device_Id Measure_Value
10:00:00 V 1
10:05 A 20

In de vervangingsmodus krijgen we alleen de laatste waarde per sleutel. Hier gebruiken we Device_Id als sleutel. De equivalente T-SQL-instructie is:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulteert in:

Modified_Time Device_Key Measure_Value
10:05 A 20

Ten slotte wordt in de cumulatieve modus opgeteldValue met een samengestelde toewijzingsoperator (+=). Hier gebruiken we ook Device_Id als sleutel:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulteert in:

Modified_Time Device_Key Measure_Value
10:05 A 21

Voor prestatieoverwegingen ondersteunen de UITVOERadapters van de ASA SQL-database momenteel alleen de toevoegmodus. Deze adapters gebruiken bulksgewijs invoegen om de doorvoer te maximaliseren en de terugdruk te beperken.

In dit artikel wordt beschreven hoe u Azure Functions gebruikt om de modi Replace and Accumulate voor ASA te implementeren. Wanneer u een functie als intermediaire laag gebruikt, hebben de mogelijke schrijfprestaties geen invloed op de streamingtaak. In dit opzicht werkt het gebruik van Azure Functions het beste met Azure SQL. Met Synapse SQL kan het overschakelen van bulksgewijs naar rij-per-rij-instructies grotere prestatieproblemen opleveren.

Uitvoer van Azure Functions

In onze taak vervangen we de ASA SQL-uitvoer door de ASA Azure Functions-uitvoer. De mogelijkheden UPDATE, UPSERT of MERGE worden geïmplementeerd in de functie.

Er zijn momenteel twee opties voor toegang tot een SQL Database in een functie. Eerst is de Azure SQL-uitvoerbinding. Het is momenteel beperkt tot C# en biedt alleen de vervangingsmodus. Ten tweede bestaat uit het opstellen van een SQL-query die moet worden verzonden via het juiste SQL-stuurprogramma (Microsoft.Data.SqlClient voor .NET).

Voor beide voorbeelden gaan we uit van het volgende tabelschema. Voor de bindingsoptie moet een primaire sleutel worden ingesteld in de doeltabel. Het is niet nodig, maar wordt aanbevolen bij het gebruik van een SQL-stuurprogramma.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Een functie moet voldoen aan de volgende verwachtingen die moeten worden gebruikt als uitvoer van ASA:

  • Azure Stream Analytics verwacht HTTP-status 200 van de Functions-app voor batches die zijn verwerkt
  • Wanneer Azure Stream Analytics een 413(http-aanvraagentiteit te groot) ontvangt van een Azure-functie, vermindert deze de grootte van de batches die worden verzonden naar De Azure-functie
  • Tijdens de testverbinding verzendt Stream Analytics een POST-aanvraag met een lege batch naar Azure Functions en verwacht de HTTP-status 20x terug om de test te valideren

Optie 1: Bijwerken op sleutel met de Azure Function SQL-binding

Deze optie maakt gebruik van de Azure Function SQL-uitvoerbinding. Deze extensie kan een object in een tabel vervangen zonder dat u een SQL-instructie hoeft te schrijven. Op dit moment worden samengestelde toewijzingsoperatoren (accumulaties) niet ondersteund.

Dit voorbeeld is gebaseerd op:

Als u meer inzicht wilt krijgen in de bindingsbenadering, wordt u aangeraden deze zelfstudie te volgen.

Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. De volgende informatie wordt gebruikt:

  • Taal: C#
  • Runtime: .NET 6 (onder function/runtime v4)
  • Sjabloon: HTTP trigger

Installeer de bindingsextensie door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Voeg het SqlConnectionString item toe in de Values sectie van uwlocal.settings.json, vul de verbindingsreeks van de doelserver in:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Werk de naam van de doeltabel in de bindingssectie bij:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Werk de Device sectie Klasse en toewijzing bij zodat deze overeenkomen met uw eigen schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in Visual Studio Code). De SQL-database moet bereikbaar zijn vanaf uw computer. SSMS kan worden gebruikt om de connectiviteit te controleren. Vervolgens kan een hulpprogramma zoals Postman worden gebruikt om POST-aanvragen uit te geven aan het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet http 204 retourneren. Een aanvraag met een werkelijke nettolading moet worden bewaard in de doeltabel (in de vervang-/updatemodus). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

De functie kan nu worden gepubliceerd naar Azure. Er moet een toepassingsinstelling worden ingesteld voor SqlConnectionString. De Azure SQL Server-firewall moet toestaan dat Azure-services in de live-functie deze bereiken.

De functie kan vervolgens worden gedefinieerd als uitvoer in de ASA-taak en wordt gebruikt om records te vervangen in plaats van ze in te voegen.

Optie 2: Samenvoegen met samengestelde toewijzing (cumulatie) via een aangepaste SQL-query

Notitie

Bij het opnieuw opstarten en herstellen kan ASA uitvoergebeurtenissen die al zijn verzonden, opnieuw verzenden. Dit is een verwacht gedrag dat ertoe kan leiden dat de accumulatielogica mislukt (het verdubbelen van afzonderlijke waarden). U kunt dit voorkomen door dezelfde gegevens in een tabel uit te voeren via de systeemeigen ASA SQL-uitvoer. Deze besturingstabel kan vervolgens worden gebruikt om problemen te detecteren en de accumulatie zo nodig opnieuw te synchroniseren.

Deze optie maakt gebruik van Microsoft.Data.SqlClient. Met deze bibliotheek kunnen sql-query's worden uitgevoerd naar een SQL Database.

Dit voorbeeld is gebaseerd op:

  • Runtimeversie 4 van Azure Functions
  • .NET 6.0
  • Microsoft.Data.SqlClient 4.0.0

Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. De volgende informatie wordt gebruikt:

  • Taal: C#
  • Runtime: .NET 6 (onder function/runtime v4)
  • Sjabloon: HTTP trigger

Installeer de SqlClient-bibliotheek door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Voeg het SqlConnectionString item toe in de Values sectie van uwlocal.settings.json, vul de verbindingsreeks van de doelserver in:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Werk de sectie van het sqltext opdrachtgebouw bij zodat deze overeenkomt met uw eigen schema (u ziet hoe accumulatie wordt bereikt via de operator bij de += update):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in VS Code). De SQL-database moet bereikbaar zijn vanaf uw computer. SSMS kan worden gebruikt om de connectiviteit te controleren. Vervolgens kan een hulpprogramma zoals Postman worden gebruikt om POST-aanvragen uit te geven aan het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet http 204 retourneren. Een aanvraag met een werkelijke nettolading moet worden opgeslagen in de doeltabel (in de samengevoegde/samenvoegmodus). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

De functie kan nu worden gepubliceerd naar Azure. Er moet een toepassingsinstelling worden ingesteld voor SqlConnectionString. De Azure SQL Server-firewall moet toestaan dat Azure-services in de live-functie deze bereiken.

De functie kan vervolgens worden gedefinieerd als uitvoer in de ASA-taak en wordt gebruikt om records te vervangen in plaats van ze in te voegen.

Alternatieven

Buiten Azure Functions zijn er meerdere manieren om het verwachte resultaat te bereiken. In deze sectie vindt u een aantal van deze secties.

Naverwerking in de doel-SQL Database

Een achtergrondtaak werkt zodra de gegevens in de database worden ingevoegd via de standaard-ASA SQL-uitvoer.

Voor Azure SQL INSTEAD OFkunnen DML-triggers worden gebruikt om de INSERT-opdrachten te onderscheppen die zijn uitgegeven door ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Voor Synapse SQL kan ASA worden ingevoegd in een faseringstabel. Een terugkerende taak kan vervolgens de gegevens naar behoefte transformeren in een tussenliggende tabel. Ten slotte worden de gegevens verplaatst naar de productietabel.

Voorverwerking in Azure Cosmos DB

Azure Cosmos DB biedt systeemeigen ondersteuning voor UPSERT. Hier is alleen toevoegen/vervangen mogelijk. Accumulaties moeten worden beheerd aan de clientzijde in Azure Cosmos DB.

Als de vereisten overeenkomen, kunt u de doel-SQL-database vervangen door een Azure Cosmos DB-exemplaar. Hiervoor is een belangrijke wijziging in de algehele oplossingsarchitectuur vereist.

Voor Synapse SQL kan Azure Cosmos DB worden gebruikt als intermediaire laag via Azure Synapse Link voor Azure Cosmos DB. Azure Synapse Link kan worden gebruikt om een analytische opslag te maken. Dit gegevensarchief kan vervolgens rechtstreeks in Synapse SQL worden opgevraagd.

Vergelijking van de alternatieven

Elke benadering biedt verschillende waardeproposities en mogelijkheden:

Type Optie Modi Azure SQL Database Azure Synapse Analytics
Naverwerking
Triggers Vervangen, verzamelen + N.v.t., triggers zijn niet beschikbaar in Synapse SQL
Staging Vervangen, verzamelen + +
Voorverwerking
Azure Functions Vervangen, verzamelen + - (prestaties van rij per rij)
Vervanging van Azure Cosmos DB Replace N.v.t. N.v.t.
Azure Cosmos DB Azure Synapse Link Replace N.v.t. +

Ondersteuning krijgen

Probeer onze microsoft Q&A-vragenpagina voor Azure Stream Analytics voor meer hulp.

Volgende stappen