Aggiornare o unire record nel database SQL di Azure con Funzioni di Azure
Attualmente Analisi di flusso di Azure (ASA) supporta solo l'inserimento (accodamento) di righe agli output SQL (database SQL di Azure e Azure Synapse Analytics). Questo articolo illustra le soluzioni alternative per abilitare UPDATE, UPSERT o MERGE nei database SQL, con Funzioni di Azure come livello intermedio.
Le opzioni alternative a Funzioni di Azure vengono presentate alla fine.
Requisito
La scrittura di dati in una tabella può essere in genere eseguita nel modo seguente:
Modalità | Istruzione T-SQL equivalente | Requisiti |
---|---|---|
Aggiunta | INSERT … | None |
Sostituzione | UNIRE (UPSERT) | Chiave univoca |
Accumulate | UNIRE (UPSERT) con operatore di assegnazione composta (+= , -= ...) |
Chiave univoca e identificatore |
Per illustrare le differenze, esaminare cosa accade quando si inseriscono i due record seguenti:
Ora di arrivo | Device_Id | Measure_Value |
---|---|---|
10.00 | A | 1 |
10:05 | A | 20 |
Nella modalità di accodamento vengono inseriti due record. L'istruzione T-SQL equivalente è:
INSERT INTO [target] VALUES (...);
Risultato:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10.00 | A | 1 |
10:05 | A | 20 |
Nella modalità di sostituzione si ottiene solo l'ultimo valore per chiave. Qui si usa Device_Id come chiave. L'istruzione T-SQL equivalente è:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Risultato:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Infine, nella modalità di accumulo si somma Value
con un operatore di assegnazione composta (+=
). In questo caso si usano anche Device_Id come chiave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Risultato:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | Un | 21 |
Per considerazioni sulle prestazioni, gli adattatori di output del database SQL ASA supportano attualmente solo la modalità di accodamento in modo nativo. Questi adattatori usano l'inserimento bulk per ottimizzare la velocità effettiva e limitare la pressione.
Questo articolo illustra come usare Funzioni di Azure per implementare le modalità Replace e Accumulate per ASA. Quando si usa una funzione come livello intermedio, le potenziali prestazioni di scrittura non influiscono sul processo di streaming. A questo proposito, l'uso di Funzioni di Azure funziona meglio con Azure SQL. Con Synapse SQL, il passaggio da istruzioni bulk a righe per riga potrebbe creare problemi di prestazioni maggiori.
Output di Funzioni di Azure
Nel processo viene sostituito l'output SQL ASA dall'output di Funzioni di Azure ASA. Le funzionalità UPDATE, UPSERT o MERGE vengono implementate nella funzione.
Attualmente sono disponibili due opzioni per accedere a un database SQL in una funzione. Il primo è l'associazione di output di Azure SQL. Attualmente è limitato a C# e offre solo la modalità di sostituzione. In secondo luogo, creare una query SQL da inviare tramite il driver SQL appropriato (Microsoft.Data.SqlClient per .NET).
Per entrambi gli esempi seguenti si presuppone lo schema della tabella seguente. L'opzione di associazione richiede l'impostazione di una chiave primaria nella tabella di destinazione. Non è necessario, ma consigliato, quando si usa un driver SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Una funzione deve soddisfare le aspettative seguenti da usare come output da ASA:
- Analisi di flusso di Azure prevede lo stato HTTP 200 dall'app Funzioni per i batch elaborati correttamente
- Quando Analisi di flusso di Azure riceve l'eccezione 413 ("Entità richiesta HTTP troppo grande") da una funzione di Azure, riduce la dimensione dei batch che invia a Funzioni di Azure
- Durante la connessione di test, Analisi di flusso invia una richiesta POST con un batch vuoto a Funzioni di Azure e prevede lo stato HTTP 20x per convalidare il test
Opzione 1: Aggiornare per chiave con l'associazione SQL della funzione di Azure
Questa opzione usa l'associazione di output SQL della funzione di Azure. Questa estensione può sostituire un oggetto in una tabella, senza dover scrivere un'istruzione SQL. Al momento, non supporta operatori di assegnazione composti (accumuli).
Questo esempio è stato compilato in:
- Impostare la versione del runtime di Funzioni di Azure versione 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Per comprendere meglio l'approccio di associazione, è consigliabile seguire questa esercitazione.
Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Sono richieste le informazioni seguenti:
- Lingua:
C#
- Runtime:
.NET 6
(in funzione/runtime v4) - Modello:
HTTP trigger
Installare l'estensione di associazione eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Aggiungere l'elemento SqlConnectionString
nella sezione Values
del local.settings.json
, inserendo la stringa di connessione del server di destinazione:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aggiornare il nome della tabella di destinazione nella sezione di associazione:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Aggiornare la sezione di classe e mapping Device
in modo che corrisponda al proprio schema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in Visual Studio Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto dovrebbe restituire http 204. Una richiesta con un payload effettivo dovrebbe essere mantenuta nella tabella di destinazione (in modalità replace/update). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
È ora possibile pubblicare la funzione in Azure. È necessario specificare un'impostazione dell'applicazione per SqlConnectionString
. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.
È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.
Opzione 2: eseguire il merge con l'assegnazione composta (accumulare) tramite una query SQL personalizzata
Nota
Al riavvio e al ripristino, ASA può inviare nuovamente gli eventi di output già generati. Si tratta di un comportamento previsto che può causare l'esito negativo della logica di accumulo (raddoppiando i singoli valori). Per evitare questo problema, è consigliabile restituire gli stessi dati in una tabella tramite l'output SQL ASA nativo. È quindi possibile usare questa tabella di controllo per rilevare i problemi e sincronizzare nuovamente l'accumulo quando necessario.
Questa opzione usa Microsoft.Data.SqlClient. Questa libreria consente di eseguire query SQL a un database SQL.
Questo esempio è stato compilato in:
- Impostare la versione del runtime di Funzioni di Azure versione 4
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Sono richieste le informazioni seguenti:
- Lingua:
C#
- Runtime:
.NET 6
(in funzione/runtime v4) - Modello:
HTTP trigger
Installare la libreria SqlClient eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Aggiungere l'elemento SqlConnectionString
nella sezione Values
del local.settings.json
, inserendo la stringa di connessione del server di destinazione:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Aggiornare la sezione di compilazione dei comandi sqltext
in modo che corrisponda al proprio schema (si noti come l'accumulo viene ottenuto tramite l'operatore +=
all'aggiornamento):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto dovrebbe restituire http 204. Una richiesta con un payload effettivo dovrebbe essere salvata in modo permanente nella tabella di destinazione (in modalità di accumulo/unione). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
È ora possibile pubblicare la funzione in Azure. È necessario specificare un'impostazione dell'applicazione per SqlConnectionString
. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.
È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.
Alternative
Al di fuori di Funzioni di Azure, esistono diversi modi per ottenere il risultato previsto. In questa sezione vengono fornite alcune di esse.
Post-elaborazione nel database SQL di destinazione
Un'attività in background viene eseguita dopo l'inserimento dei dati nel database tramite gli output STANDARD di ASA SQL.
Per Azure SQL, è possibile usare i INSTEAD OF
trigger DML per intercettare i comandi INSERT emessi da ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Per Synapse SQL, ASA è in grado di inserire in una tabella di staging. Un'attività ricorrente può quindi trasformare i dati in base alle esigenze in una tabella intermedia. Infine, i dati vengono spostati nella tabella di produzione.
Pre-elaborazione in Azure Cosmos DB
Azure Cosmos DB supporta UPSERT in modo nativo. Qui è possibile solo accodamento/sostituzione. Gli accumuli devono essere gestiti sul lato client in Azure Cosmos DB.
Se i requisiti corrispondono, un'opzione consiste nel sostituire il database SQL di destinazione da un'istanza di Azure Cosmos DB. In questo modo è necessaria una modifica importante nell'architettura complessiva della soluzione.
Per Synapse SQL, è possibile usare Azure Cosmos DB come livello intermedio tramite Collegamento ad Azure Synapse per Azure Cosmos DB. È possibile usare Collegamento ad Azure Synapse per creare un archivio analitico. È quindi possibile sottoporre questo archivio dati a query direttamente in Synapse SQL.
Confronto delle alternative
Ogni approccio offre proposte e funzionalità di valore diverse:
Type | Opzione | Modalità | database SQL di Azure | Azure Synapse Analytics |
---|---|---|---|---|
Post-elaborazione | ||||
Trigger | Sostituisci, Accumula | + | N/D, i trigger non sono disponibili in Synapse SQL | |
Staging | Sostituisci, Accumula | + | + | |
Pre-elaborazione | ||||
Funzioni di Azure | Sostituisci, Accumula | + | - (prestazioni riga per riga) | |
Sostituzione di Azure Cosmos DB | Sostituzione | N/D | N/D | |
Azure Cosmos DB Azure Synapse Link | Sostituzione | N/D | + |
Ottenere supporto
Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.
Passaggi successivi
- Informazioni sugli output di Analisi di flusso di Azure
- Output di Analisi di flusso di Azure nel database SQL di Azure
- Aumentare le prestazioni della velocità effettiva per il database SQL di Azure da Analisi di flusso di Azure
- Usare le identità gestite per accedere al database SQL di Azure o ad Azure Synapse Analytics da un processo di Analisi di flusso di Azure
- Usare dati di riferimento da un database SQL per un processo di Analisi di flusso di Azure
- Eseguire Funzioni di Azure in processi di Analisi di flusso di Azure - Esercitazione per l'output Redis
- Avvio rapido: creare un processo di Analisi di flusso di Azure tramite il portale di Azure