Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Saat ini, Azure Stream Analytics (ASA) hanya mendukung menyisipkan (menambahkan) baris ke output SQL (Azure SQL Database dan Azure Synapse Analytics). Artikel ini membahas solusi untuk mengaktifkan UPDATE, UPSERT, atau MERGE pada database SQL, dengan menggunakan Azure Functions sebagai lapisan perantara.
Opsi alternatif untuk Azure Functions disajikan di akhir.
Persyaratan
Anda bisa menulis data ke tabel dengan menggunakan salah satu mode berikut:
| Mode | Pernyataan T-SQL yang setara | Persyaratan |
|---|---|---|
| Lampirkan | INSERT | Tidak |
| Menggantikan | MENGGABUNGKAN (UPSERT) | Kunci unik |
| Mengakumulasikan | MERGE (UPSERT) dengan operator penetapan majemuk (operator) (+=, -=...) |
Kunci dan akumulator unik |
Untuk mengilustrasikan perbedaan, pertimbangkan apa yang terjadi saat menyerap dua rekaman berikut:
| Waktu Kedatangan | Device_Id | Measure_Value |
|---|---|---|
| 10.00 | A | 1 |
| 10:05 | A | 20 |
Dalam mode tambahkan , Anda menyisipkan dua rekaman. Pernyataan T-SQL yang setara adalah:
INSERT INTO [target] VALUES (...);
Menghasilkan:
| Waktu Modifikasi | Device_Id | Measure_Value |
|---|---|---|
| 10.00 | A | 1 |
| 10:05 | A | 20 |
Dalam mode ganti, Anda hanya mendapatkan nilai terakhir berdasarkan kunci. Di sini Anda menggunakan Device_Id sebagai kunci. Pernyataan T-SQL yang setara adalah:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Menghasilkan:
| Waktu Modifikasi | Kunci_Perangkat | Measure_Value |
|---|---|---|
| 10:05 | A | 20 |
Terakhir, dalam mode akumulasi Anda menjumlahkan Value dengan operator penugasan majemuk (+=). Di sini juga Anda menggunakan Device_Id sebagai kunci:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Menghasilkan:
| Waktu Modifikasi | Kunci_Perangkat | Measure_Value |
|---|---|---|
| 10:05 | A | 21 |
Untuk pertimbangan performa, ASA SQL adaptor output database saat ini hanya mendukung mode penambahan secara asli. Adaptor ini menggunakan sisipan massal untuk memaksimalkan throughput dan membatasi tekanan balik.
Artikel ini menunjukkan cara menggunakan Azure Functions untuk menerapkan mode Ganti dan Kumpulkan untuk ASA. Saat Anda menggunakan fungsi sebagai lapisan perantara, performa penulisan potensial tidak memengaruhi pekerjaan streaming. Dalam hal ini, menggunakan Azure Functions berfungsi paling baik dengan Azure SQL. Dengan Synapse SQL, beralih dari pernyataan massal ke baris demi baris mungkin menciptakan masalah performa yang lebih besar.
Keluaran Azure Functions
Dalam pekerjaan ini, Anda mengganti output ASA SQL dengan output ASA Azure Functions. Fungsi ini mengimplementasikan kemampuan UPDATE, UPSERT, atau MERGE.
Saat ini, Anda dapat mengakses SQL Database dalam fungsi dengan menggunakan dua opsi. Opsi pertama adalah pengikatan output Azure SQL. Saat ini terbatas pada C#, dan hanya menawarkan mode ganti. Opsi kedua adalah membuat kueri SQL untuk dikirimkan melalui driver SQL yang sesuai (Microsoft. Data.SqlClient untuk .NET).
Kedua sampel berikut mengasumsikan skema tabel berikut. Opsi pengikatan memerlukan kunci utama untuk ditetapkan pada tabel target. Walau tidak diharuskan, tetapi disarankan, saat menggunakan driver SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Untuk menggunakan fungsi sebagai output dari ASA, fungsi harus memenuhi harapan berikut:
- Azure Stream Analytics mengharapkan status HTTP 200 dari aplikasi Functions untuk batch yang berhasil diproses.
- Ketika Azure Stream Analytics menerima pengecualian 413 ("Http Request Entity Too Large") dari fungsi Azure, ini mengurangi ukuran batch yang dikirimnya ke Fungsi Azure.
- Selama koneksi pengujian, Stream Analytics mengirim permintaan POST dengan batch kosong ke Azure Functions dan mengharapkan status HTTP 20x kembali untuk memvalidasi pengujian.
Opsi 1: Perbarui dengan kunci menggunakan Pengikatan SQL Azure Function
Opsi ini menggunakan Pengikatan SQL Output Azure Functions. Ekstensi ini dapat mengganti objek dalam tabel tanpa Anda harus menulis pernyataan SQL. Saat ini, tidak mendukung operator penggabungan majemuk (akumulasi).
Sampel ini dibangun di:
- Runtime Azure Functions versi 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Untuk lebih memahami pendekatan pengikatan, ikuti tutorial ini.
Pertama, buat aplikasi fungsi HttpTrigger default dengan mengikuti tutorial ini. Gunakan informasi berikut:
- Bahasa:
C# - Runtime:
.NET 6(di bawah fungsi/runtime v4) - Templat:
HTTP trigger
Pasang ekstensi pengikatan dengan menjalankan perintah berikut di terminal yang terletak di folder proyek:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Tambahkan item SqlConnectionString di bagian Valueslocal.settings.json Anda, dengan mengisi string koneksi dari server tujuan:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ganti seluruh fungsi (file .cs dalam proyek) dengan cuplikan kode berikut. Perbarui namespace, nama kelas, dan nama fungsi dengan nama Anda sendiri:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Perbarui nama tabel tujuan di bagian pengikatan:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Device Perbarui bagian kelas dan pemetaan agar sesuai dengan skema Anda sendiri:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Anda sekarang dapat menguji koneksi antara fungsi lokal dan database dengan menggunakan debugging (F5 di Visual Studio Code). Database SQL harus dapat dijangkau dari mesin Anda. Anda dapat menggunakan SSMS untuk memeriksa konektivitas. Kemudian, kirim permintaan POST ke titik akhir lokal. Permintaan dengan isi kosong harus mengembalikan HTTP 204. Permintaan dengan muatan aktual harus dipertahankan di tabel tujuan (dalam mode ganti/perbarui). Berikut adalah payload sampel yang sesuai dengan skema yang digunakan dalam sampel ini:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Fungsi tersebut sekarang dapat diterbitkan ke Azure. Atur pengaturan aplikasi untuk SqlConnectionString. Firewall Azure SQL Server harus memungkinkan layanan Azure masuk agar fungsi live dapat mencapainya.
Anda kemudian dapat menentukan fungsi sebagai output dalam pekerjaan ASA, dan menggunakannya untuk mengganti rekaman alih-alih menyisipkannya.
Opsi 2: Gabungkan dengan penetapan majemuk (akumulasi) melalui kueri SQL kustom
Catatan
Setelah mulai ulang dan pemulihan, ASA mungkin mengirim ulang peristiwa output yang sudah dipancarkannya. Perilaku ini dapat menyebabkan logika akumulasi gagal (menggandakan nilai individual). Untuk mencegah masalah ini, keluarkan data yang sama dalam tabel dengan menggunakan Output SQL ASA asli. Anda dapat menggunakan tabel kontrol ini untuk mendeteksi masalah dan menyinkronkan ulang akumulasi bila perlu.
Opsi ini menggunakan Microsoft.Data.SqlClient. Pustaka ini memungkinkan Anda mengirim kueri SQL apa pun ke SQL Database.
Sampel ini dibangun di:
Pertama, buat aplikasi fungsi HttpTrigger default dengan mengikuti tutorial ini. Informasi berikut digunakan:
- Bahasa:
C# - Runtime:
.NET 6(di bawah fungsi/runtime v4) - Templat:
HTTP trigger
Pasang pustaka SqlClient dengan menjalankan perintah berikut di terminal yang terletak di folder proyek:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Tambahkan item SqlConnectionString di bagian Valueslocal.settings.json Anda, dengan mengisi string koneksi dari server tujuan:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ganti seluruh fungsi (file .cs dalam proyek) dengan cuplikan kode berikut. Perbarui namespace layanan, nama kelas, dan nama fungsi Anda sendiri:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Perbarui bagian pembuatan perintah sqltext agar sesuai dengan skema Anda sendiri (perhatikan bagaimana akumulasi dicapai melalui operator += saat pembaruan):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Anda sekarang dapat menguji kabel antara fungsi lokal dan database dengan debugging (F5 di VS Code). Database SQL harus dapat dijangkau dari mesin Anda. Anda dapat menggunakan SSMS untuk memeriksa konektivitas. Kemudian, kirim permintaan POST ke titik akhir lokal. Permintaan dengan isi kosong harus mengembalikan HTTP 204. Permintaan dengan muatan aktual harus dipertahankan di tabel tujuan (dalam mode akumulasi/gabung). Berikut adalah payload sampel yang sesuai dengan skema yang digunakan dalam sampel ini:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Fungsi tersebut sekarang dapat diterbitkan ke Azure.
Pengaturan aplikasi harus diatur untuk SqlConnectionString. Firewall Azure SQL Server harus memungkinkan layanan Azure masuk agar fungsi live dapat terhubung.
Fungsi ini kemudian dapat didefinisikan sebagai output dalam pekerjaan ASA, dan digunakan untuk mengganti catatan alih-alih memasukkannya.
Alternatif
Di luar Azure Functions, beberapa metode dapat mencapai hasil yang diharapkan. Bagian ini menjelaskan beberapa metode ini.
Pemrosesan pasca di basis data SQL target
Tugas latar belakang beroperasi setelah data dimasukkan ke dalam database melalui output ASA SQL standar.
Untuk Azure SQL, gunakan pemicu INSTEAD OFDML untuk mencegat perintah INSERT yang dikeluarkan ASA.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Untuk Synapse SQL, ASA dapat dimasukkan ke dalam tabel penahapan. Tugas berulang kemudian dapat mengubah data sesuai kebutuhan menjadi tabel perantara. Akhirnya, data dipindahkan ke tabel produksi.
Pra-pemrosesan di Azure Cosmos DB
Azure Cosmos DB mendukung UPSERT secara asli. Di sini, hanya menambahkan atau mengganti yang dapat dilakukan. Anda harus mengelola akumulasi sisi klien dalam Azure Cosmos DB.
Jika persyaratan cocok, Anda dapat mengganti database SQL target dengan instans Azure Cosmos DB. Perubahan ini membutuhkan perubahan penting dalam arsitektur solusi keseluruhan.
Untuk Synapse SQL, Anda dapat menggunakan Azure Cosmos DB sebagai lapisan perantara melalui Azure Synapse Link untuk Azure Cosmos DB. Gunakan Azure Synapse Link untuk membuat penyimpanan analitis. Anda kemudian dapat mengkueri penyimpanan data ini langsung di Synapse SQL.
Perbandingan alternatif
Setiap pendekatan menawarkan proposisi dan kemampuan nilai yang berbeda:
| Jenis | Opsi | Mode | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Pemrosesan Pasca | ||||
| Pemicu | Ganti, Akumulasikan | + | N/A, pemicu tidak tersedia di Synapse SQL | |
| Tahap Pengujian | Ganti, Kumpulkan | + | + | |
| Pra-Pemrosesan | ||||
| Azure Functions | Ganti, Kumpulkan | + | - (kinerja baris demi baris) | |
| Penggantian Azure Cosmos DB | Menggantikan | T/A | T/A | |
| Azure Cosmos DB Azure Synapse Link | Menggantikan | T/A | + |
Dapatkan dukungan
Untuk bantuan lebih lanjut, coba halaman pertanyaan Tanya Jawab Microsoft untuk Azure Stream Analytics.
Langkah berikutnya
- Pahami output dari Azure Stream Analytics
- Hasil dari Azure Stream Analytics ke Azure SQL Database
- Meningkatkan performa throughput ke Azure SQL Database dari Azure Stream Analytics
- Menggunakan identitas terkelola untuk mengakses Azure SQL Database atau Azure Synapse Analytics dari pekerjaan Azure Stream Analytics
- Menggunakan data referensi dari SQL Database untuk pekerjaan Azure Stream Analytics
- Menjalankan Fungsi Azure dalam tugas Azure Stream Analytics - Tutorial output Redis
- Mulai Cepat: Membuat pekerjaan Azure Stream Analytics dengan menggunakan portal Microsoft Azure