Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Jelenleg Azure Stream Analytics (ASA) csak SQL-kimenetekbe (Azure SQL Adatbázisok és Azure Synapse Analytics) szúr be (hozzáfűző) sorokat. Ez a cikk a UPDATE, UPSERT vagy MERGE SQL-adatbázisokon való engedélyezésének megkerülő megoldásait ismerteti a Azure Functions közvetítőrétegként való használatával.
A Azure Functions alternatív lehetőségei a végén jelennek meg.
Követelmény
Az alábbi módok egyikével adatokat írhat egy táblába:
| Mód | Egyenértékű T-SQL utasítás | Követelmények |
|---|---|---|
| Hozzáfűzés | INSERT | Egyik sem |
| Cserél | EGYESÍTÉS (FELÜLÍR/BEILLESZTÉS) | Egyedi kulcs |
| Felhalmoz | MERGE (UPSERT) összetett hozzárendelési operátorral (+=, -=...) |
Egyedi kulcs és akkumulátor |
A különbségek szemléltetéséhez gondolja át, mi történik a következő két rekord betöltésekor:
| Érkezési_Idő | Eszköz_Azonosító | Mérték_Érték |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Hozzáfűzési módban két rekordot szúr be. Az egyenértékű T-SQL utasítás a következő:
INSERT INTO [target] VALUES (...);
Ennek eredménye:
| Módosított_Idő | Eszköz_azonosító | Mérték_Érték |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Csere módban csak az utolsó értéket kapja meg kulcs szerint. Itt a Device_Id használja kulcsként. Az egyenértékű T-SQL utasítás a következő:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ennek eredménye:
| Módosított_Idő | Eszköz_kulcs | Mérték_Érték |
|---|---|---|
| 10:05 | A | 20 |
Végül halmozási módban += összegezel egy összetett hozzárendelési operátorral (Value). Itt a Device_Id is használja kulcsként:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ennek eredménye:
| Módosított_Idő | Eszköz_kulcs | Mérték_Érték |
|---|---|---|
| 10:05 | A | 21 |
A teljesítmény szempontjából az ASA SQL-adatbázis kimeneti adapterei jelenleg csak natív módon támogatják a hozzáfűzési módot. Ezek az adapterek tömeges beszúrással maximalizálják az átviteli sebességet, és korlátozzák a visszanyomást.
Ez a cikk bemutatja, hogyan valósíthat meg csere- és halmozási módokat az Azure Functions használatával az ASA-hoz. Ha egy függvényt közvetítő rétegként használ, a lehetséges írási teljesítmény nem befolyásolja a streamelési feladatot. Ebben a tekintetben az Azure Functions használata a legjobban az Azure SQL-sel működik. A Synapse SQL esetén a tömegesről az egyesével történő feldolgozásra való váltás nagyobb teljesítményproblémákat eredményezhet.
Azure Functions kimenet
Ebben a feladatban az ASA SQL-kimenetet a ASA Azure Functions kimenetre cseréli. A függvény implementálja az UPDATE, UPSERT vagy MERGE képességeket.
Jelenleg egy függvényben lévő SQL Database két lehetőséggel érhető el. Az első lehetőség a Azure SQL kimeneti kötés. Jelenleg C#-ra van korlátozva, és csak csere módot kínál. A második lehetőség egy SQL-lekérdezés összeállítása a megfelelő SQL-illesztőn keresztül (Microsoft. Data.SqlClient .NET esetén).
Az alábbi minták mindegyike a következő táblázatsémát feltételezi. A kötési beállításhoz egy elsődleges kulcsot kell beállítani a céltáblán. SQL-illesztő használata esetén nem szükséges, de ajánlott.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Ha egy függvényt ASA-kimenetként szeretne használni, a függvénynek meg kell felelnie az alábbi elvárásoknak:
- Az Azure Stream Analytics a sikeresen feldolgozott kötegek esetén HTTP-állapot 200-ra számít a Functions alkalmazástól.
- Ha Azure Stream Analytics 413 ("http Request Entity Too Large") kivételt kap egy Azure függvénytől, csökkenti a Azure függvénynek küldött kötegek méretét.
- Tesztkapcsolat során a Stream Analytics egy üres köteggel rendelkező POST-kérést küld az Azure Functions szolgáltatásnak, és HTTP 20x státuszkódot vár vissza a teszt érvényesítéséhez.
1. lehetőség: Kulcs szerinti frissítés az Azure-függvény SQL-kötésével
Ez a beállítás az Azure Function SQL Kimeneti kötést használja. Ez a bővítmény sql-utasítás írása nélkül lecserélheti a táblák objektumát. Jelenleg nem támogatja az összetett hozzárendelési operátorokat (akkumulációkat).
Ez a minta a következőre épült:
- Az Azure Functions 4-es futtatókörnyezete
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
A kötési megközelítés jobb megértéséhez kövesse ezt az oktatóanyagot.
Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. Használja a következő információkat:
- Nyelv:
C# - Futtatási idő:
.NET 6(funkció/runtime v4 alatt) - Sablon:
HTTP trigger
Telepítse a kötésbővítményt a következő parancs futtatásával a projektmappában található terminálon:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Adja hozzá az SqlConnectionString elemet a Values szakaszhoz a local.settings.json, kitöltve a célkiszolgáló kapcsolati sztringjét.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse a névteret, az osztálynevet és a függvénynevet a sajátjával:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Frissítse a céltábla nevét a kötési szakaszban:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Frissítse az Device osztály- és leképezési szakaszt a saját sémájának megfelelően:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a Visual Studio Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használatával ellenőrizheti a kapcsolatot. Ezután küldjön POST-kéréseket a helyi végpontnak. Egy üres törzsű kérésnek HTTP 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (csere/frissítés módban). Íme egy minta adathalmaz, amely megfelel az ebben a mintában alkalmazott sémának:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A függvény mostantól közzétehető az Azure-ban. Beállítson egy alkalmazásbeállítást a következőhözSqlConnectionString: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.
Ezután az ASA-feladatban kimenetként definiálhatja a függvényt, és a beszúrás helyett rekordok cseréjére használhatja.
2. lehetőség: Összetett hozzárendeléssel (felhalmozással) egyéni SQL-lekérdezés segítségével végzett egyesítés
Feljegyzés
Újraindításkor és helyreállításkor előfordulhat, hogy az ASA újraküldi a már kibocsátott kimeneti eseményeket. Ez a viselkedés a felhalmozási logika meghiúsulását okozhatja (az egyes értékek megduplázása). A probléma elkerülése érdekében a natív ASA SQL-kimenettel adja ki ugyanazokat az adatokat egy táblában. Ezzel a vezérlőtáblával észlelheti a problémákat, és szükség esetén újraszinkronizálhatja a felhalmozást.
Ez a beállítás a Microsoft.Data.SqlClientet használja. Ez a kódtár lehetővé teszi, hogy sql-lekérdezéseket küldjön egy SQL Database-nek.
Ez a minta a következőre épült:
Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. A rendszer a következő információkat használja:
- Nyelv:
C# - Futtatókörnyezet:
.NET 6(a function/runtime v4 alatt) - Sablon:
HTTP trigger
Telepítse az SqlClient-kódtárat a következő parancs futtatásával a projektmappában található terminálon:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Adja hozzá az SqlConnectionString elemet a Values szakaszban a local.settings.json-hoz, és töltse ki a célkiszolgáló kapcsolati sztringjét.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse saját névterét, osztálynevét és függvénynevét:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Frissítse a sqltext parancsépítési szakaszt a saját sémájának megfelelően (figyelje meg, hogyan érhető el a felhalmozás az operátoron keresztül a += frissítés során):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a VS Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használatával ellenőrizheti a kapcsolatot. Ezután küldjön POST-kéréseket a helyi végpontnak. Egy üres törzsű kérésnek HTTP 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (halmozási/egyesítési módban). Íme egy mintaadatcsomag, amely megfelel az ebben a mintában használt sémának:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A függvény mostantól közzétehető az Azure-ban. Egy alkalmazásbeállítást kell beállítani a következőhöz SqlConnectionString: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.
A függvény ezután kimenetként definiálható az ASA-feladatban, és a rekordok beszúrása helyett rekordok cseréjére használható.
Alternatívák
A Azure Functions kívül több metódus is elérheti a várt eredményt. Ez a szakasz ezen módszerek némelyikét ismerteti.
Utófeldolgozás a cél SQL Database-ben
A háttérfeladat akkor működik, ha az adatok a standard ASA SQL-kimeneteken keresztül kerülnek be az adatbázisba.
Az Azure SQL esetében INSTEAD OFDML triggerek használatával elfoghatja az ASA által kiadott INSERT parancsokat.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
A Synapse SQL esetében az ASA beszúrhat egy előkészítési táblába. Az ismétlődő tevékenységek ezután szükség szerint átalakíthatják az adatokat egy köztes táblává. Végül az adatok átkerülnek az éles táblába.
Előfeldolgozás az Azure Cosmos DB-ben
Az Azure Cosmos DB natív módon támogatja az UPSERT-t. Itt csak hozzáfűzés vagy csere lehetséges. Az ügyféloldali halmozódásokat az Azure Cosmos DB-n neked kell kezelned.
Ha a követelmények egyeznek, lecserélheti a cél SQL-adatbázist egy Azure Cosmos DB-példányra. Ehhez a változáshoz fontos változásra van szükség az általános megoldásarchitektúra terén.
A Synapse SQL esetében a Azure Cosmos DB a Azure Synapse Link Azure Cosmos DB segítségével használhatja köztes rétegként. Az Azure Synapse Link használatával hozzon létre egy analytical store. Ezt követően közvetlenül a Synapse SQL-ben kérdezheti le ezt az adattárat.
Az alternatívák összehasonlítása
Minden megközelítés különböző értékajánlatokat és képességeket kínál:
| Típus | Lehetőség | Módok | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Utófeldolgozás | ||||
| Triggerek | Csere, felhalmozás | + | N/A, az eseményindítók nem érhetők el a Synapse SQL-ben | |
| Előkészítés | Cserélni, Összegyűjtés | + | + | |
| Előzetes feldolgozás | ||||
| Azure Functions | Csere, felhalmozás | + | - (sorról sorra teljesítmény) | |
| Az Azure Cosmos DB cseréje | Cserélje | N/A | Nincs adat | |
| Azure Cosmos DB Azure Synapse Link | Csere | Nem alkalmazható | + |
Támogatás kérése
További segítségért próbálja ki a Microsoft Q&A kérdésoldalát az Azure Stream Analyticshez.
Következő lépések
- Az Azure Stream Analytics kimeneteinek ismertetése
- Azure Stream Analytics-kimenet az Azure SQL Database-be
- Az Azure Stream Analyticsből az Azure SQL Database-be irányuló átviteli teljesítmény növelése
- Felügyelt identitások használata az Azure SQL Database vagy az Azure Synapse Analytics eléréséhez egy Azure Stream Analytics-feladatból
- Referenciaadatok használata SQL Database-ből azure Stream Analytics-feladathoz
- Az Azure Functions futtatása az Azure Stream Analytics-feladatokban – Oktatóanyag a Redis kimenetéhez
- Rövid útmutató: Stream Analytics-feladat létrehozása az Azure Portal használatával