Mettre à jour ou fusionner des enregistrements dans Azure SQL Database avec Azure Functions
Actuellement, Azure Stream Analytics (ASA) prend en charge seulement l’insertion (l’ajout) de lignes dans les sorties SQL (Bases de données Azure SQL et Azure Synapse Analytics). Cet article présente des solutions de contournement pour activer UPDATE, UPSERT ou MERGE sur les bases de données SQL, avec Azure Functions comme couche intermédiaire.
D’autres options d’Azure Functions sont présentées à la fin.
Condition requise
L’écriture de données dans une table peut généralement être effectuée de la manière suivante :
Mode | Instruction T-SQL équivalente | Spécifications |
---|---|---|
Ajouter | INSERT | Aucun |
Remplacer | MERGE (UPSERT) | Clé unique |
Accumuler | MERGE (UPSERT) avec opérateur d’assignation composé (+= , -= …) |
Clé unique et accumulateur |
Pour illustrer les différences, regardons ce qui se passe lors de l’ingestion des deux enregistrements suivants :
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
Dans le mode ajout, nous insérons deux enregistrements. L’instruction T-SQL équivalente est la suivante :
INSERT INTO [target] VALUES (...);
Ce qui donne :
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
En mode de remplacement, nous obtenons uniquement la dernière valeur par clé. Nous utilisons ici Device_Id comme clé. L’instruction T-SQL équivalente est la suivante :
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ce qui donne :
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Enfin, en mode d’accumulation, nous additionnons Value
à un opérateur d’assignation composé (+=
). Ici aussi, nous utilisons Device_Id comme clé :
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ce qui donne :
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Pour des raisons de performances, les adaptateurs de sortie de base de données SQL d’Azure Stream Analytics ne prennent actuellement en charge que le mode d’ajout en mode natif. Ces adaptateurs utilisent l’insertion en bloc pour maximiser le débit et limiter la sollicitation.
Cet article explique comment utiliser Azure Functions pour implémenter les modes Remplacer et Accumuler pour Azure Stream Analytics. Quand vous utilisez une fonction comme couche intermédiaire, les performances d’écriture potentielles n’auront pas d’impact sur le travail de diffusion en continu. À cet égard, l’utilisation d’Azure Functions fonctionne mieux avec Azure SQL. Avec Synapse SQL, passer d’une instruction en bloc à une instruction ligne par ligne peut créer des problèmes de performance plus importants.
Sortie Azure Functions
Dans notre travail, nous remplaçons la sortie SQL d’Azure Stream Analytics par la sortie Azure Functions d’Azure Stream Analytics. Les capacités UPDATE, UPSERT ou MERGE seront implémentées dans la fonction.
Il existe actuellement deux options pour accéder à une base de données SQL dans une fonction. La première est la liaison de sortie Azure SQL. Elle est actuellement limitée à C# et ne propose que le mode de remplacement. La seconde consiste à composer une requête SQL qui sera soumise via le pilote SQL approprié (Microsoft.Data.SqlClient for .NET).
Pour les deux exemples suivants, nous supposons le schéma de table suivant. L’option de liaison exige qu’une clé primaire soit définie sur la table cible. Ce n’est pas obligatoire, mais recommandé, lorsque vous utilisez un pilote SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Une fonction doit répondre aux attentes suivantes pour être utilisée comme une sortie d’Azure Stream Analytics :
- Azure Stream Analytics attend l’état HTTP 200 de l’application Functions pour les lots qui ont été traités avec succès.
- Lorsqu’Azure Stream Analytics reçoit une exception 413 (qui indique que l’entité de requête HTTP est trop volumineuse) de la part d’une fonction Azure, il réduit la taille des lots envoyés à Azure Functions.
- Pendant le test de connexion, Stream Analytics envoie une requête POST avec un lot vide à Azure Functions et s’attend à recevoir l’état HTTP 20x en retour pour valider le test.
Option 1 : Mettre à jour par clé avec la liaison SQL Azure Functions
Cette option utilise la liaison de sortie SQL Azure Functions. Cette extension peut remplacer un objet dans une table, sans avoir à écrire une instruction SQL. Pour l’instant, elle ne prend pas en charge les opérateurs d’assignation composés (accumulations).
Cet exemple a été construit sur :
- Runtime Azure Functions version 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Pour mieux comprendre l’approche de liaison, il est recommandé de suivre ce tutoriel.
Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Les informations suivantes sont utilisées :
- Langage :
C#
- Runtime :
.NET 6
(sous fonction/runtime v4) - Modèle :
HTTP trigger
Installez l’extension de liaison en exécutant la commande suivante dans un terminal situé dans le dossier du projet :
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Ajoutez l’élément SqlConnectionString
dans la section Values
de votre fichier local.settings.json
, en renseignant la chaîne de connexion du serveur de destination :
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction par les vôtres :
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Mettez à jour le nom de la table de destination dans la section de liaison :
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Mettez à jour la classe Device
et la section de mappage pour qu’elles correspondent à votre propre schéma :
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans Visual Studio Code). La base de données SQL doit être accessible depuis votre machine. SSMS peut être utilisé pour vérifier la connectivité. Ensuite, envoyez des requêtes POST au point de terminaison local. Une requête avec un corps vide doit renvoyer http 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode de remplacement/mise à jour). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La fonction peut désormais être publiée sur Azure. Un paramètre d’application doit être défini pour SqlConnectionString
. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.
La fonction peut ensuite être définie en tant que sortie dans le travail Azure Stream Analytics et utilisée pour remplacer des enregistrements au lieu de les insérer.
Option 2 : Fusionner avec un opérateur d’assignation composé (accumuler) à l’aide d’une requête SQL personnalisée
Notes
Lors du redémarrage et de la récupération, Azure Stream Analytics peut renvoyer des événements de sortie qui ont déjà été émis. Il s’agit d’un comportement attendu qui peut provoquer l’échec de la logique d’accumulation (doublement des valeurs individuelles). Pour éviter cela, il est recommandé de générer les mêmes données dans une table par le biais de la sortie SQL native d’Azure Stream Analytics. Cette table de contrôle peut alors être utilisée pour détecter les problèmes et resynchroniser l’accumulation le cas échéant.
Cette option utilise Microsoft.Data.SqlClient. Cette bibliothèque nous permet d’émettre n’importe quelle requête SQL vers une base de données SQL.
Cet exemple a été construit sur :
Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Les informations suivantes sont utilisées :
- Langage :
C#
- Runtime :
.NET 6
(sous fonction/runtime v4) - Modèle :
HTTP trigger
Installez la bibliothèque SqlClient en exécutant la commande suivante dans un terminal situé dans le dossier du projet :
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Ajoutez l’élément SqlConnectionString
dans la section Values
de votre fichier local.settings.json
, en renseignant la chaîne de connexion du serveur de destination :
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction par les vôtres :
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Mettez à jour la section de liaison de la commande sqltext
pour qu’elle corresponde à votre propre schéma (notez la manière dont l’accumulation est obtenue via l’opérateur +=
lors de la mise à jour) :
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans VS Code). La base de données SQL doit être accessible depuis votre machine. SSMS peut être utilisé pour vérifier la connectivité. Puis, émettez des requêtes POST au point de terminaison local. Une requête avec un corps vide doit renvoyer http 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode d’accumulation/de fusion). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La fonction peut désormais être publiée sur Azure. Un paramètre d’application doit être défini pour SqlConnectionString
. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.
La fonction peut ensuite être définie en tant que sortie dans le travail Azure Stream Analytics et utilisée pour remplacer des enregistrements au lieu de les insérer.
Autres solutions
En dehors d’Azure Functions, il existe plusieurs façons d’obtenir le résultat escompté. Cette section fournit certaines d’entre elles.
Post-traitement dans la base de données SQL cible
Une tâche en arrière-plan fonctionne une fois que les données seront insérées dans la base de données via les sorties standard d’Azure Stream Analytics.
Pour Azure SQL, les déclencheurs DML INSTEAD OF
peuvent être utilisés pour intercepter les commandes INSERT émises par Azure Stream Analytics :
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Pour Synapse SQL, Azure Stream Analytics peut insérer une table de mise en lots. Une tâche récurrente peut ensuite transformer les données selon les besoins dans une table intermédiaire. Enfin, les données sont déplacées vers la table de production.
Prétraitement dans Azure Cosmos DB
Azure Cosmos DB prend en charge UPSERT en mode natif. Ici, seul l’ajout/le remplacement est possible. Les accumulations doivent être gérées côté client dans Azure Cosmos DB.
Si les spécifications correspondent, une option consiste à remplacer la base de données SQL cible par une instance Azure Cosmos DB. Cela nécessite une modification importante de l’architecture globale de la solution.
Pour Synapse SQL, Azure Cosmos DB peut être utilisé comme couche intermédiaire via Azure Synapse Link pour Azure Cosmos DB. Azure Synapse Link peut être utilisé pour créer un magasin analytique. Ce magasin de données peut ensuite être interrogé directement dans Synapse SQL.
Comparaison des alternatives
Chaque approche offre une proposition de valeur et des capacités différentes :
Type | Option | Modes | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Post-traitement | ||||
Déclencheurs | Remplacer, Accumuler | + | s.o., les déclencheurs ne sont pas disponibles dans Synapse SQL | |
Staging | Remplacer, Accumuler | + | + | |
Pré-traitement | ||||
Azure Functions | Remplacer, Accumuler | + | - (performances ligne par ligne) | |
Remplacement Azure Cosmos DB | Remplacer | N/A | N/A | |
Azure Cosmos DB Azure Synapse Link | Replace | N/A | + |
Obtenir de l’aide
Pour obtenir de l’aide supplémentaire, essayez notre page de questions Microsoft Q&A pour Azure Stream Analytics.
Étapes suivantes
- Comprendre les sorties d’Azure Stream Analytics
- Sortie Azure Stream Analytics dans Azure SQL Database
- Augmenter les performances de débit pour Azure SQL Database à partir d’Azure Stream Analytics
- Utiliser les identités managées pour accéder à Azure SQL Database ou Azure Synapse Analytics à partir d’une tâche Azure Stream Analytics
- Utiliser les données de référence d’une base de données SQL pour une tâche Azure Stream Analytics
- Exécuter Azure Functions dans des travaux Azure Stream Analytics – Tutoriel pour la sortie Redis
- Démarrage rapide : Créer un travail Stream Analytics à l’aide du portail Azure