Partilhar via


Atualizar ou mesclar registros no Banco de Dados SQL do Azure com o Azure Functions

Atualmente, o Azure Stream Analytics (ASA) suporta apenas a inserção (anexação) de linhas em saídas SQL (Bancos de Dados SQL do Azure e Azure Synapse Analytics). Este artigo discute soluções alternativas para habilitar UPDATE, UPSERT ou MERGE em bancos de dados SQL, com o Azure Functions como a camada intermediária.

Opções alternativas ao Azure Functions são apresentadas no final.

Necessidade

A gravação de dados em uma tabela geralmente pode ser feita da seguinte maneira:

Modo Instrução T-SQL equivalente Requisitos
Acrescentar INSERT Nenhuma
Replace MESCLAR (UPSERT) Chave única
Acumular MERGE (UPSERT) com operador de atribuição composto (+=, -=...) Chave e acumulador únicos

Para ilustrar as diferenças, veja o que acontece ao ingerir os dois registros a seguir:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

No modo de acréscimo, inserimos dois registros. A instrução T-SQL equivalente é:

INSERT INTO [target] VALUES (...);

Resultando em:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

No modo de substituição , obtemos apenas o último valor por chave. Aqui usamos Device_Id como chave. A instrução T-SQL equivalente é:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Modified_Time Device_Key Measure_Value
10:05 A 20

Finalmente, no modo de acumulação , somamos Value com um operador de atribuição composto (+=). Aqui também usamos Device_Id como chave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Modified_Time Device_Key Measure_Value
10:05 A 21

Para considerações de desempenho , os adaptadores de saída do banco de dados ASA SQL atualmente suportam apenas o modo de acréscimo nativamente. Esses adaptadores usam inserção em massa para maximizar a taxa de transferência e limitar a pressão de retorno.

Este artigo mostra como usar o Azure Functions para implementar os modos Substituir e Acumular para ASA. Quando você usa uma função como uma camada intermediária, o desempenho de gravação potencial não afetará o trabalho de streaming. Nesse sentido, usar o Azure Functions funciona melhor com o Azure SQL. Com o Synapse SQL, alternar de instruções em massa para instruções linha a linha pode criar maiores problemas de desempenho.

Saída do Azure Functions

Em nosso trabalho, substituímos a saída ASA SQL pela saída ASA Azure Functions. Os recursos UPDATE, UPSERT ou MERGE são implementados na função.

Atualmente, há duas opções para acessar um Banco de Dados SQL em uma função. O primeiro é a vinculação de saída SQL do Azure. Atualmente, está limitado ao C# e oferece apenas o modo de substituição. O segundo é compor uma consulta SQL a ser enviada por meio do driver SQL apropriado (Microsoft.Data.SqlClient para .NET).

Para ambos os exemplos a seguir, assumimos o esquema de tabela a seguir. A opção de vinculação requer que uma chave primária seja definida na tabela de destino. Não é necessário, mas recomendado, ao usar um driver SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Uma função tem que atender às seguintes expectativas para ser usada como uma saída do ASA:

  • O Azure Stream Analytics espera o status HTTP 200 do aplicativo Functions para lotes que foram processados com êxito
  • Quando o Azure Stream Analytics recebe uma exceção 413 ("http Request Entity Too Large") de uma função do Azure, ele reduz o tamanho dos lotes que envia para a Função do Azure
  • Durante a conexão de teste, o Stream Analytics envia uma solicitação POST com um lote vazio para o Azure Functions e espera que o status HTTP 20x de volta para validar o teste

Opção 1: Atualizar por chave com a Vinculação SQL da Função Azure

Esta opção usa a Vinculação de Saída SQL da Função do Azure. Essa extensão pode substituir um objeto em uma tabela, sem ter que escrever uma instrução SQL. No momento, ele não suporta operadores de atribuição compostos (acumulações).

Esta amostra baseou-se em:

Para entender melhor a abordagem de vinculação, é recomendável seguir este tutorial.

Primeiro, crie um aplicativo de função HttpTrigger padrão seguindo este tutorial. São utilizadas as seguintes informações:

  • Idioma: C#
  • Tempo de execução: .NET 6 (em função/tempo de execução v4)
  • Modelo: HTTP trigger

