Обновление или слияние записей в База данных SQL Azure с помощью Функции Azure

В настоящее время Azure Stream Analytics (ASA) поддерживает только вставку (добавление) строк в выходные данные SQL (Azure SQL Базы данных и Azure Synapse Analytics). В этой статье рассматриваются обходные пути для включения UPDATE, UPSERT или MERGE в базах данных SQL с помощью Функции Azure в качестве промежуточного слоя.

Альтернативные варианты Функции Azure представлены в конце.

Требование

Данные можно записывать в таблицу с помощью одного из следующих режимов:

Режим Эквивалентная инструкция T-SQL Требования
Добавить INSERT нет
Заменить MERGE (UPSERT) Уникальный ключ
Накапливать MERGE (UPSERT) с оператором комплексного присваивания (+=, -=...) Уникальный ключ и аккумулятор

Чтобы проиллюстрировать различия, рассмотрите, что происходит при приеме следующих двух записей:

Время_прибытия Идентификатор_Устройства Значение_Измерения
10:00 A 1
10:05 A 20

В режиме добавления вставьте две записи. Эквивалентная инструкция T-SQL:

INSERT INTO [target] VALUES (...);

Результат:

Время_изменения Device_Id Значение_Измерения
10:00 A 1
10:05 A 20

В режиме замены вы получаете только последнее значение по ключу. Здесь вы используете Device_Id в качестве ключа. Эквивалентная инструкция T-SQL:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Результат:

Изменённое_Время Ключ_устройства Значение_Меры
10:05 A 20

Наконец, в режиме накопления прибавляется Value с помощью оператора составного назначения (+=). Здесь также используется Device_Id в качестве ключа:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Результат:

Время_изменения Ключ устройства Значение_Измерения
10:05 А 21

Из соображений производительности выходные адаптеры базы данных SQL ASA сейчас изначально поддерживают только режим добавления. В этих адаптерах используется массивная вставка, чтобы максимизировать пропускную способность и ограничить обратное давление.

В этой статье показано, как использовать Функции Azure для реализации режимов замены и накопления для ASA. При использовании функции в качестве промежуточного слоя потенциальная производительность записи не влияет на задание потоковой передачи. В этом отношении использование функций Azure лучше всего совместимо с Azure SQL. При использовании Synapse SQL переключение с массового на построчные операторы может привести к более значительным проблемам с производительностью.

результаты работы Функции Azure

В этом задании вы замените выходные данные ASA SQL выходными данными ASA Функции Azure. Функция реализует возможности UPDATE, UPSERT или MERGE.

В настоящее время вы можете получить доступ к базе данных SQL в функции с помощью двух вариантов. Первым вариантом является Azure SQL выходная привязка. В настоящее время она ограничена C# и предлагает только режим замены. Второй вариант — создать SQL-запрос для отправки с помощью соответствующего драйвера SQL (Microsoft. Data.SqlClient для .NET).

Оба следующих примера предполагают следующую схему таблицы. Вариант с привязкой требует, чтобы первичный ключ был установлен в целевой таблице. Это необязательно, но рекомендуется при использовании драйвера SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Чтобы использовать функцию в качестве выходных данных из ASA, функция должна соответствовать следующим ожиданиям:

  • Azure Stream Analytics ожидает состояние HTTP 200 из приложения "Функции" для пакетов, которые он успешно обрабатывает.
  • Если Azure Stream Analytics получает исключение 413 ("http request Entity Too Large") из функции Azure, это уменьшает размер пакетов, которые он отправляет в функцию Azure.
  • Во время тестового подключения Stream Analytics отправляет запрос POST с пустым пакетом в Функции Azure и ожидает, что код состояния HTTP 20x будет возвращён, чтобы подтвердить тест.

Вариант 1: Обновление по ключу с помощью привязки SQL функцией Azure.

Этот вариант использует опцию Выходной привязки SQL для функций Azure. Это расширение может заменить объект в таблице без необходимости писать инструкцию SQL. В настоящее время составные операторы присваивания, такие как операторы накопления, не поддерживаются.

Этот пример был создан с помощью:

Чтобы лучше понять подход к привязке, следуйте инструкциям this.

Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используйте следующую информацию:

  • Язык: C#
  • Среда выполнения: .NET 6 (в рамках функции/среды выполнения версии 4).
  • Шаблон: HTTP trigger.

