Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Derzeit unterstützt Azure Stream Analytics (ASA) nur das Einfügen (Anfügen) von Zeilen in SQL-Ausgaben (Azure SQL Databases und Azure Synapse Analytics). In diesem Artikel werden Problemumgehungen erläutert, um UPDATE, UPSERT oder MERGE in SQL-Datenbanken mithilfe von Azure Functions als zwischengeschaltete Ebene zu aktivieren.
Die alternativen Optionen für Azure Functions werden am Ende vorgestellt.
Anforderung
Sie können Daten in eine Tabelle schreiben, indem Sie einen der folgenden Modi verwenden:
| Modus | Entsprechende T-SQL-Anweisung | Anforderungen |
|---|---|---|
| Anfügen | INSERT | Keine |
| Replace | MERGE (UPSERT) | Eindeutiger Schlüssel |
| akkumulieren | MERGE (UPSERT) mit Verbundzuweisungs-Operator (+=, -=...) |
Eindeutiger Schlüssel und Akkumulator |
Um die Unterschiede zu veranschaulichen, berücksichtigen Sie, was passiert, wenn Sie die folgenden beiden Datensätze aufnehmen:
| Ankunftszeit | Geräte-ID | Messwert |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Im Anfügemodus fügen Sie zwei Datensätze ein. Die entsprechende T-SQL-Anweisung lautet:
INSERT INTO [target] VALUES (...);
Ergebnis:
| Geändert_Zeit | Device_Id | Messwert |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Im Ersetzungsmodus erhalten Sie nur den letzten Wert pro Schlüssel. Hier verwenden Sie Device_Id als Schlüssel. Die entsprechende T-SQL-Anweisung lautet:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ergebnis:
| Änderungszeit | Geräte_Schlüssel | Messwert |
|---|---|---|
| 10:05 | A | 20 |
Schließlich summieren Sie im AkkumulationsmodusValue mit einem zusammengesetzten Zuordnungsoperator (+=). Hier verwenden Sie auch Device_Id als Schlüssel:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ergebnis:
| Änderungszeit | Geräte_Schlüssel | Messwert |
|---|---|---|
| 10:05 | A | 21 |
Aus Leistungsgründen unterstützen die ASA-SQL Datenbankausgabeadapter derzeit nur den Anfügemodus nativ. Diese Adapter nutzen Bulk-Insert, um den Durchsatz zu maximieren und den Gegendruck zu reduzieren.
In diesem Artikel wird die Verwendung von Azure Functions zum Implementieren des Ersetzungs- und des Akkumulationsmodus für ASA veranschaulicht. Wenn Sie eine Funktion als zwischengeschaltete Ebene verwenden, wirkt sich die potenzielle Schreibleistung nicht auf den Streamingauftrag aus. In dieser Hinsicht funktioniert Azure Functions am besten mit Azure SQL. Bei Synapse SQL kann der Wechsel von der Massenausführung zu zeilenbasierten Anweisungen zu größeren Leistungsproblemen führen.
Azure Functions Ausgabe
In dieser Aufgabe ersetzen Sie die ASA SQL-Ausgabe durch die ASA Azure Functions-Ausgabe. Die Funktion implementiert die FUNKTIONEN UPDATE, UPSERT oder MERGE.
Derzeit können Sie mithilfe von zwei Optionen auf eine SQL-Datenbank in einer Funktion zugreifen. Die erste Option ist die Azure SQL Ausgabebindung. Es ist derzeit auf C# beschränkt und bietet nur den Ersetzungsmodus. Die zweite Option besteht darin, eine SQL-Abfrage zu verfassen, die über den entsprechenden SQL-Treiber (Microsoft übermittelt werden soll. Data.SqlClient für .NET).
Beide der folgenden Beispiele gehen davon aus, dass das folgende Tabellenschema verwendet wird. Für die Bindungsoption muss ein Primärschlüssel für die Zieltabelle festgelegt werden. Bei Verwendung eines SQL-Treibers ist dies nicht notwendig, aber empfehlenswert.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Um eine Funktion als Ausgabe von ASA zu verwenden, muss die Funktion die folgenden Erwartungen erfüllen:
- Azure Stream Analytics erwartet den HTTP-Status 200 aus der Funktionen-App für Batches, die erfolgreich verarbeitet werden.
- Wenn Azure Stream Analytics eine Ausnahme von 413 ("http Request Entity Too Large") von einer Azure-Funktion empfängt, reduziert sie die Größe der Batches, die sie an Azure Function sendet.
- Während der Testverbindung sendet Stream Analytics eine POST-Anforderung mit einem leeren Batch an Azure Functions und erwartet, dass der HTTP-Status 20x wieder den Test überprüft.
Option 1: Aktualisierung nach Schlüssel mit der Azure SQL-Bindungsfunktion
Bei dieser Option wird die Azure-Funktion SQL-Ausgabebindung verwendet. Diese Erweiterung kann ein Objekt in einer Tabelle ersetzen, ohne eine SQL-Anweisung schreiben zu müssen. Zurzeit werden keine Verbundzuweisungsoperatoren (Akkumulationen) unterstützt.
Dieses Beispiel basiert auf:
- Azure Functions Runtime, Version 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Um den Bindungsansatz besser zu verstehen, folgen Sie dieses Lernprogramm.
Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Verwenden Sie die folgenden Informationen:
- Sprache:
C# - Runtime:
.NET 6(unter function/runtime v4) - Vorlage:
HTTP trigger
Installieren Sie die Bindungserweiterung, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Fügen Sie das SqlConnectionString-Element im Abschnitt Values Ihrer local.settings.json hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen mit Ihrem eigenen:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aktualisieren Sie den Namen der Zieltabelle im Bindungsabschnitt:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Aktualisieren Sie die Device-Klasse und den Zuordnungsabschnitt so, dass sie Ihrem eigenen Schema entsprechen:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in Visual Studio Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. Sie können SSMS verwenden, um die Konnektivität zu überprüfen. Senden Sie dann POST-Anforderungen an den lokalen Endpunkt. Eine Anforderung mit einem leeren Textkörper sollte HTTP 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Ersetzungs-/Aktualisierungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Die Funktion kann jetzt in Azure veröffentlicht werden. Festlegen einer Anwendungseinstellung für SqlConnectionString. Die Azure SQL Server-Firewall sollte Azure-Diensten eingehende Verbindungen ermöglichen, damit die Livefunktion darauf zugreifen kann.
Anschließend können Sie die Funktion als Ausgabe im ASA-Auftrag definieren und zum Ersetzen von Datensätzen verwenden, anstatt sie einzufügen.
Option 2: Zusammenführen mit Verbundzuweisung (Akkumulation) über eine benutzerdefinierte SQL-Abfrage
Hinweis
Nach dem Neustart und der Wiederherstellung sendet ASA möglicherweise bereits ausgegebene Ausgabeereignisse erneut. Dieses Verhalten kann dazu führen, dass die Akkumulationslogik fehlschlägt (Doppeln einzelner Werte). Um dieses Problem zu verhindern, geben Sie dieselben Daten in einer Tabelle mithilfe der nativen ASA SQL-Ausgabe aus. Sie können diese Steuerelementtabelle verwenden, um Probleme zu erkennen und die Akkumulation bei Bedarf neu zu synchronisieren.
Diese Option verwendet Microsoft.Data.SqlClient. Mit dieser Bibliothek können Sie alle SQL-Abfragen an eine SQL-Datenbank senden.
Dieses Beispiel basiert auf:
Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Die folgenden Informationen werden verwendet:
- Sprache:
C# - Runtime:
.NET 6(unter function/runtime v4) - Vorlage:
HTTP trigger
Installieren Sie die SqlClient-Bibliothek, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Fügen Sie das SqlConnectionString-Element im Abschnitt Values Ihrer local.settings.json hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen durch Ihre eigenen Werte:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Aktualisieren Sie den Abschnitt zur sqltext-Befehlserstellung so, dass er Ihrem eigenen Schema entspricht (beachten Sie, wie Akkumulation durch den +=-Operator bei der Aktualisierung erreicht wird):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in VS Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. Sie können SSMS verwenden, um die Konnektivität zu überprüfen. Senden Sie dann POST-Anforderungen an den lokalen Endpunkt. Eine Anforderung mit einem leeren Textkörper sollte HTTP 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Akkumulations-/Zusammenführungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Die Funktion kann jetzt in Azure veröffentlicht werden. Eine Anwendungseinstellung sollte für SqlConnectionString festgelegt werden. Die Azure SQL Server-Firewall sollte Azure-Diensten den Zugriff erlauben, damit die Live-Funktion darauf zugreifen kann.
Die Funktion kann dann als Ausgabe im ASA-Auftrag definiert und verwendet werden, um Datensätze zu ersetzen, anstatt sie einzufügen.
Alternativen
Außerhalb Azure Functions können mehrere Methoden das erwartete Ergebnis erzielen. In diesem Abschnitt werden einige dieser Methoden beschrieben.
Nachverarbeitung in der SQL-Zieldatenbank
Eine Hintergrundaufgabe wird ausgeführt, sobald die Daten über die ASA-SQL-Standardausgaben in die Datenbank eingefügt wurden.
Verwenden Sie für Azure SQL INSTEAD OFDML-Trigger, um die befehle INSERT abzufangen, die ASA ausgibt.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Bei Synapse SQL kann ASA in eine Staging-Tabelle einfügen. Eine wiederkehrende Aufgabe kann die Daten dann bei Bedarf in eine Zwischentabelle transformieren. Schließlich werden die Daten in die Produktionstabelle verschoben.
Vorverarbeitung in Azure Cosmos DB
Azure Cosmos DB unterstützt UPSERT nativ. Hier ist nur das Anfügen oder Ersetzen möglich. Sie müssen Akkumulationen clientseitig in Azure Cosmos DB verwalten.
Wenn die Anforderungen übereinstimmen, können Sie die SQL-Zieldatenbank durch eine Azure Cosmos DB Instanz ersetzen. Diese Änderung erfordert eine wichtige Änderung der Gesamtlösungsarchitektur.
Für Synapse SQL können Sie Azure Cosmos DB als zwischengeschaltete Ebene über Azure Synapse Link für Azure Cosmos DB verwenden. Verwenden Sie Azure Synapse Link, um einen analytical store zu erstellen. Anschließend können Sie diesen Datenspeicher direkt in Synapse SQL abfragen.
Vergleich der Alternativen
Jeder Ansatz bietet unterschiedliche Wertversprechen und Funktionen:
| Typ | Option | Modus | Azure SQL-Datenbank | Azure Synapse Analytics |
|---|---|---|---|---|
| Nachbearbeitung | ||||
| Auslöser | Ersetzen, Akkumulieren | + | N/V; Trigger stehen in Synapse SQL nicht zur Verfügung | |
| Staging | Ersetzen, Akkumulieren | + | + | |
| Vorverarbeitung | ||||
| Azure-Funktionen | Ersetzen, Akkumulieren | + | – (Zeilenbasierte Leistung) | |
| Azure Cosmos DB-Ersetzung | Ersetzen | – | – | |
| Azure Cosmos DB Azure Synapse Link | Replace | N/A | + |
Unterstützung erhalten
Um weitere Hilfe zu erhalten, probieren Sie die Microsoft Q&A-Frageseite für Azure Stream Analytics aus.
Nächste Schritte
- Grundlegendes zu den Ausgaben von Azure Stream Analytics
- Azure Stream Analytics-Ausgabe an Azure SQL-Datenbank
- Erhöhen Sie die Durchsatzleistung für die Azure SQL-Datenbank mit Azure Stream Analytics.
- Zugreifen auf Azure SQL-Datenbank oder Azure Synapse Analytics mit verwalteten Identitäten aus einem Azure Stream Analytics-Auftrag
- Verwenden von Verweisdaten aus einer SQL-Datenbank für einen Azure Stream Analytics-Auftrag
- Ausführen von Azure Functions in Azure Stream Analytics-Aufträgen – Tutorial für Redis-Ausgabe
- Schnellstart: Erstellen eines Stream Analytics-Auftrags mithilfe des Azure-Portals