Instale a extensão de vinculação executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Adicione o SqlConnectionString item na Values seção do seu local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo .cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Atualize o nome da tabela de destino na seção de vinculação:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Atualize a seção de Device classe e mapeamento para corresponder ao seu próprio esquema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Agora você pode testar a fiação entre a função local e o banco de dados depurando (F5 no Visual Studio Code). O banco de dados SQL precisa ser acessível a partir de sua máquina. O SSMS pode ser usado para verificar a conectividade. Em seguida, uma ferramenta como o Postman pode ser usada para emitir solicitações POST para o ponto de extremidade local. Um pedido com um corpo vazio deve retornar http 204. Uma solicitação com uma carga útil real deve ser mantida na tabela de destino (no modo de substituição/atualização). Aqui está um exemplo de carga útil correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do SQL Server do Azure deve permitir que os serviços do Azure para a função ao vivo cheguem até ela.

A função pode então ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.

Opção 2: Mesclar com atribuição composta (acumular) por meio de uma consulta SQL personalizada

Nota

Após a reinicialização e recuperação, o ASA pode reenviar eventos de saída que já foram emitidos. Este é um comportamento esperado que pode fazer com que a lógica de acumulação falhe (duplicação de valores individuais). Para evitar isso, recomenda-se produzir os mesmos dados em uma tabela por meio da saída SQL ASA nativa. Essa tabela de controle pode ser usada para detetar problemas e ressincronizar o acúmulo quando necessário.

Esta opção usa Microsoft.Data.SqlClient. Esta biblioteca permite-nos emitir quaisquer consultas SQL para uma Base de Dados SQL.

Esta amostra baseou-se em:

Primeiro, crie um aplicativo de função HttpTrigger padrão seguindo este tutorial. São utilizadas as seguintes informações:

  • Idioma: C#
  • Tempo de execução: .NET 6 (em função/tempo de execução v4)
  • Modelo: HTTP trigger

Instale a biblioteca SqlClient executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Adicione o SqlConnectionString item na Values seção do seu local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo .cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Atualize a seção de construção de sqltext comandos para corresponder ao seu próprio esquema (observe como a acumulação é obtida através do += operador na atualização):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Agora você pode testar a fiação entre a função local e o banco de dados depurando (F5 no VS Code). O banco de dados SQL precisa ser acessível a partir de sua máquina. O SSMS pode ser usado para verificar a conectividade. Em seguida, uma ferramenta como o Postman pode ser usada para emitir solicitações POST para o ponto de extremidade local. Um pedido com um corpo vazio deve retornar http 204. Uma solicitação com uma carga útil real deve ser mantida na tabela de destino (no modo acumular/mesclar). Aqui está um exemplo de carga útil correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do SQL Server do Azure deve permitir que os serviços do Azure para a função ao vivo cheguem até ela.

A função pode então ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.

Alternativas

Fora do Azure Functions, há várias maneiras de alcançar o resultado esperado. Esta seção fornece alguns deles.

Pós-processamento no Banco de Dados SQL de destino

Uma tarefa em segundo plano funciona quando os dados são inseridos no banco de dados por meio das saídas padrão ASA SQL.

Para o Azure SQL, INSTEAD OF os gatilhos DML podem ser usados para intercetar os comandos INSERT emitidos pelo ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Para Synapse SQL, o ASA pode ser inserido em uma tabela de preparo. Uma tarefa recorrente pode então transformar os dados, conforme necessário, em uma tabela intermediária. Finalmente, os dados são movidos para a tabela de produção.

Pré-processamento no Azure Cosmos DB

O Azure Cosmos DB suporta UPSERT nativamente. Aqui só é possível acrescentar/substituir. As acumulações devem ser gerenciadas do lado do cliente no Azure Cosmos DB.

Se os requisitos corresponderem, uma opção é substituir o banco de dados SQL de destino por uma instância do Azure Cosmos DB. Isso requer uma mudança importante na arquitetura geral da solução.

Para Synapse SQL, o Azure Cosmos DB pode ser usado como uma camada intermediária por meio do Azure Synapse Link for Azure Cosmos DB. O Azure Synapse Link pode ser usado para criar um repositório analítico. Esse armazenamento de dados pode ser consultado diretamente no Synapse SQL.

Comparação das alternativas

Cada abordagem oferece diferentes propostas de valor e capacidades:

Type Opção Modos Base de Dados SQL do Azure Azure Synapse Analytics
Pós-processamento
Acionadores Substituir, Acumular + N/A, os gatilhos não estão disponíveis no Synapse SQL
Processo de teste Substituir, Acumular + +
Pré-processamento
Funções do Azure Substituir, Acumular + - (desempenho linha a linha)
Substituição do Azure Cosmos DB Replace N/A N/A
Azure Cosmos DB Azure Synapse Link Replace N/A +

Obter suporte

Para obter mais assistência, experimente a nossa página de perguntas e respostas da Microsoft para o Azure Stream Analytics.

Próximos passos