Aktualizowanie lub scalanie rekordów w Azure SQL Database przy użyciu Azure Functions

Obecnie Azure Stream Analytics (ASA) obsługuje tylko wstawianie (dołączanie) wierszy do danych wyjściowych SQL (Azure SQL Databases i Azure Synapse Analytics). W tym artykule omówiono sposoby obejścia umożliwiające uruchomienie UPDATE, UPSERT lub MERGE w bazach danych SQL, z wykorzystaniem Azure Functions jako warstwy pośredniej.

Na końcu przedstawiono alternatywne opcje Azure Functions.

Wymaganie

Dane można zapisywać w tabeli przy użyciu jednego z następujących trybów:

Tryb Równoważna instrukcja języka T-SQL Wymagania
Dołączanie INSERT Brak
Zastąp MERGE (UPSERT) Unikatowy klucz
Gromadzić MERGE (UPSERT) z operatorem złożonego przypisania(+=, -=...) Unikatowy klucz i akumulator

Aby zilustrować różnice, rozważ, co się stanie przy wprowadzaniu następujących dwóch rekordów:

Czas_Przybycia Device_Id Measure_Value
10:00 A 1
10:05 A 20

W trybie dołączania wstawisz dwa rekordy. Równoważna instrukcja języka T-SQL to:

INSERT INTO [target] VALUES (...);

Skutkuje to:

Czas modyfikacji Device_Id Wartość_Miernika
10:00 A 1
10:05 A 20

W trybie zamiany uzyskasz tylko ostatnią wartość dla klucza. W tym miejscu należy użyć Device_Id jako klucza. Równoważna instrukcja języka T-SQL to:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Wynikowe:

Czas_Modyfikacji Device_Key Wartość_Pomiary
10:05 A 20

Na koniec w trybie kumulowania sumujesz sumę Value za pomocą operatora przypisania złożonego (+=). W tym miejscu należy również użyć Device_Id jako klucza:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Rezultat:

Czas_modyfikacji Klucz_Urządzenia Wartość_Pomiaru
10:05 A 21

W przypadku zagadnień dotyczących wydajności adaptery wyjściowe bazy danych SQL usługi ASA obsługują obecnie tylko tryb dołączania natywnie. Te adaptery używają wstawiania zbiorczego, aby zmaksymalizować wydajność i ograniczyć ciśnienie wsteczne.

W tym artykule pokazano, jak za pomocą usługi Azure Functions zaimplementować tryby zastępowania i kumulowania dla usługi ASA. Jeśli używasz funkcji jako warstwy pośredniej, potencjalna wydajność zapisu nie ma wpływu na zadanie przesyłania strumieniowego. W związku z tym korzystanie z usługi Azure Functions działa najlepiej z usługą Azure SQL. W przypadku usługi Synapse SQL przejście z instrukcji zbiorczych na wiersz po wiersz może spowodować większe problemy z wydajnością.

dane wyjściowe Azure Functions

W tym zadaniu zastąpisz dane wyjściowe ASA SQL danymi wyjściowymi z ASA Azure Functions. Funkcja implementuje funkcje UPDATE, UPSERT lub MERGE.

Obecnie możesz uzyskać dostęp do SQL Database w ramach funkcji, korzystając z dwóch opcji. Pierwszą opcją jest powiązanie wyjściowe Azure SQL. Obecnie jest on ograniczony do języka C#i oferuje tylko tryb zamiany. Drugą opcją jest utworzenie zapytania SQL w celu przesłania go za pomocą odpowiedniego sterownika SQL (Microsoft. Data.SqlClient for .NET).

Oba poniższe przykłady zakładają następujący schemat tabeli. Opcja powiązania wymaga ustawienia klucza podstawowego w tabeli docelowej. Nie jest to konieczne, ale zalecane w przypadku korzystania ze sterownika SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Aby użyć funkcji jako danych wyjściowych z usługi ASA, funkcja musi spełniać następujące wymagania:

  • Azure Stream Analytics oczekuje stanu HTTP 200 z aplikacji usługi Functions dla partii, które pomyślnie przetwarza.
  • Gdy Azure Stream Analytics otrzyma wyjątek 413 ("http Request Entity Too Large") z funkcji Azure, zmniejsza rozmiar partii wysyłanych do funkcji Azure.
  • Podczas połączenia testowego usługa Stream Analytics wysyła żądanie POST z pustą partią do Azure Functions i oczekuje stanu HTTP 20x z powrotem w celu zweryfikowania testu.

Opcja 1. Aktualizacja według klucza za pomocą powiązania SQL funkcji platformy Azure

Ta opcja używa powiązania danych wyjściowych SQL funkcji Azure. To rozszerzenie może zastąpić obiekt w tabeli bez konieczności pisania instrukcji SQL. Obecnie nie obsługuje operatorów przypisania złożonego (akumulacji).

Ten przykład został zbudowany na:

Aby lepiej zrozumieć podejście do powiązania, przejdź przez ten samouczek.

Najpierw utwórz domyślną aplikację funkcji HttpTrigger, postępując zgodnie z instrukcjami w tym samouczku. Użyj następujących informacji:

  • Język: C#
  • Środowisko uruchomieniowe: .NET 6 (w obszarze function/runtime v4)
  • Szablon: HTTP trigger

