Compartir a través de


Actualización o combinación de registros en Azure SQL Database con Azure Functions

Actualmente, Azure Stream Analytics (ASA) solo admite la inserción (anexión) de filas en salidas de SQL (Azure SQL Database y Azure Synapse Analytics). En este artículo se dan soluciones alternativas para habilitar UPDATE, UPSERT o MERGE en bases de datos SQL, con Azure Functions como capa intermediaria.

Al final se presentan opciones alternativas a Azure Functions.

Requisito

Por lo general, la escritura de datos en una tabla se puede realizar de la siguiente manera:

Modo Instrucción T-SQL equivalente Requisitos
Append INSERT None
Replace MERGE (UPSERT) Clave única
Acumular MERGE (UPSERT) con operador de asignación compuesta (+=, -=...) Clave única y acumulador

Para ilustrar las diferencias, se puede ver lo que sucede al ingerir los dos registros siguientes:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

En el modo append, se insertan los dos registros. La instrucción T-SQL equivalente es:

INSERT INTO [target] VALUES (...);

Elemento resultante:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

En el modo replace, solo se obtiene el último valor por clave. Aquí usamos Device_Id como clave. La instrucción T-SQL equivalente es:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Elemento resultante:

Modified_Time Device_Key Measure_Value
10:05 A 20

Por último, en el modo accumulate, se suma Value con un operador de asignación compuesto (+=). Aquí también usamos Device_Id como clave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Elemento resultante:

Modified_Time Device_Key Measure_Value
10:05 A 21

Por motivos de rendimiento, los adaptadores de salida de base de datos SQL de ASA solo admiten de momento el modo Append de forma nativa. Estos adaptadores usan la inserción masiva para maximizar el rendimiento y limitar la presión de retorno.

En este artículo se muestra cómo usar Azure Functions para implementar los modos Replace y Accumulate de ASA. Cuando se usa una función como capa intermedia, el posible rendimiento de escritura no afectará al trabajo de streaming. En este sentido, el uso de Azure Functions funciona mejor con Azure SQL. Con Synapse SQL, el cambio de instrucciones masivas a instrucciones fila a fila puede crear mayores problemas de rendimiento.

Salida de Azure Functions

En el trabajo, vamos a reemplazar la salida SQL de ASA por la salida de Azure Functions de ASA. Las capacidades UPDATE, UPSERT o MERGE se van a implementar en la función.

Actualmente hay dos opciones para acceder a SQL Database en una función. Primero está el enlace de salida de Azure SQL. Actualmente está limitado a C# y solo ofrece el modo Replace. En segundo lugar está la creación de una consulta SQL que se va a enviar por medio del controlador SQL adecuado (Microsoft.Data.SqlClient para .NET).

Para los dos ejemplos siguientes, se supone que el esquema de tabla siguiente. La opción de enlace requiere el establecimiento de una clave principal en la tabla de destino. No es necesario, aunque se recomienda, si se usa un controlador SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Una función tiene que cumplir las siguientes expectativas para usarse como salida de ASA:

  • Azure Stream Analytics espera el estado de HTTP 200 de la aplicación de Functions de los lotes que se han procesado correctamente.
  • Si Azure Stream Analytics recibe una excepción 413 ("La entidad de solicitud HTTP es demasiado grande") de una función de Azure, disminuye el tamaño de los lotes que envía a Azure Functions.
  • Durante la conexión de prueba, Stream Analytics envía una solicitud POST con un lote vacío a Azure Functions y espera que se devuelva el estado HTTP 20x para validar la prueba.

Opción 1: Actualización por clave con el enlace SQL de Azure Functions

Esta opción usa el enlace de salida SQL de Azure Functions. Esta extensión puede reemplazar un objeto de una tabla sin necesidad de escribir una instrucción SQL. En este momento, no admite operadores de asignación compuestos (acumulaciones).

Este ejemplo se ha basado en:

Para comprender mejor el método de enlace, se recomienda seguir este tutorial.

