Eventos
Únase a nosotros en FabCon Vegas
31 mar, 23 - 2 abr, 23
El último evento dirigido por la comunidad de Microsoft Fabric, Power BI, SQL y AI. 31 de marzo al 2 de abril de 2025.
Regístrate hoyEste explorador ya no se admite.
Actualice a Microsoft Edge para aprovechar las características y actualizaciones de seguridad más recientes, y disponer de soporte técnico.
Actualmente, Azure Stream Analytics (ASA) solo admite la inserción (anexión) de filas en salidas de SQL (Azure SQL Database y Azure Synapse Analytics). En este artículo se dan soluciones alternativas para habilitar UPDATE, UPSERT o MERGE en bases de datos SQL, con Azure Functions como capa intermediaria.
Al final se presentan opciones alternativas a Azure Functions.
Por lo general, la escritura de datos en una tabla se puede realizar de la siguiente manera:
Modo | Instrucción T-SQL equivalente | Requisitos |
---|---|---|
Append | INSERT | None |
Replace | MERGE (UPSERT) | Clave única |
Acumular | MERGE (UPSERT) con operador de asignación compuesta (+= , -= ...) |
Clave única y acumulador |
Para ilustrar las diferencias, se puede ver lo que sucede al ingerir los dos registros siguientes:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
En el modo append, se insertan los dos registros. La instrucción T-SQL equivalente es:
INSERT INTO [target] VALUES (...);
Elemento resultante:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
En el modo replace, solo se obtiene el último valor por clave. Aquí usamos Device_Id como clave. La instrucción T-SQL equivalente es:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Elemento resultante:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Por último, en el modo accumulate, se suma Value
con un operador de asignación compuesto (+=
). Aquí también usamos Device_Id como clave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Elemento resultante:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Por motivos de rendimiento, los adaptadores de salida de base de datos SQL de ASA solo admiten de momento el modo Append de forma nativa. Estos adaptadores usan la inserción masiva para maximizar el rendimiento y limitar la presión de retorno.
En este artículo se muestra cómo usar Azure Functions para implementar los modos Replace y Accumulate de ASA. Cuando se usa una función como capa intermedia, el posible rendimiento de escritura no afectará al trabajo de streaming. En este sentido, el uso de Azure Functions funciona mejor con Azure SQL. Con Synapse SQL, el cambio de instrucciones masivas a instrucciones fila a fila puede crear mayores problemas de rendimiento.
En el trabajo, vamos a reemplazar la salida SQL de ASA por la salida de Azure Functions de ASA. Las capacidades UPDATE, UPSERT o MERGE se van a implementar en la función.
Actualmente hay dos opciones para acceder a SQL Database en una función. Primero está el enlace de salida de Azure SQL. Actualmente está limitado a C# y solo ofrece el modo Replace. En segundo lugar está la creación de una consulta SQL que se va a enviar por medio del controlador SQL adecuado (Microsoft.Data.SqlClient para .NET).
Para los dos ejemplos siguientes, se supone que el esquema de tabla siguiente. La opción de enlace requiere el establecimiento de una clave principal en la tabla de destino. No es necesario, aunque se recomienda, si se usa un controlador SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Una función tiene que cumplir las siguientes expectativas para usarse como salida de ASA:
Esta opción usa el enlace de salida SQL de Azure Functions. Esta extensión puede reemplazar un objeto de una tabla sin necesidad de escribir una instrucción SQL. En este momento, no admite operadores de asignación compuestos (acumulaciones).
Este ejemplo se ha basado en:
Para comprender mejor el método de enlace, se recomienda seguir este tutorial.
En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:
C#
.NET 6
(en function/runtime v4)HTTP trigger
Instale la extensión de enlace mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Agregue el elemento SqlConnectionString
en la sección Values
de local.settings.json
y rellene la cadena de conexión del servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Actualice el nombre de la tabla de destino de la sección de enlace:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Actualice la clase Device
y la sección de asignación para que coincidan con su propio esquema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en Visual Studio Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, envíe solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de reemplazo o actualización). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString
. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.
Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.
Nota
Tras el reinicio y la recuperación, ASA puede volver a enviar eventos de salida que ya se han emitido. Se trata de un comportamiento esperado que puede provocar un error en la lógica de acumulación (duplicación de valores individuales). Para evitarlo, se recomienda generar los mismos datos en una tabla por medio de la salida SQL de ASA nativa. Esta tabla de control se puede usar para detectar problemas y volver a sincronizar la acumulación cuando sea necesario.
Esta opción usa Microsoft.Data.SqlClient. Esta biblioteca permite emitir cualquier consulta SQL a SQL Database.
Este ejemplo se ha basado en:
En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:
C#
.NET 6
(en function/runtime v4)HTTP trigger
Instale la biblioteca SqlClient mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Agregue el elemento SqlConnectionString
en la sección Values
de local.settings.json
y rellene la cadena de conexión del servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Actualice la sección de compilación de comandos sqltext
para que coincida con su propio esquema (observe cómo se logra la acumulación por medio del operador +=
al actualizar):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en VS Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, emita solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de acumulación o combinación). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString
. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.
Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.
Fuera de Azure Functions, hay varias maneras de lograr el resultado esperado. En esta sección se proporcionan algunos de ellos.
Una tarea en segundo plano funciona una vez que los datos se insertan en la base de datos a través de las salidas de ASA SQL estándar.
En Azure SQL, se pueden usar desencadenadores DML INSTEAD OF
para interceptar los comandos INSERT emitidos por ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
En Synapse SQL, ASA puede insertar en una tabla de almacenamiento provisional. Luego una tarea periódica puede transformar los datos según sea necesario en una tabla intermediaria. Por último, los datos se mueven a la tabla de producción.
Azure Cosmos DB admite UPSERT de forma nativa. Aquí solo se puede anexar o reemplazar. Las acumulaciones deben administrarse en el lado cliente de Azure Cosmos DB.
Si los requisitos coinciden, una opción es reemplazar la base de datos SQL de destino por una instancia de Azure Cosmos DB. Esto requiere un cambio importante en la arquitectura general de la solución.
En Synapse SQL se puede usar Azure Cosmos DB como capa intermediaria mediante Azure Synapse Link para Azure Cosmos DB. Azure Synapse Link se puede usar para crear un almacén analítico. Este almacén de datos se puede consultar directamente en Synapse SQL.
Cada enfoque ofrece diferentes capacidades y propuestas de valor:
Tipo | Opción | Modos | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Posprocesamiento | ||||
Desencadenadores | Replace, Accumulate | + | N/D, los desencadenadores no están disponibles en Synapse SQL | |
Ensayo | Replace, Accumulate | + | + | |
Procesamiento previo | ||||
Azure Functions | Replace, Accumulate | + | - (rendimiento fila a fila) | |
Sustitución de Azure Cosmos DB | Replace | N/D | N/D | |
Azure Cosmos DB Azure Synapse Link | Replace | N/D | + |
Para más ayuda, pruebe nuestra página de preguntas y respuestas de Microsoft sobre Azure Stream Analytics.
Eventos
Únase a nosotros en FabCon Vegas
31 mar, 23 - 2 abr, 23
El último evento dirigido por la comunidad de Microsoft Fabric, Power BI, SQL y AI. 31 de marzo al 2 de abril de 2025.
Regístrate hoyCursos
Módulo
Exploración de las opciones de manipulación de datos en Azure SQL Database - Training
Aprenda a invocar puntos de conexión de REST en Azure SQL Database y a manipular datos mediante Azure Functions. Además, explore varias herramientas y opciones para importar y exportar datos hacia y desde Azure SQL Database.
Certificación
Microsoft Certified: Azure Cosmos DB Developer Specialty - Certifications
Escribe consultas eficaces, crea directivas de indexación, administra y aprovisiona recursos en la API de SQL y el SDK con Microsoft Azure Cosmos DB.