Установите расширение привязки, выполнив следующую команду в терминале, расположенном в папке проекта:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Обновите пространство имен, имя класса и имя функции, выбрав свои собственные:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Обновите имя целевой таблицы в разделе привязки:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Обновите раздел класса Device и сопоставления, чтобы он соответствовал вашей схеме:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Теперь можно протестировать соединение между локальной функцией и базой данных путем отладки, используя F5 в Visual Studio Code. База данных SQL должна быть доступна с компьютера. Для проверки подключения можно использовать SSMS . Затем отправьте запросы POST в локальную конечную точку. Запрос с пустым текстом должен возвращать HTTP 204. Запрос с фактической полезной нагрузкой должен быть записан в целевую таблицу (в режиме замены / обновления). Вот пример пакета данных, соответствующего схеме в этом примере:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Теперь функцию можно опубликовать в Azure. Установка параметра приложения для SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.

Затем можно определить функцию в качестве выходных данных в задании ASA и использовать ее для замены записей вместо вставки.

Вариант 2. Объединение с использованием операции составного присвоения (аккумулирования) посредством пользовательского SQL-запроса.

Примечание.

При перезапуске и восстановлении ASA может повторно отправить выходные события, которые он уже генерирует. Это поведение может привести к сбою логики накопления (удвоение отдельных значений). Чтобы предотвратить эту проблему, выведите те же данные в формате таблицы, используя встроенные выходные данные ASA SQL. Эту таблицу управления можно использовать для обнаружения проблем и повторной синхронизации накопления при необходимости.

Этот вариант использует Microsoft.Data.SqlClient. Эта библиотека позволяет отправлять запросы SQL в базу данных SQL.

Этот образец был построен на основе:

Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:

  • Язык: C#
  • Время выполнения: .NET 6 (в функции/время выполнения версии 4).
  • Шаблон: HTTP trigger.

Установите библиотеку SqlClient, выполнив следующую команду в терминале, расположенном в папке проекта:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Обновите раздел построения команд sqltext, чтобы он соответствовал вашей схеме (обратите внимание, как накопление достигается с помощью оператора += при обновлении):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Теперь вы можете проверить связь между локальной функцией и базой данных с помощью отладки (F5 в VS Code). База данных SQL должна быть доступна с компьютера. Для проверки подключения можно использовать SSMS . Затем отправьте запросы POST в локальную конечную точку. Запрос с пустым текстом должен возвращать HTTP 204. Запрос с фактическими данными полезной нагрузки должен сохраняться в целевой таблице (в режиме накопления или слияния). Вот пример нагрузки, которая соответствует схеме, используемой в данном случае.

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Теперь функцию можно опубликовать в Azure. Для параметра приложения следует задать SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ, чтобы функция в реальном времени могла к нему подключиться.

Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.

Альтернативные варианты

Вне Функции Azure несколько методов могут достичь ожидаемого результата. В этом разделе описаны некоторые из этих методов.

Последующая обработка в целевой Базе данных SQL

Фоновая задача выполняется после вставки данных в базу данных с помощью стандартных выходных данных ASA SQL.

Для Azure SQL используйте триггеры DML INSTEAD OF для перехвата команд INSERT, которые ASA выпускает.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Для Synapse SQL ASA может вставлять в промежуточную таблицу. Затем повторяющаяся задача может преобразовать данные по мере необходимости в промежуточную таблицу. Наконец, данные перемещаются в рабочую таблицу.

Предварительная обработка в Azure Cosmos DB

Azure Cosmos DB обеспечивает встроенную поддержку UPSERT. Здесь возможно только добавление или замена. Необходимо управлять накоплением на стороне клиента в Azure Cosmos DB.

Если требования соответствуют требованиям, можно заменить целевую базу данных SQL экземпляром Azure Cosmos DB. Для этого изменения требуется важное изменение в общей архитектуре решения.

Для Synapse SQL можно использовать Azure Cosmos DB в качестве промежуточного слоя через Azure Synapse Link для Azure Cosmos DB. Используйте Azure Synapse Link для создания аналитического хранилища. Затем вы можете запросить это хранилище данных непосредственно в Synapse SQL.

Сравнение альтернатив

Каждый подход предлагает различные предложения и возможности.

Тип Вариант Режимы База данных SQL Azure Azure Synapse Analytics
Постобработка
Триггеры Замена, накопление + Не применимо, триггеры недоступны в Synapse SQL
Тестовая среда Замена, накопление + +
Предварительная обработка
Функции Azure Замена, накопление + - (построчная производительность)
Замена Azure Cosmos DB Заменить Неприменимо Неприменимо
Azure Cosmos DB Azure Synapse Link Заменить Неприменимо +

Получить поддержку

Дополнительные сведения см. на странице вопросов Microsoft Q&A для Azure Stream Analytics.

Следующие шаги