Aktualisieren oder Zusammenführen von Datensätzen in Azure SQL-Datenbank mit Azure Functions

Aktuell unterstützt Azure Stream Analytics (ASA) nur das Einfügen (Anfügen) von Zeilen in SQL-Ausgaben (Azure SQL-Datenbanken und Azure Synapse Analytics). In diesem Artikel werden Problemumgehungen zum Aktivieren von UPDATE, UPSERT oder MERGE für SQL-Datenbanken erläutert, wobei Azure Functions als zwischengeschaltete Ebene verwendet wird.

Alternative Optionen für Azure Functions werden am Ende vorgestellt.

Anforderung

Das Schreiben von Daten in eine Tabelle kann im Allgemeinen in der folgenden Weise erfolgen:

Mode Entsprechende T-SQL-Anweisung Anforderungen
Anfügen INSERT Keine
Replace MERGE (UPSERT) Eindeutiger Schlüssel
Accumulate MERGE (UPSERT) mit Verbundzuweisungs-Operator (+=, -=...) Eindeutiger Schlüssel und Akkumulator

Sehen Sie sich zum Verdeutlichen der Unterschiede an, was bei der Erfassung der folgenden beiden Datensätze geschieht:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

Im Anfügemodus fügen wir zwei Datensätze ein. Die entsprechende T-SQL-Anweisung lautet:

INSERT INTO [target] VALUES (...);

Ergebnis:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

Im Ersetzungsmodus erhalten wir nur den dem Schlüssel nach letzten Wert. Hier verwenden wir Device_Id als Schlüssel. Die entsprechende T-SQL-Anweisung lautet:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ergebnis:

Modified_Time Device_Key Measure_Value
10:05 A 20

Schließlich wird im Akkumulationsmodus eine Summe aus Value mit einem Verbundzuweisungsoperator (+=) gebildet. Hier verwenden wir ebenfalls Device_Id als Schlüssel:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ergebnis:

Modified_Time Device_Key Measure_Value
10:05 A 21

Aus Leistungsgründen unterstützen die ASA-SQL Datenbankausgabeadapter derzeit nur den Anfügemodus nativ. Diese Adapter verwenden die Masseneinfügung, um den Durchsatz zu maximieren und den Gegendruck zu begrenzen.

In diesem Artikel wird die Verwendung von Azure Functions zum Implementieren des Ersetzungs- und des Akkumulationsmodus für ASA veranschaulicht. Wenn Sie eine Funktion als Zwischenschicht verwenden, wirkt sich die potenzielle Schreibleistung nicht auf den Streamingauftrag aus. In dieser Hinsicht funktioniert Azure Functions am besten mit Azure SQL. Bei Synapse SQL kann der Wechsel von der Massenausführung zu zeilenbasierten Anweisungen zu größeren Leistungsproblemen führen.

Azure Functions-Ausgabe

In unserem Auftrag ersetzen wir die ASA-SQL-Ausgabe durch die ASA Azure Functions-Ausgabe. Die Funktionen UPDATE, UPSERT oder MERGE sind in der Funktion implementiert.

Es gibt aktuell zwei Optionen für den Zugriff auf eine SQL-Datenbank in einer Funktion. Die erste ist die Azure SQL-Ausgabebindung. Sie ist derzeit auf C# beschränkt und bietet nur den Ersetzungsmodus. Die zweite besteht im Zusammenstellen einer SQL-Abfrage für die Übermittlung über den entsprechenden SQL-Treiber (Microsoft.Data.SqlClient für .NET).

Für beide nachfolgenden Beispiele gehen wir vom folgenden Tabellenschema aus. Für die Bindungsoption muss ein Primärschlüssel für die Zieltabelle festgelegt werden. Bei Verwendung eines SQL-Treibers ist dies nicht notwendig, aber empfehlenswert.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Eine Funktion muss die folgenden Erwartungen erfüllen, damit sie als Ausgabe von ASA verwendet werden kann:

  • Azure Stream Analytics erwartet den HTTP-Status 200 von der Functions-App für Batches, die erfolgreich verarbeitet wurden.
  • Die Größe der an Azure Functions gesendeten Batches wird verringert, wenn in Azure Stream Analytics die Ausnahme 413 (HTTP-Anforderungseinheit zu groß) durch eine Azure-Funktion auftritt.
  • Bei bestehender Testverbindung sendet Stream Analytics eine POST-Anforderung mit einem leeren Batch an Azure Functions und erwartet 20 mal den HTTP-Status zurück, damit der Test als bestanden gilt.

Option 1: Aktualisierung nach Schlüssel mit der Azure SQL-Bindungsfunktion

Bei dieser Option wird die Azure-Funktion SQL-Ausgabebindung verwendet. Diese Erweiterung kann ein Objekt in einer Tabelle ersetzen, ohne eine SQL-Anweisung schreiben zu müssen. Zurzeit werden keine Verbundzuweisungsoperatoren (Akkumulationen) unterstützt.

Dieses Beispiel basiert auf:

Zum besseren Verständnis des Bindungsansatzes empfehlen wir, diesem Tutorial zu folgen.

Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Die folgenden Informationen werden verwendet:

  • Sprache: C#
  • Runtime: .NET 6 (unter function/runtime v4)
  • Vorlage: HTTP trigger

