Обновление и слияние записей в Базе данных SQL Azure с помощью Функций Azure
В настоящее время Azure Stream Analytics (ASA) поддерживает только вставку (добавление) строк в выходные данные SQL (База данных SQL Azure и Azure Synapse Analytics). В этой статье обсуждаются обходные пути для включения UPDATE, UPSERT или MERGE в базах данных SQL с использованием Функций Azure в качестве промежуточного уровня.
Альтернативы Функциям Azure представлены в конце.
Требование
Запись данных в таблицу обычно осуществляется следующим образом:
Режим | Эквивалентная инструкция T-SQL | Требования |
---|---|---|
Добавление | INSERT | нет |
Replace | MERGE (UPSERT) | Уникальный ключ |
Накапливать | MERGE (UPSERT) с оператором составного присваивания (+= , -= ,..) |
Уникальный ключ и аккумулятор |
Чтобы проиллюстрировать различия, ознакомьтесь с тем, что происходит при приеме следующих двух записей:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | а | 20 |
В режиме добавления мы вставим две записи. Эквивалентная инструкция T-SQL:
INSERT INTO [target] VALUES (...);
Результат:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | а | 20 |
В режиме замены мы получаем только последнее значение по ключу. Здесь мы используем Device_Id в качестве ключа. Эквивалентная инструкция T-SQL:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Результат:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | а | 20 |
Наконец, в режиме накопления мы будем суммировать Value
с помощью составного оператора присваивания (+=
). Здесь также мы используем Device_Id в качестве ключа:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Результат:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | а | 21 |
Из соображений производительности выходные адаптеры базы данных SQL ASA сейчас изначально поддерживают только режим добавления. В этих адаптерах используется массивная вставка, чтобы максимизировать пропускную способность и ограничить обратное давление.
В этой статье показано, как использовать Функции Azure для реализации режимов замены и накопления для ASA. При использовании функции в качестве промежуточного слоя потенциальная производительность записи не влияет на задание потоковой передачи. В этом отношении использование Функции Azure лучше всего работает с SQL Azure. При использовании Synapse SQL переключение с массового на операторы строк может привести к более высокой производительности.
Выходные данные Функций Azure
В нашем задании мы заменим выходные данные ASA SQL на выходные данные ASA Функции Azure. Возможности UPDATE, UPSERT или MERGE реализуются в функции.
Сейчас есть два варианта доступа к Базе данных SQL в функции. Во-первых, это выходная привязка Azure SQL. Сейчас этот вариант ограничен C# и предлагает только режим замены. Во-вторых, это составление запроса SQL, который будет отправлен через соответствующий драйвер SQL (Microsoft.Data.SqlClient для .NET).
Для обоих следующих примеров предполагается следующая схема таблицы. Вариант с привязкой требует, чтобы первичный ключ был установлен в целевой таблице. Это необязательно, но рекомендуется при использовании драйвера SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Функция должна соответствовать следующим ожиданиям для использования в качестве выходных данных ASA:
- Azure Stream Analytics ожидает состояние HTTP 200 из приложения-функции для пакетов, которые были успешно обработаны.
- Когда Azure Stream Analytics получает исключение 413 (сущность запроса HTTP слишком большая) из от Функций Azure, размер пакетов, отправляемых в Функции Azure, уменьшается.
- Во время тестового подключения Stream Analytics отправляет запрос POST с пустым пакетом в Функции Azure и ждет возврата состояния HTTP 20x для проверки теста.
Вариант 1. Обновление по ключу с помощью привязки SQL Функций Azure.
Этот вариант использует расширение Выходная привязка SQL Функций Azure. Это расширение может заменить объект в таблице без необходимости писать инструкцию SQL. Сейчас составные операторы присваивания (накопления) не поддерживаются.
Этот пример был создан с помощью:
- Среды выполнения Функций Azure версии 4.
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview.
Чтобы лучше понять подход с привязкой, рекомендуется следовать этому руководству.
Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:
- Язык:
C#
- Среда выполнения:
.NET 6
(в функции или среде выполнения версии 4). - Шаблон:
HTTP trigger
.
Установите расширение привязки, выполнив следующую команду в терминале, расположенном в папке проекта:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Добавьте элемент SqlConnectionString
в раздел Values
файла local.settings.json
, заполнив строку подключения целевого сервера:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Обновите имя целевой таблицы в разделе привязки:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Обновите раздел класса Device
и сопоставления, чтобы он соответствовал вашей схеме:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Теперь можно протестировать проводку между локальной функцией и базой данных, отладив (F5 в Visual Studio Code). База данных SQL должна быть доступна с компьютера. SSMS можно использовать для проверки подключения. Затем отправьте запросы POST в локальную конечную точку. Запрос с пустым телом должен возвращать состояние HTTP 204. Запрос с реальными полезными данными должен быть сохранен в целевой таблице (в режиме замены или обновления). Вот пример полезных данных, соответствующих схеме, используемой в этом примере:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Теперь функцию можно опубликовать в Azure. Для параметра приложения нужно задать SqlConnectionString
. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.
Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.
Вариант 2. Объединение с составным присвоением (накоплением) с помощью пользовательского SQL-запроса.
Примечание.
После перезапуска и восстановления ASA может повторно отправить выходные события, которые уже были отправлены. Это ожидаемое поведение, которое может привести к сбою логики накопления (удвоение отдельных значений). Чтобы избежать этого, рекомендуется выводить те же данные в таблицу через собственный вывод SQL ASA. Затем эту контрольную таблицу можно использовать для обнаружения проблем и повторной синхронизации накопления при необходимости.
Этот вариант использует Microsoft.Data.SqlClient. Эта библиотека позволяет выполнять любые запросы SQL к Базе данных SQL.
Этот пример был создан с помощью:
Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:
- Язык:
C#
- Среда выполнения:
.NET 6
(в функции или среде выполнения версии 4). - Шаблон:
HTTP trigger
.
Установите библиотеку SqlClient, выполнив следующую команду в терминале, расположенном в папке проекта:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Добавьте элемент SqlConnectionString
в раздел Values
файла local.settings.json
, заполнив строку подключения целевого сервера:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Обновите раздел построения команд sqltext
, чтобы он соответствовал вашей схеме (обратите внимание, как накопление достигается с помощью оператора +=
при обновлении):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Теперь вы можете проверить связь между локальной функцией и базой данных с помощью отладки (F5 в VS Code). База данных SQL должна быть доступна с компьютера. SSMS можно использовать для проверки подключения. Затем выполните запросы POST к локальной конечной точке. Запрос с пустым телом должен возвращать состояние HTTP 204. Запрос с актуальными полезными данными должен сохраняться в целевой таблице (в режиме накопления или слияния). Вот пример полезных данных, соответствующих схеме, используемой в этом примере:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Теперь функцию можно опубликовать в Azure. Для параметра приложения нужно задать SqlConnectionString
. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.
Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.
Альтернативные варианты
Наряду с Функциями Azure есть и другие способы достижения ожидаемого результата. В этом разделе приведены некоторые из них.
Последующая обработка в целевой Базе данных SQL
Фоновая задача выполняется после вставки данных в базу данных с помощью стандартных выходных данных ASA SQL.
Для SQL Azure триггеры DML можно использовать для перехвата команд INSERT, INSTEAD OF
выданных ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Для Synapse SQL ASA может вставлять в промежуточную таблицу. Затем повторяющаяся задача может преобразовать данные по мере необходимости в промежуточную таблицу. Наконец, данные перемещаются в производственную таблицу.
Предварительная обработка в Azure Cosmos DB
Azure Cosmos DB обеспечивает встроенную поддержку UPSERT. Здесь возможно только добавление и замена. Накопление должно управляться клиентом на стороне клиента в Azure Cosmos DB.
Если требования совпадают, можно заменить целевую базу данных SQL экземпляром Azure Cosmos DB. Это требует существенного изменения в общей архитектуре решения.
Для Synapse SQL Azure Cosmos DB можно использовать в качестве промежуточного уровня с помощью Azure Synapse Link для Azure Cosmos DB. Azure Synapse Link можно использовать для создания аналитического хранилища. Затем это хранилище данных можно запросить непосредственно в Synapse SQL.
Сравнение альтернатив
Каждый подход предлагает разные преимущества и возможности:
Тип | Вариант | Режимы | База данных SQL Azure | Azure Synapse Analytics |
---|---|---|---|---|
Постобработка | ||||
Триггеры | Замена, накопление | + | Н/д, триггеры недоступны в Synapse SQL | |
Промежуточная | Замена, накопление | + | + | |
Предварительная обработка | ||||
Функции Azure | Замена, накопление | + | - (построчная производительность) | |
Замена Azure Cosmos DB | Replace | Неприменимо | Неприменимо | |
Azure Cosmos DB Azure Synapse Link | Replace | Н/П | + |
Поддержка
За дополнительной информацией перейдите на страницу вопросов и ответов об Azure Stream Analytics.
Следующие шаги
- Описание выходных данных из Azure Stream Analytics
- Вывод данных Azure Stream Analytics в базу данных SQL Azure
- Повышение показателей пропускной способности базы данных Azure SQL из Azure Stream Analytics
- Использование управляемых удостоверений для доступа к Базе данных SQL Azure или Azure Synapse Analytics из задания Azure Stream Analytics
- Использование ссылочных данных из База данных SQL для задания Azure Stream Analytics
- Выполнение Функций Azure из заданий Azure Stream Analytics — руководство по выходным данным Redis
- Краткое руководство по созданию задания Stream Analytics с помощью портала Azure