Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V současné době Azure Stream Analytics (ASA) podporuje pouze vkládání (připojování) řádků do výstupů SQL (Azure SQL Databases a Azure Synapse Analytics). Tento článek popisuje alternativní řešení povolení UPDATE, UPSERT nebo MERGE v databázích SQL pomocí Azure Functions jako zprostředkující vrstvy.
Na konci se zobrazí alternativní možnosti Azure Functions.
Požadavek
Data do tabulky můžete zapisovat pomocí jednoho z následujících režimů:
| Režim | Ekvivalentní příkaz T-SQL | Požadavky |
|---|---|---|
| Připojit | INSERT | Nic |
| Nahradit | MERGE (UPSERT) | Jedinečný klíč |
| Hromadit | MERGE (UPSERT) s operátorem složeného přiřazení(+=,-=...) |
Jedinečný klíč a akumulátor |
Pokud chcete znázornit rozdíly, zvažte, co se stane při ingestování následujících dvou záznamů:
| Čas příjezdu | Device_Id | Hodnota_Měření |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
V režimu připojení vložíte dva záznamy. Ekvivalentní příkaz T-SQL je:
INSERT INTO [target] VALUES (...);
Výsledkem je:
| Čas_Upravení | Device_Id | Hodnota_Měření |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
V režimu nahrazení získáte pouze poslední hodnotu podle klíče. Tady jako klíč použijete Device_Id. Ekvivalentní příkaz T-SQL je:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Výsledkem je:
| Čas_Upravení | Klíč_zařízení | Hodnota_Měření |
|---|---|---|
| 10:05 | A | 20 |
Nakonec v režimu kumulování sečtete Value pomocí operátoru složeného přiřazení (+=). Tady také použijete Device_Id jako klíč:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Výsledkem je:
| Čas_Upravení | Klíč_zařízení | Hodnota_Měření |
|---|---|---|
| 10:05 | A | 21 |
Pro výkonové úvahy momentálně výstupní adaptéry databáze ASA SQL nativně podporují pouze režim připojení. Tyto adaptéry používají hromadné vkládání k maximalizaci propustnosti a omezení zpětného tlaku.
V tomto článku se dozvíte, jak pomocí azure Functions implementovat režimy nahrazení a kumulování pro ASA. Pokud používáte funkci jako zprostředkující vrstvu, potenciální výkon zápisu nemá vliv na úlohu streamování. V tomto ohledu funguje použití Azure Functions nejlépe s Azure SQL. V případě Synapse SQL může přechod z hromadných příkazů na příkazy po řádcích způsobit vyšší problémy s výkonem.
Azure Functions výstup
V této úloze nahradíte výstup z ASA SQL výstupem z ASA Azure Functions. Funkce implementuje funkce UPDATE, UPSERT nebo MERGE.
V současné době máte přístup ke službě SQL Database ve funkci pomocí dvou možností. První možností je výstupní vazba Azure SQL. V současné době je omezený na jazyk C# a nabízí pouze režim nahrazení. Druhou možností je vytvořit dotaz SQL, který se odešle přes příslušný ovladač SQL (Microsoft. Data.SqlClient pro .NET).
Obě následující ukázky předpokládají následující schéma tabulky. Možnost vazby vyžaduje , aby byl v cílové tabulce nastaven primární klíč . Není to nutné, ale doporučuje se při použití ovladače SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Pokud chcete funkci použít jako výstup z ASA, musí tato funkce splňovat následující očekávání:
- Azure Stream Analytics očekává stav HTTP 200 z aplikace Functions pro dávky, které úspěšně zpracovává.
- Když Azure Stream Analytics obdrží výjimku 413 ("http Request Entity Too Large") z Azure Functions, zmenšuje velikost dávek, které odesílá do Azure Function.
- Během testovacího připojení odešle Stream Analytics POST požadavek s prázdnou dávkou k Azure Functions a očekává zpět stav HTTP 20x pro ověření testu.
Možnost 1: Aktualizace podle klíče pomocí vazby SQL pro Azure Functions
Tato možnost používá SQL výstupní vazbu pro Azure Function. Toto rozšíření může nahradit objekt v tabulce, aniž byste museli psát příkaz SQL. V tuto chvíli nepodporuje operátory složeného přiřazení (akumulace).
Tato ukázka byla postavena na:
- Prostředí runtime pro Azure Functions verze 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Pokud chcete lépe porozumět přístupu vazby, postupujte podle tohoto návodu.
Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Použijte následující informace:
- Jazyk:
C# - Runtime:
.NET 6(v části function/runtime v4) - Šablona:
HTTP trigger
Nainstalujte rozšíření vazby spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Přidejte položku SqlConnectionString do oddílu Values vašeho local.settings.json a vyplňte připojovací řetězec cílového serveru:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aktualizujte název cílové tabulky v oddílu vazby:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Aktualizujte blok třídy a mapování tak, aby odpovídaly vlastnímu schématu:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru Visual Studio Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS můžete použít ke kontrole připojení. Potom odešlete požadavky POST do místního koncového bodu. Požadavek s prázdným textem by měl vrátit HTTP 204. Požadavek opatřený skutečným obsahem by měl být uložen v cílové tabulce (v režimu nahrazení nebo aktualizace). Zde je vzorová datová část, která odpovídá schématu použitému v tomto příkladu:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkce je teď možné publikovat do Azure. Nastavte nastavení aplikace pro SqlConnectionString. Brána firewall Azure SQL Serveru by měla umožňovat, aby se k ní dostaly služby Azure, aby se k ní dostaly živé funkce.
Pak můžete funkci definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.
Možnost 2: Sloučení se složeným přiřazením (kumulování) prostřednictvím vlastního dotazu SQL
Poznámka:
Po restartování a obnovení může ASA znovu odeslat výstupní události, které už vygeneroval. Toto chování může způsobit selhání logiky akumulace (zdvojnásobení jednotlivých hodnot). Pokud chcete tomuto problému zabránit, vypište stejná data v tabulce pomocí nativního výstupu ASA SQL. Tuto řídicí tabulku můžete použít k detekci problémů a opětovné synchronizaci akumulace v případě potřeby.
Tato možnost používá Microsoft.Data.SqlClient. Tato knihovna umožňuje odesílat všechny dotazy SQL do služby SQL Database.
Tato ukázka byla vytvořena na základě:
Nejprve podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger. Používají se následující informace:
- Jazyk:
C# - Runtime:
.NET 6(v části function/runtime v4) - Šablona:
HTTP trigger
Nainstalujte knihovnu SqlClient spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Položku SqlConnectionString přidejte do oddílu Values vašeho local.settings.json, vyplněním připojovacího řetězce cílového serveru:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
sqltext Aktualizujte oddíl sestavení příkazu tak, aby odpovídal vašemu vlastnímu schématu (všimněte si, jak se dosahuje akumulace prostřednictvím operátoru += při aktualizaci):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Teď můžete otestovat zapojení mezi místní funkcí a databází laděním (F5 v editoru VS Code). Databáze SQL musí být dostupná z vašeho počítače. SSMS můžete použít ke kontrole připojení. Potom odešlete požadavky POST do místního koncového bodu. Požadavek s prázdným textem by měl vrátit HTTP 204. Požadavek se skutečným obsahem by měl být uložen v cílové tabulce (v režimu shromažďování nebo sloučení). Tady je ukázková datová část odpovídající schématu použitému v této ukázce:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkce je teď možné publikovat do Azure. Nastavení aplikace by mělo být nastaveno pro SqlConnectionString. Brána firewall Azure SQL
Funkci pak můžete definovat jako výstup v úloze ASA a místo jejich vložení je použít k nahrazení záznamů.
Alternativy
Mimo Azure Functions může dosáhnout očekávaného výsledku více metod. Tato část popisuje některé z těchto metod.
Následné zpracování v cílové databázi SQL
Úloha na pozadí funguje, jakmile se data vloží do databáze prostřednictvím standardních výstupů ASA SQL.
Pro Azure SQL použijte INSTEAD OFDML triggery k zachycení příkazů INSERT, které vydává ASA.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
V případě Synapse SQL může ASA vložit do pracovní tabulky. Opakovaný úkol pak může podle potřeby transformovat data do zprostředkující tabulky. Nakonec se data přesunou do produkční tabulky.
Předběžné zpracování ve službě Azure Cosmos DB
Azure Cosmos DB nativně podporuje UPSERT. Tady je možné pouze připojit nebo nahradit. V Azure Cosmos DB musíte spravovat akumulace na straně klienta.
Pokud se požadavky shodují, můžete cílovou databázi SQL nahradit Azure Cosmos DB instancí. Tato změna vyžaduje důležitou změnu v celkové architektuře řešení.
Pro Synapse SQL můžete jako zprostředkující vrstvu použít Azure Cosmos DB prostřednictvím Azure Synapse Link pro Azure Cosmos DB. Pomocí Azure Synapse Link vytvořte analytické úložiště . Toto úložiště dat pak můžete dotazovat přímo ve službě Synapse SQL.
Porovnání alternativ
Každý přístup nabízí různé návrhy hodnot a schopnosti.
| Typ | Možnost | Režimy | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Následné zpracování | ||||
| Spouštěče | Nahradit, Kumulovat | + | Není k dispozici, triggery nejsou dostupné v Synapse SQL | |
| Příprava | Nahradit, Kumulovat | + | + | |
| Předběžné zpracování | ||||
| Azure Functions | Nahradit, Kumulovat | + | - (výkon po řádcích) | |
| Nahrazení služby Azure Cosmos DB | Nahradit | Není relevantní | Není relevantní | |
| Azure Cosmos DB Azure Synapse Link | Nahradit | Není relevantní | + |
Získání podpory
Pokud potřebujete další pomoc, vyzkoušejte stránku s dotazy microsoftu Q&A pro Azure Stream Analytics.
Další kroky
- Vysvětlení výstupů z Azure Stream Analytics
- Výstup Azure Stream Analytics do služby Azure SQL Database
- Zvýšení výkonu propustnosti do služby Azure SQL Database z Azure Stream Analytics
- Použití spravovaných identit pro přístup ke službě Azure SQL Database nebo Azure Synapse Analytics z úlohy Azure Stream Analytics
- Použití referenčních dat z SQL Database pro úlohu Azure Stream Analytics
- Spuštění Azure Functions v úlohách Azure Stream Analytics – výukový kurz pro výstup Redis
- Rychlý start: Vytvoření úlohy Stream Analytics pomocí webu Azure Portal