Installieren Sie die Bindungserweiterung, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Fügen Sie das SqlConnectionString-Element im Abschnitt Values Ihrer local.settings.json hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen durch Ihre eigenen Werte:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aktualisieren Sie den Namen der Zieltabelle im Bindungsabschnitt:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Aktualisieren Sie die Device-Klasse und den Zuordnungsabschnitt so, dass sie Ihrem eigenen Schema entsprechen:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in Visual Studio Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. SSMS kann verwendet werden, um die Konnektivität zu überprüfen. Anschließend kann ein Tool wie Postman verwendet werden, um POST-Anforderungen an den lokalen Endpunkt auszugeben. Eine Anforderung mit leerem Text sollte http 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Ersetzungs-/Aktualisierungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Die Funktion kann jetzt in Azure veröffentlicht werden. Für SqlConnectionString sollte eine Anwendungseinstellung festgelegt sein. Die Azure SQL Server-Firewall sollte eingehende Verbindungen mit Azure-Diensten zulassen, damit die Livefunktion sie erreichen kann.

Die Funktion kann dann als Ausgabe im ASA-Auftrag definiert und verwendet werden, um Datensätze zu ersetzen, anstatt sie einzufügen.

Option 2: Zusammenführen mit Verbundzuweisung (Akkumulation) über eine benutzerdefinierte SQL-Abfrage

Hinweis

Nach Neustart und Wiederherstellung sendet ASA möglicherweise Ausgabeereignisse erneut, die bereits ausgegeben wurden. Dies ist ein erwartetes Verhalten, das zu einem Fehler bei der Akkumulationslogik führen kann (Verdoppelung einzelner Werte). Damit dies verhindert wird, empfiehlt es sich, die gleichen Daten über die native ASA-SQL-Ausgabe in eine Tabelle auszugeben. Diese Steuertabelle kann dann verwendet werden, um Probleme zu erkennen und die Akkumulation bei Bedarf erneut zu synchronisieren.

Diese Option verwendet Microsoft.Data.SqlClient. Mit dieser Bibliothek können wir beliebige SQL-Abfragen an eine SQL-Datenbank ausgeben.

Dieses Beispiel basiert auf:

Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Die folgenden Informationen werden verwendet:

  • Sprache: C#
  • Runtime: .NET 6 (unter function/runtime v4)
  • Vorlage: HTTP trigger

Installieren Sie die SqlClient-Bibliothek, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Fügen Sie das SqlConnectionString-Element im Abschnitt Values Ihrer local.settings.json hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen durch Ihre eigenen Werte:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Aktualisieren Sie den Abschnitt zur sqltext-Befehlserstellung so, dass er Ihrem eigenen Schema entspricht (beachten Sie, wie Akkumulation durch den +=-Operator bei der Aktualisierung erreicht wird):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in VS Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. SSMS kann verwendet werden, um die Konnektivität zu überprüfen. Anschließend kann ein Tool wie Postman verwendet werden, um POST-Anforderungen an den lokalen Endpunkt auszugeben. Eine Anforderung mit leerem Text sollte http 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Akkumulations-/Zusammenführungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Die Funktion kann jetzt in Azure veröffentlicht werden. Für SqlConnectionString sollte eine Anwendungseinstellung festgelegt sein. Die Azure SQL Server-Firewall sollte eingehende Verbindungen mit Azure-Diensten zulassen, damit die Livefunktion sie erreichen kann.

Die Funktion kann dann als Ausgabe im ASA-Auftrag definiert und verwendet werden, um Datensätze zu ersetzen, anstatt sie einzufügen.

Alternativen

Abgesehen von Azure Functions gibt es mehrere Möglichkeiten, das erwartete Ergebnis zu erzielen. Dieser Abschnitt enthält einige davon.

Nachverarbeitung in der SQL-Zieldatenbank

Eine Hintergrundaufgabe wird ausgeführt, sobald die Daten über die ASA-SQL-Standardausgaben in die Datenbank eingefügt wurden.

Bei Azure SQL können INSTEAD OFDML-Trigger verwendet werden, um die von ASA ausgegebenen INSERT-Befehle abzufangen.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Bei Synapse SQL kann ASA in eine Stagingtabelle einfügen. Eine wiederkehrende Aufgabe kann die Daten dann bei Bedarf in eine Zwischentabelle transformieren. Schließlich werden die Daten in die Produktionstabelle verschoben.

Vorverarbeitung in Azure Cosmos DB

Azure Cosmos DB unterstützt UPSERT nativ. Hier ist nur Anfügen/Ersetzen möglich. Akkumulationen müssen clientseitig in Azure Cosmos DB verwaltet werden.

Wenn die Anforderungen übereinstimmen, ist eine Option, die SQL-Zieldatenbank durch eine Azure Cosmos DB-Instanz zu ersetzen. Dies erfordert eine wichtige Änderung der Gesamtarchitektur der Lösung.

Für Synapse SQL kann Azure Cosmos DB über Azure Synapse Link für Azure Cosmos DB als zwischengeschaltete Ebene verwendet werden. Azure Synapse Link kann verwendet werden, um einen Analysespeicher zu erstellen. Dieser Datenspeicher kann dann direkt in Synapse SQL abgefragt werden.

Vergleich der Alternativen

Jeder Ansatz bietet verschiedene Nutzenversprechen und Funktionen:

type Option Modi Azure SQL-Datenbank Azure Synapse Analytics
Nachbearbeitung
Auslöser Ersetzen, Akkumulieren + N/V; Trigger stehen in Synapse SQL nicht zur Verfügung
Staging Ersetzen, Akkumulieren + +
Vorverarbeitung
Azure-Funktionen Ersetzen, Akkumulieren + – (Zeilenbasierte Leistung)
Azure Cosmos DB-Ersetzung Replace
Azure Cosmos DB Azure Synapse Link Ersetzen von +

Support

Weitere Unterstützung finden Sie auf der Frageseite von Microsoft Q&A (Fragen und Antworten) zu Azure Stream Analytics.

Nächste Schritte