Actualización o combinación de registros en Azure SQL Database con Azure Functions
Actualmente, Azure Stream Analytics (ASA) solo admite la inserción (anexión) de filas en salidas de SQL (Azure SQL Database y Azure Synapse Analytics). En este artículo se dan soluciones alternativas para habilitar UPDATE, UPSERT o MERGE en bases de datos SQL, con Azure Functions como capa intermediaria.
Al final se presentan opciones alternativas a Azure Functions.
Requisito
Por lo general, la escritura de datos en una tabla se puede realizar de la siguiente manera:
Modo | Instrucción T-SQL equivalente | Requisitos |
---|---|---|
Append | INSERT | None |
Replace | MERGE (UPSERT) | Clave única |
Acumular | MERGE (UPSERT) con operador de asignación compuesta (+= , -= ...) |
Clave única y acumulador |
Para ilustrar las diferencias, se puede ver lo que sucede al ingerir los dos registros siguientes:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
En el modo append, se insertan los dos registros. La instrucción T-SQL equivalente es:
INSERT INTO [target] VALUES (...);
Elemento resultante:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
En el modo replace, solo se obtiene el último valor por clave. Aquí usamos Device_Id como clave. La instrucción T-SQL equivalente es:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Elemento resultante:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Por último, en el modo accumulate, se suma Value
con un operador de asignación compuesto (+=
). Aquí también usamos Device_Id como clave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Elemento resultante:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Por motivos de rendimiento, los adaptadores de salida de base de datos SQL de ASA solo admiten de momento el modo Append de forma nativa. Estos adaptadores usan la inserción masiva para maximizar el rendimiento y limitar la presión de retorno.
En este artículo se muestra cómo usar Azure Functions para implementar los modos Replace y Accumulate de ASA. Cuando se usa una función como capa intermedia, el posible rendimiento de escritura no afectará al trabajo de streaming. En este sentido, el uso de Azure Functions funciona mejor con Azure SQL. Con Synapse SQL, el cambio de instrucciones masivas a instrucciones fila a fila puede crear mayores problemas de rendimiento.
Salida de Azure Functions
En el trabajo, vamos a reemplazar la salida SQL de ASA por la salida de Azure Functions de ASA. Las capacidades UPDATE, UPSERT o MERGE se van a implementar en la función.
Actualmente hay dos opciones para acceder a SQL Database en una función. Primero está el enlace de salida de Azure SQL. Actualmente está limitado a C# y solo ofrece el modo Replace. En segundo lugar está la creación de una consulta SQL que se va a enviar por medio del controlador SQL adecuado (Microsoft.Data.SqlClient para .NET).
Para los dos ejemplos siguientes, se supone que el esquema de tabla siguiente. La opción de enlace requiere el establecimiento de una clave principal en la tabla de destino. No es necesario, aunque se recomienda, si se usa un controlador SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Una función tiene que cumplir las siguientes expectativas para usarse como salida de ASA:
- Azure Stream Analytics espera el estado de HTTP 200 de la aplicación de Functions de los lotes que se han procesado correctamente.
- Si Azure Stream Analytics recibe una excepción 413 ("La entidad de solicitud HTTP es demasiado grande") de una función de Azure, disminuye el tamaño de los lotes que envía a Azure Functions.
- Durante la conexión de prueba, Stream Analytics envía una solicitud POST con un lote vacío a Azure Functions y espera que se devuelva el estado HTTP 20x para validar la prueba.
Opción 1: Actualización por clave con el enlace SQL de Azure Functions
Esta opción usa el enlace de salida SQL de Azure Functions. Esta extensión puede reemplazar un objeto de una tabla sin necesidad de escribir una instrucción SQL. En este momento, no admite operadores de asignación compuestos (acumulaciones).
Este ejemplo se ha basado en:
- Versión 4 del entorno en tiempo de ejecución de Azure Functions
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Para comprender mejor el método de enlace, se recomienda seguir este tutorial.
En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:
- Lenguaje:
C#
- Runtime:
.NET 6
(en function/runtime v4) - Plantilla:
HTTP trigger
Instale la extensión de enlace mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Agregue el elemento SqlConnectionString
en la sección Values
de local.settings.json
y rellene la cadena de conexión del servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Actualice el nombre de la tabla de destino de la sección de enlace:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Actualice la clase Device
y la sección de asignación para que coincidan con su propio esquema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en Visual Studio Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, envíe solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de reemplazo o actualización). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString
. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.
Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.
Opción 2: Combinación con asignación compuesta (acumulación) por medio de una consulta SQL personalizada
Nota:
Tras el reinicio y la recuperación, ASA puede volver a enviar eventos de salida que ya se han emitido. Se trata de un comportamiento esperado que puede provocar un error en la lógica de acumulación (duplicación de valores individuales). Para evitarlo, se recomienda generar los mismos datos en una tabla por medio de la salida SQL de ASA nativa. Esta tabla de control se puede usar para detectar problemas y volver a sincronizar la acumulación cuando sea necesario.
Esta opción usa Microsoft.Data.SqlClient. Esta biblioteca permite emitir cualquier consulta SQL a SQL Database.
Este ejemplo se ha basado en:
- Versión 4 del entorno en tiempo de ejecución de Azure Functions
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:
- Lenguaje:
C#
- Runtime:
.NET 6
(en function/runtime v4) - Plantilla:
HTTP trigger
Instale la biblioteca SqlClient mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Agregue el elemento SqlConnectionString
en la sección Values
de local.settings.json
y rellene la cadena de conexión del servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Actualice la sección de compilación de comandos sqltext
para que coincida con su propio esquema (observe cómo se logra la acumulación por medio del operador +=
al actualizar):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en VS Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, emita solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de acumulación o combinación). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString
. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.
Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.
Alternativas
Fuera de Azure Functions, hay varias maneras de lograr el resultado esperado. En esta sección se proporcionan algunos de ellos.
Procesamiento posterior en la instancia de destino de SQL Database
Una tarea en segundo plano funciona una vez que los datos se insertan en la base de datos a través de las salidas de ASA SQL estándar.
En Azure SQL, se pueden usar desencadenadores DML INSTEAD OF
para interceptar los comandos INSERT emitidos por ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
En Synapse SQL, ASA puede insertar en una tabla de almacenamiento provisional. Luego una tarea periódica puede transformar los datos según sea necesario en una tabla intermediaria. Por último, los datos se mueven a la tabla de producción.
Procesamiento previo en Azure Cosmos DB
Azure Cosmos DB admite UPSERT de forma nativa. Aquí solo se puede anexar o reemplazar. Las acumulaciones deben administrarse en el lado cliente de Azure Cosmos DB.
Si los requisitos coinciden, una opción es reemplazar la base de datos SQL de destino por una instancia de Azure Cosmos DB. Esto requiere un cambio importante en la arquitectura general de la solución.
En Synapse SQL se puede usar Azure Cosmos DB como capa intermediaria mediante Azure Synapse Link para Azure Cosmos DB. Azure Synapse Link se puede usar para crear un almacén analítico. Este almacén de datos se puede consultar directamente en Synapse SQL.
Comparación de las alternativas
Cada enfoque ofrece diferentes capacidades y propuestas de valor:
Tipo | Opción | Modos | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Posprocesamiento | ||||
Desencadenadores | Replace, Accumulate | + | N/D, los desencadenadores no están disponibles en Synapse SQL | |
Ensayo | Replace, Accumulate | + | + | |
Procesamiento previo | ||||
Azure Functions | Replace, Accumulate | + | - (rendimiento fila a fila) | |
Sustitución de Azure Cosmos DB | Replace | N/D | N/D | |
Azure Cosmos DB Azure Synapse Link | Replace | N/D | + |
Obtención de soporte técnico
Para más ayuda, pruebe nuestra página de preguntas y respuestas de Microsoft sobre Azure Stream Analytics.
Pasos siguientes
- Información sobre las salidas desde Azure Stream Analytics
- Salida de Azure Stream Analytics a Azure SQL Database
- Incremento del rendimiento de Azure SQL Database desde Azure Stream Analytics
- Uso de identidades administradas para acceder a Azure SQL Database o Azure Synapse Analytics desde un trabajo de Azure Stream Analytics
- Uso de datos de referencia de una instancia de SQL Database para un trabajo de Azure Stream Analytics
- Ejecución de Azure Functions en trabajos de Azure Stream Analytics: tutorial de salida de Redis
- Inicio rápido: Creación de un trabajo de Stream Analytics mediante Azure Portal