Zainstaluj rozszerzenie powiązania, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Dodaj element SqlConnectionString w sekcji Values swojego pliku local.settings.json, wypełniając ciąg połączenia serwera docelowego.

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj przestrzeń nazw, nazwę klasy i nazwę funkcji własnymi:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Zaktualizuj nazwę tabeli docelowej w sekcji powiązania:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Zaktualizuj sekcję Device klasy i mapowania, aby dopasować ją do własnego schematu:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Możesz teraz przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie Visual Studio Code). Baza danych SQL musi być osiągalna z twojej maszyny. Aby sprawdzić łączność, możesz użyć programu SSMS . Następnie wyślij żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić HTTP 204. Żądanie z faktycznym ładunkiem powinno być zapisywane w tabeli docelowej (w trybie nadpisywania/aktualizacji). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkcję można teraz opublikować na platformie Azure. Ustaw ustawienie aplikacji dla elementu SqlConnectionString. Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp, aby funkcja działająca na żywo mogła ją osiągnąć.

Następnie można zdefiniować funkcję jako dane wyjściowe w zadaniu usługi ASA i użyć jej do zastąpienia rekordów zamiast ich wstawiania.

Opcja 2. Scalanie z przypisaniem złożonym (skumulowanym) za pomocą niestandardowego zapytania SQL

Uwaga

Po ponownym uruchomieniu i odzyskiwaniu usługa ASA może ponownie wysłać zdarzenia wyjściowe, które już zostały wyemitowane. To zachowanie może spowodować niepowodzenie logiki akumulowania (podwojenie poszczególnych wartości). Aby zapobiec temu problemowi, wyprowadź te same dane w tabeli przy użyciu natywnego wyjścia ASA SQL. Za pomocą tej tabeli kontrolek można wykrywać problemy i ponownie synchronizować akumulację w razie potrzeby.

Ta opcja używa elementu Microsoft.Data.SqlClient. Ta biblioteka umożliwia wysyłanie zapytań SQL do usługi SQL Database.

Ten przykład został zbudowany na:

Najpierw utwórz domyślną aplikację funkcji HttpTrigger, postępując zgodnie z instrukcjami w tym samouczku. Używane są następujące informacje:

  • Język: C#
  • Środowisko uruchomieniowe: .NET 6 (w obszarze function/runtime v4)
  • Szablon: HTTP trigger

Zainstaluj bibliotekę SqlClient, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Dodaj element SqlConnectionString w sekcji Values swojego pliku local.settings.json, wypełniając ciąg połączenia serwera docelowego.

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj własną przestrzeń nazw, nazwę klasy i nazwę funkcji:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Zaktualizuj sekcję sqltext kompilowania poleceń, aby pasować do własnego schematu (zwróć uwagę, jak akumulacja jest osiągana za pośrednictwem += operatora podczas aktualizacji):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Teraz można przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie VS Code). Baza danych SQL musi być osiągalna z twojej maszyny. Aby sprawdzić łączność, możesz użyć programu SSMS . Następnie wyślij żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić http 204. Żądanie z danym obciążeniowym powinno być zapisywane w tabeli docelowej (w trybie kumulacji/łączenia). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkcję można teraz opublikować na platformie Azure. Należy ustawić ustawienie aplikacji dla SqlConnectionString. Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp, aby funkcja na żywo mogła do niego dotrzeć.

Następnie funkcję można zdefiniować jako wynik w zadaniu ASA i użyć do zastąpienia rekordów zamiast wstawiania.

Alternatywy

Poza Azure Functions wiele metod może osiągnąć oczekiwany wynik. W tej sekcji opisano niektóre z tych metod.

Przetwarzanie końcowe w docelowej bazie danych SQL

Zadanie w tle działa po wstawieniu danych do bazy danych za pośrednictwem standardowych danych wyjściowych usługi ASA SQL.

W przypadku Azure SQL użyj wyzwalaczy INSTEAD OFDML, aby przechwycić polecenia INSERT, które są wydawane przez usługę ASA.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

W przypadku usługi Synapse SQL usługa ASA może wstawić dane do tabeli przejściowej. Zadanie cykliczne może następnie przekształcić dane zgodnie z potrzebami w tabelę pośredniczącą. Na koniec dane są przenoszone do tabeli produkcyjnej.

Przetwarzanie wstępne w usłudze Azure Cosmos DB

Usługa Azure Cosmos DB obsługuje natywnie funkcję UPSERT. W tym miejscu możliwe jest tylko dołączanie lub zastępowanie. Należy zarządzać akumulacjami po stronie klienta w Azure Cosmos DB.

Jeśli wymagania są zgodne, możesz zastąpić docelową bazę danych SQL wystąpieniem Azure Cosmos DB. Ta zmiana wymaga ważnej zmiany w ogólnej architekturze rozwiązania.

W przypadku usługi Synapse SQL można użyć Azure Cosmos DB jako warstwy pośredniej za pośrednictwem Azure Synapse Link dla Azure Cosmos DB. Użyj usługi Azure Synapse Link, aby utworzyć magazyn analityczny. Następnie możesz wykonywać zapytania dotyczące tego magazynu danych bezpośrednio w usłudze Synapse SQL.

Porównanie alternatyw

Każde podejście oferuje różne propozycje wartości i możliwości:

Typ Opcja Tryby Azure SQL Database Azure Synapse Analytics
Przetwarzanie końcowe
Wyzwalacze Zamień, Akumuluj + Nie dotyczy: wyzwalacze nie są dostępne w Synapse SQL
Przygotowanie Zamień, Akumuluj + +
Wstępne przetwarzanie
Azure Functions Zamień, Akumuluj + - (wydajność wiersz po wierszu)
Zastąpienie usługi Azure Cosmos DB Zastąp Brak Brak
Azure Cosmos DB Azure Synapse Link Zastąp Brak +

Uzyskiwanie pomocy technicznej

Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.

Następne kroki