Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В настоящее время Azure Stream Analytics (ASA) поддерживает только вставку (добавление) строк в выходные данные SQL (Azure SQL Базы данных и Azure Synapse Analytics). В этой статье рассматриваются обходные пути для включения UPDATE, UPSERT или MERGE в базах данных SQL с помощью Функции Azure в качестве промежуточного слоя.
Альтернативные варианты Функции Azure представлены в конце.
Требование
Данные можно записывать в таблицу с помощью одного из следующих режимов:
| Режим | Эквивалентная инструкция T-SQL | Требования |
|---|---|---|
| Добавить | INSERT | нет |
| Заменить | MERGE (UPSERT) | Уникальный ключ |
| Накапливать | MERGE (UPSERT) с оператором комплексного присваивания (+=, -=...) |
Уникальный ключ и аккумулятор |
Чтобы проиллюстрировать различия, рассмотрите, что происходит при приеме следующих двух записей:
| Время_прибытия | Идентификатор_Устройства | Значение_Измерения |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
В режиме добавления вставьте две записи. Эквивалентная инструкция T-SQL:
INSERT INTO [target] VALUES (...);
Результат:
| Время_изменения | Device_Id | Значение_Измерения |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
В режиме замены вы получаете только последнее значение по ключу. Здесь вы используете Device_Id в качестве ключа. Эквивалентная инструкция T-SQL:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Результат:
| Изменённое_Время | Ключ_устройства | Значение_Меры |
|---|---|---|
| 10:05 | A | 20 |
Наконец, в режиме накопления прибавляется Value с помощью оператора составного назначения (+=). Здесь также используется Device_Id в качестве ключа:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Результат:
| Время_изменения | Ключ устройства | Значение_Измерения |
|---|---|---|
| 10:05 | А | 21 |
Из соображений производительности выходные адаптеры базы данных SQL ASA сейчас изначально поддерживают только режим добавления. В этих адаптерах используется массивная вставка, чтобы максимизировать пропускную способность и ограничить обратное давление.
В этой статье показано, как использовать Функции Azure для реализации режимов замены и накопления для ASA. При использовании функции в качестве промежуточного слоя потенциальная производительность записи не влияет на задание потоковой передачи. В этом отношении использование функций Azure лучше всего совместимо с Azure SQL. При использовании Synapse SQL переключение с массового на построчные операторы может привести к более значительным проблемам с производительностью.
результаты работы Функции Azure
В этом задании вы замените выходные данные ASA SQL выходными данными ASA Функции Azure. Функция реализует возможности UPDATE, UPSERT или MERGE.
В настоящее время вы можете получить доступ к базе данных SQL в функции с помощью двух вариантов. Первым вариантом является Azure SQL выходная привязка. В настоящее время она ограничена C# и предлагает только режим замены. Второй вариант — создать SQL-запрос для отправки с помощью соответствующего драйвера SQL (Microsoft. Data.SqlClient для .NET).
Оба следующих примера предполагают следующую схему таблицы. Вариант с привязкой требует, чтобы первичный ключ был установлен в целевой таблице. Это необязательно, но рекомендуется при использовании драйвера SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Чтобы использовать функцию в качестве выходных данных из ASA, функция должна соответствовать следующим ожиданиям:
- Azure Stream Analytics ожидает состояние HTTP 200 из приложения "Функции" для пакетов, которые он успешно обрабатывает.
- Если Azure Stream Analytics получает исключение 413 ("http request Entity Too Large") из функции Azure, это уменьшает размер пакетов, которые он отправляет в функцию Azure.
- Во время тестового подключения Stream Analytics отправляет запрос POST с пустым пакетом в Функции Azure и ожидает, что код состояния HTTP 20x будет возвращён, чтобы подтвердить тест.
Вариант 1: Обновление по ключу с помощью привязки SQL функцией Azure.
Этот вариант использует опцию Выходной привязки SQL для функций Azure. Это расширение может заменить объект в таблице без необходимости писать инструкцию SQL. В настоящее время составные операторы присваивания, такие как операторы накопления, не поддерживаются.
Этот пример был создан с помощью:
- Среда выполнения Функции Azure версии 4.
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview.
Чтобы лучше понять подход к привязке, следуйте инструкциям this.
Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используйте следующую информацию:
- Язык:
C# - Среда выполнения:
.NET 6(в рамках функции/среды выполнения версии 4). - Шаблон:
HTTP trigger.
Установите расширение привязки, выполнив следующую команду в терминале, расположенном в папке проекта:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Обновите пространство имен, имя класса и имя функции, выбрав свои собственные:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Обновите имя целевой таблицы в разделе привязки:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Обновите раздел класса Device и сопоставления, чтобы он соответствовал вашей схеме:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Теперь можно протестировать соединение между локальной функцией и базой данных путем отладки, используя F5 в Visual Studio Code. База данных SQL должна быть доступна с компьютера. Для проверки подключения можно использовать SSMS . Затем отправьте запросы POST в локальную конечную точку. Запрос с пустым текстом должен возвращать HTTP 204. Запрос с фактической полезной нагрузкой должен быть записан в целевую таблицу (в режиме замены / обновления). Вот пример пакета данных, соответствующего схеме в этом примере:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Теперь функцию можно опубликовать в Azure. Установка параметра приложения для SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.
Затем можно определить функцию в качестве выходных данных в задании ASA и использовать ее для замены записей вместо вставки.
Вариант 2. Объединение с использованием операции составного присвоения (аккумулирования) посредством пользовательского SQL-запроса.
Примечание.
При перезапуске и восстановлении ASA может повторно отправить выходные события, которые он уже генерирует. Это поведение может привести к сбою логики накопления (удвоение отдельных значений). Чтобы предотвратить эту проблему, выведите те же данные в формате таблицы, используя встроенные выходные данные ASA SQL. Эту таблицу управления можно использовать для обнаружения проблем и повторной синхронизации накопления при необходимости.
Этот вариант использует Microsoft.Data.SqlClient. Эта библиотека позволяет отправлять запросы SQL в базу данных SQL.
Этот образец был построен на основе:
Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:
- Язык:
C# - Время выполнения:
.NET 6(в функции/время выполнения версии 4). - Шаблон:
HTTP trigger.
Установите библиотеку SqlClient, выполнив следующую команду в терминале, расположенном в папке проекта:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Обновите раздел построения команд sqltext, чтобы он соответствовал вашей схеме (обратите внимание, как накопление достигается с помощью оператора += при обновлении):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Теперь вы можете проверить связь между локальной функцией и базой данных с помощью отладки (F5 в VS Code). База данных SQL должна быть доступна с компьютера. Для проверки подключения можно использовать SSMS . Затем отправьте запросы POST в локальную конечную точку. Запрос с пустым текстом должен возвращать HTTP 204. Запрос с фактическими данными полезной нагрузки должен сохраняться в целевой таблице (в режиме накопления или слияния). Вот пример нагрузки, которая соответствует схеме, используемой в данном случае.
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Теперь функцию можно опубликовать в Azure. Для параметра приложения следует задать SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ, чтобы функция в реальном времени могла к нему подключиться.
Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.
Альтернативные варианты
Вне Функции Azure несколько методов могут достичь ожидаемого результата. В этом разделе описаны некоторые из этих методов.
Последующая обработка в целевой Базе данных SQL
Фоновая задача выполняется после вставки данных в базу данных с помощью стандартных выходных данных ASA SQL.
Для Azure SQL используйте триггеры DML INSTEAD OF для перехвата команд INSERT, которые ASA выпускает.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Для Synapse SQL ASA может вставлять в промежуточную таблицу. Затем повторяющаяся задача может преобразовать данные по мере необходимости в промежуточную таблицу. Наконец, данные перемещаются в рабочую таблицу.
Предварительная обработка в Azure Cosmos DB
Azure Cosmos DB обеспечивает встроенную поддержку UPSERT. Здесь возможно только добавление или замена. Необходимо управлять накоплением на стороне клиента в Azure Cosmos DB.
Если требования соответствуют требованиям, можно заменить целевую базу данных SQL экземпляром Azure Cosmos DB. Для этого изменения требуется важное изменение в общей архитектуре решения.
Для Synapse SQL можно использовать Azure Cosmos DB в качестве промежуточного слоя через Azure Synapse Link для Azure Cosmos DB. Используйте Azure Synapse Link для создания аналитического хранилища. Затем вы можете запросить это хранилище данных непосредственно в Synapse SQL.
Сравнение альтернатив
Каждый подход предлагает различные предложения и возможности.
| Тип | Вариант | Режимы | База данных SQL Azure | Azure Synapse Analytics |
|---|---|---|---|---|
| Постобработка | ||||
| Триггеры | Замена, накопление | + | Не применимо, триггеры недоступны в Synapse SQL | |
| Тестовая среда | Замена, накопление | + | + | |
| Предварительная обработка | ||||
| Функции Azure | Замена, накопление | + | - (построчная производительность) | |
| Замена Azure Cosmos DB | Заменить | Неприменимо | Неприменимо | |
| Azure Cosmos DB Azure Synapse Link | Заменить | Неприменимо | + |
Получить поддержку
Дополнительные сведения см. на странице вопросов Microsoft Q&A для Azure Stream Analytics.
Следующие шаги
- Описание выходных данных из Azure Stream Analytics
- Вывод данных Azure Stream Analytics в базу данных SQL Azure
- Повышение показателей пропускной способности базы данных Azure SQL из Azure Stream Analytics
- Используйте управляемые удостоверения для доступа к База данных SQL Azure или Azure Synapse Analytics из задания в Azure Stream Analytics
- Использование ссылочных данных из базы данных SQL для задачи Azure Stream Analytics
- Выполнение Функций Azure из заданий Azure Stream Analytics — руководство по выходным данным Redis
- Краткое руководство по созданию задания Stream Analytics с помощью портала Azure