Atualizar ou mesclar registros em Banco de Dados SQL do Azure com Azure Functions
Atualmente, o Azure Stream Analytics (ASA) dá suporte apenas a inserir (anexar) linhas (acrescentar) a saídas SQL (Banco de Dados SQL do Azuree ao Azure Synapse Analytics). Este artigo aborda as soluções alternativas para habilitar UPDATE, UPSERT ou MERGE em bancos de dados SQL, tendo Azure Functions como a camada intermediária.
Opções alternativas para Azure Functions são apresentadas no final.
A gravação de dados em uma tabela geralmente pode ser feita da seguinte maneira:
Mode | Instrução T-SQL equivalente | Requisitos |
---|---|---|
Acrescentar | INSERT | Nenhum |
Substitua | MERGE (UPSERT) | Chave exclusiva |
Accumulate | MERGE (UPSERT) com operador de atribuição composta (+= , -= ...) |
Chave exclusiva e acumulador |
Para ilustrar as diferenças, podemos examinar o que acontece ao ingerir os dois registros a seguir:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | Um | 1 |
10:05 | A | 20 |
No modo de acrescentar, inserimos os dois registros. A instrução T-SQL equivalente é:
INSERT INTO [target] VALUES (...);
Resultando em:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | Um | 1 |
10:05 | A | 20 |
No modo substituir, obtemos apenas o último valor por chave. Aqui, usamos Device_Id como chave. A instrução T-SQL equivalente é:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resultando em:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Por fim, no modo acumular, somamos Value
com um operador de atribuição composta (+=
). Aqui, também usamos Device_Id como a chave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resultando em:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Para considerações sobre desempenho, os adaptadores de saída de banco de dados ASA SQL atualmente dão suporte apenas ao modo acrescentar nativamente. Esses adaptadores usam inserção em massa para maximizar a taxa de transferência e limitar a pressão de retorno.
Este artigo mostra como usar Azure Functions para implementar modos Substituir e Acumular para o ASA. Ao usar uma função como camada intermediária, o desempenho de gravação potencial não afetará o trabalho de streaming. Nesse sentido, usar Azure Functions funciona melhor com o SQL do Azure. Com o SQL do Synapse, a alternância de instruções de massa para linha por linha pode causar problemas de desempenho maiores.
Em nosso trabalho, vamos substituir a saída do ASA SQL pela saída do ASA Azure Functions. As funcionalidades UPDATE, UPSERT ou MERGE serão implementados na função.
Atualmente, há duas opções para acessar um Banco de Dados SQL em uma função. A primeira é a associação de saída do SQL do Azure. No momento, ela está limitada a C# e só oferece o modo substituir. A segunda é compor uma consulta SQL a ser enviada por meio do driver de SQL apropriado (Microsoft.Data.sqlclient para .NET).
Para ambos os exemplos a seguir, presumimos o esquema de tabela a seguir. A opção de associação requer que uma chave primária seja definida na tabela de destino. Isso não é necessário, mas é recomendado, ao usar um driver de SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Uma função deve atender às seguintes expectativas a serem usadas como uma saída do ASA:
- O Azure Stream Analytics espera o status 200 do HTTP do aplicativo Functions para lotes que foram processados com êxito
- Quando o Azure Stream Analytics recebe a exceção 413 (“Entidade de solicitação http muito grande”) de uma função do Azure, ele reduz o tamanho dos lotes que envia para o Azure Functions
- Durante a conexão de teste, o Stream Analytics envia uma solicitação POST com um lote vazio para Azure Functions e espera o status HTTP 20x de volta para validar o teste
Essa opção usa a associação de saída do Azure Function SQL. Essa extensão pode substituir um objeto em uma tabela, sem a necessidade de escrever uma instrução SQL. Neste momento, ela não dá suporte a operadores de atribuição composta (acumulações).
Este exemplo foi criado em:
- Versão 4 do runtime do Azure Functions
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Para entender melhor a abordagem de associação, é recomendável seguir este tutorial.
Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. As seguintes informações são usadas:
- Linguagem de programação:
C#
- Runtime:
.NET 6
(em função/runtime v4) - Modelo de máquina virtual:
HTTP trigger
Instale a extensão de associação executando o seguinte comando em um terminal localizado na pasta do projeto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Adicione o item SqlConnectionString
na seção Values
do local.settings.json
, preenchendo a cadeia de conexão do servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Atualize o nome da tabela de destino na seção associação:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Atualize a classe Device
e seção de mapeamento para corresponder ao seu próprio esquema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Agora você pode testar a conexão entre a função local e o banco de dados por depuração (F5 no Visual Studio Code). O banco de dados SQL precisa estar acessível de seu computador. SSMS pode ser usado para verificar a conectividade. Em seguida, envie solicitações POST para o ponto de extremidade local. Uma solicitação com um corpo vazio deve retornar http 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo substituir/atualizar). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString
. O firewall do servidor do SQL do Azure deve permitir que os serviços do Azure entrem na função ao vivo para alcançá-lo.
A função pode ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.
Observação
Após a reinicialização e a recuperação, o ASA pode reenviar os eventos de saída que já tiverem sido emitidos. Esse é um comportamento esperado que pode fazer com que a lógica de acumulação falhe (duplicando valores individuais). Para evitar isso, é recomendável gerar os mesmos dados em uma tabela por meio do ASA nativo da saída SQL. Essa tabela de controle pode então ser usada para detectar problemas e sincronizar novamente o acúmulo quando necessário.
Essa opção usa Microsoft.Data.SqlClient. Essa biblioteca nos permite fazer consultas SQL a um Banco de Dados SQL.
Este exemplo foi criado em:
Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. As seguintes informações são usadas:
- Linguagem de programação:
C#
- Runtime:
.NET 6
(em função/runtime v4) - Modelo de máquina virtual:
HTTP trigger
Instale a biblioteca SqlClient executando o seguinte comando em um terminal localizado na pasta do projeto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Adicione o item SqlConnectionString
na seção Values
do local.settings.json
, preenchendo a cadeia de conexão do servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Atualize a seção de criação de comando sqltext
para corresponder ao seu próprio esquema (observe como o acúmulo é obtido por meio do operador +=
na atualização):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Agora você pode testar a conexão entre a função local e o banco de dados por depuração (F5 no VS Code). O banco de dados SQL precisa estar acessível de seu computador. SSMS pode ser usado para verificar a conectividade. Em seguida, emita solicitações POST para o ponto de extremidade local. Uma solicitação com um corpo vazio deve retornar http 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo acumular/mesclar). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString
. O firewall do servidor do SQL do Azure deve permitir que os serviços do Azure entrem na função ao vivo para alcançá-lo.
A função pode ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.
Fora do Azure Functions, há várias maneiras de atingir o resultado esperado. Essa seção fornece alguns deles.
Uma tarefa em segundo plano funcionará depois que os dados forem inseridos no banco de dados por meio das saídas do SQL ASA padrão.
Para o SQL do Azure, INSTEAD OF
os gatilhos DML podem ser usados para interceptar os comandos INSERT emitidos pelo ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Para o SQL do Synapse, o ASA pode inserir em uma tabela de preparo. Uma tarefa recorrente pode então transformar os dados conforme necessário em uma tabela intermediária. Por fim, os dados são movidos para a tabela de produção.
O Azure Cosmos DB dá suporte ao UPSERT nativamente. Aqui, somente é possível acrescentar/substituir. As acumulações devem ser gerenciadas no lado do cliente no Azure Cosmos DB.
Se os requisitos corresponderem, uma opção é substituir o banco de dados de SQL de destino por uma instância do Azure Cosmos DB. Isso requer uma mudança importante na arquitetura geral da solução.
No caso da SQL do Synapse, o Cosmos DB pode ser usado como uma camada intermediária por meio do Link do Azure Synapse para o Azure Cosmos DB. O Link do Synapse pode ser usado para criar um repositório analítico. Esse armazenamento de dados pode ser consultado diretamente no SQL do Synapse.
Cada abordagem oferece uma proposta de valor e funcionalidades diferentes:
Type | Opção | Modos | Banco de Dados SQL do Azure | Azure Synapse Analytics |
---|---|---|---|---|
Pós-processamento | ||||
Gatilhos | Substituir, Acumular | + | N/A, os gatilhos não estão disponíveis no SQL do Synapse | |
Staging | Substituir, Acumular | + | + | |
Pré-processamento | ||||
Azure Functions | Substituir, Acumular | + | – (desempenho de linha por linha) | |
Substituição do Azure Cosmos DB | Substitua | N/D | N/D | |
Link do Azure Synapse para Azure Cosmos DB | Substitua | N/D | + |
Para obter mais assistência, confira nossa página de Perguntas e respostas do Microsoft do Azure Stream Analytics.
- Entender as saídas do Azure Stream Analytics
- Saída do Azure Stream Analytics para o Banco de Dados SQL do Azure
- Aumentar o desempenho da taxa de transferência para o Banco de Dados SQL do Azure com o Azure Stream Analytics
- Usar identidades gerenciadas para acessar o Banco de Dados SQL do Azure ou o Azure Synapse Analytics por meio de um trabalho do Azure Stream Analytics
- Usar dados de referência de um Banco de Dados SQL para um trabalho do Azure Stream Analytics
- Executar Azure Functions em trabalhos do Azure Stream Analytics – tutorial para saída do Redis
- Início Rápido: Criar um trabalho do Stream Analytics usando o portal do Azure