En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:

  • Lenguaje: C#
  • Runtime: .NET 6 (en function/runtime v4)
  • Plantilla: HTTP trigger

Instale la extensión de enlace mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Agregue el elemento SqlConnectionString en la sección Values de local.settings.json y rellene la cadena de conexión del servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Actualice el nombre de la tabla de destino de la sección de enlace:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Actualice la clase Device y la sección de asignación para que coincidan con su propio esquema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en Visual Studio Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, envíe solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de reemplazo o actualización). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.

Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.

Opción 2: Combinación con asignación compuesta (acumulación) por medio de una consulta SQL personalizada

Nota:

Tras el reinicio y la recuperación, ASA puede volver a enviar eventos de salida que ya se han emitido. Se trata de un comportamiento esperado que puede provocar un error en la lógica de acumulación (duplicación de valores individuales). Para evitarlo, se recomienda generar los mismos datos en una tabla por medio de la salida SQL de ASA nativa. Esta tabla de control se puede usar para detectar problemas y volver a sincronizar la acumulación cuando sea necesario.

Esta opción usa Microsoft.Data.SqlClient. Esta biblioteca permite emitir cualquier consulta SQL a SQL Database.

Este ejemplo se ha basado en:

En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:

  • Lenguaje: C#
  • Runtime: .NET 6 (en function/runtime v4)
  • Plantilla: HTTP trigger

Instale la biblioteca SqlClient mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Agregue el elemento SqlConnectionString en la sección Values de local.settings.json y rellene la cadena de conexión del servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Actualice la sección de compilación de comandos sqltext para que coincida con su propio esquema (observe cómo se logra la acumulación por medio del operador += al actualizar):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en VS Code). La base de datos SQL debe ser accesible desde el equipo. Se puede usar SSMS para comprobar la conectividad. A continuación, emita solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver http 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de acumulación o combinación). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.

Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.

Alternativas

Fuera de Azure Functions, hay varias maneras de lograr el resultado esperado. En esta sección se proporcionan algunos de ellos.

Procesamiento posterior en la instancia de destino de SQL Database

Una tarea en segundo plano funciona una vez que los datos se insertan en la base de datos a través de las salidas de ASA SQL estándar.

En Azure SQL, se pueden usar desencadenadores DML INSTEAD OFpara interceptar los comandos INSERT emitidos por ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

En Synapse SQL, ASA puede insertar en una tabla de almacenamiento provisional. Luego una tarea periódica puede transformar los datos según sea necesario en una tabla intermediaria. Por último, los datos se mueven a la tabla de producción.

Procesamiento previo en Azure Cosmos DB

Azure Cosmos DB admite UPSERT de forma nativa. Aquí solo se puede anexar o reemplazar. Las acumulaciones deben administrarse en el lado cliente de Azure Cosmos DB.

Si los requisitos coinciden, una opción es reemplazar la base de datos SQL de destino por una instancia de Azure Cosmos DB. Esto requiere un cambio importante en la arquitectura general de la solución.

En Synapse SQL se puede usar Azure Cosmos DB como capa intermediaria mediante Azure Synapse Link para Azure Cosmos DB. Azure Synapse Link se puede usar para crear un almacén analítico. Este almacén de datos se puede consultar directamente en Synapse SQL.

Comparación de las alternativas

Cada enfoque ofrece diferentes capacidades y propuestas de valor:

Tipo Opción Modos Azure SQL Database Azure Synapse Analytics
Posprocesamiento
Desencadenadores Replace, Accumulate + N/D, los desencadenadores no están disponibles en Synapse SQL
Ensayo Replace, Accumulate + +
Procesamiento previo
Azure Functions Replace, Accumulate + - (rendimiento fila a fila)
Sustitución de Azure Cosmos DB Replace N/D N/D
Azure Cosmos DB Azure Synapse Link Replace N/D +

Obtención de soporte técnico

Para más ayuda, pruebe nuestra página de preguntas y respuestas de Microsoft sobre Azure Stream Analytics.

Pasos siguientes