Bagikan melalui


Memperbarui atau menggabungkan rekaman dalam Azure SQL Database dengan Azure Functions

Saat ini, Azure Stream Analytics (ASA) hanya mendukung menyisipkan (menambahkan) baris ke output SQL (Azure SQL Databases, dan Azure Synapse Analytics). Artikel ini membahas solusi untuk mengaktifkan PERBARUI, UPSERT, atau GABUNG pada database SQL, dengan Azure Functions sebagai lapisan perantara.

Opsi alternatif untuk Azure Functions disajikan di akhir.

Persyaratan

Menulis data dalam tabel umumnya dapat dilakukan dengan cara berikut:

Mode Pernyataan T-SQL yang setara Persyaratan
Lampirkan INSERT Tidak
Menggantikan GABUNG (UPSERT) Kunci unik
Mengakumulasikan GABUNG (UPSERT) dengan operator penetapan majemuk (+=, -=...) Kunci dan akumulator unik

Untuk mengilustrasikan perbedaannya, lihat apa yang terjadi saat menyerap dua rekaman berikut:

Arrival_Time Device_Id Measure_Value
10.00 A 1
10:05 A 20

Dalam mode tambahkan, kami menyisipkan dua rekaman. Pernyataan T-SQL yang setara adalah:

INSERT INTO [target] VALUES (...);

Menghasilkan:

Modified_Time Device_Id Measure_Value
10.00 A 1
10:05 A 20

Dalam mode ganti, kita hanya mendapatkan nilai terakhir dengan kunci. Di sini kita menggunakan Device_Id sebagai kuncinya. Pernyataan T-SQL yang setara adalah:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Menghasilkan:

Modified_Time Device_Key Measure_Value
10:05 A 20

Akhirnya, dalam mode akumulasi kami menjumlahkan Value dengan operator penugasan majemuk (+=). Di sini juga kita menggunakan Device_Id sebagai kunci:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Menghasilkan:

Modified_Time Device_Key Measure_Value
10:05 A 21

Untuk pertimbangan performa, ASA SQL adaptor output database saat ini hanya mendukung mode penambahan secara asli. Adaptor ini menggunakan sisipan massal untuk memaksimalkan throughput dan membatasi tekanan balik.

Artikel ini menunjukkan cara menggunakan Azure Functions untuk menerapkan mode Ganti dan Kumpulkan untuk ASA. Saat Anda menggunakan fungsi sebagai lapisan perantara, performa penulisan potensial tidak akan memengaruhi pekerjaan streaming. Dalam hal ini, menggunakan Azure Functions berfungsi paling baik dengan Azure SQL. Dengan Synapse SQL, beralih dari pernyataan massal ke baris demi baris mungkin menciptakan masalah performa yang lebih besar.

Output Azure Functions

Dalam pekerjaan kami, kami mengganti output ASA SQL dengan output ASA Azure Functions. Kemampuan UPDATE, UPSERT, atau MERGE diimplementasikan dalam fungsi .

Saat ini ada dua opsi untuk mengakses SQL Database dalam suatu fungsi. Pertama adalah pengikatan output Azure SQL. Saat ini terbatas pada C#, dan hanya menawarkan mode ganti. Kedua adalah menulis kueri SQL yang akan dikirimkan melalui driver SQL yang sesuai (Microsoft.Data.SqlClient for .NET).

Untuk kedua sampel berikut, kami mengasumsikan skema tabel berikut. Opsi pengikatan memerlukan kunci utama untuk ditetapkan pada tabel target. Walau tidak diharuskan, tetapi disarankan, saat menggunakan driver SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Fungsi harus memenuhi harapan berikut untuk digunakan sebagai output dari ASA:

  • Azure Stream Analytics mengharapkan status HTTP 200 dari aplikasi Functions untuk batch yang berhasil diproses
  • Saat Azure Stream Analytics menerima pengecualian 413 ("http Request Entity Too Large") dari fungsi Azure, itu mengurangi ukuran batch yang dikirim ke Azure Functions
  • Selama koneksi pengujian, Stream Analytics mengirimkan permintaan POST dengan batch kosong ke Azure Functions dan mengharapkan status HTTP 20x kembali untuk memvalidasi pengujian

Opsi 1: Perbarui dengan kunci dengan Pengikatan SQL Azure Functions

Opsi ini menggunakan Pengikatan SQL Azure Functions. Ekstensi ini dapat menggantikan objek dalam tabel, tanpa harus menulis pernyataan SQL. Saat ini, tidak mendukung operator penugasan majemuk (akumulasi).

Sampel ini dibangun di:

Untuk lebih memahami pendekatan pengikatan, disarankan untuk mengikuti tutorial ini.

Pertama, buat aplikasi fungsi HttpTrigger default dengan mengikuti tutorial ini. Informasi berikut digunakan:

  • Bahasa pemrogram: C#
  • Runtime bahasa umum: .NET 6 (di bawah fungsi/runtime bahasa umum v4)
  • Templat: HTTP trigger

Pasang ekstensi pengikatan dengan menjalankan perintah berikut di terminal yang terletak di folder proyek:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Tambahkan item SqlConnectionString di bagian Values local.settings.json Anda, dengan mengisi string koneksi dari server tujuan:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ganti seluruh fungsi (file .cs dalam proyek) dengan cuplikan kode berikut. Perbarui namespace layanan, nama kelas, dan nama fungsi Anda sendiri:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Perbarui nama tabel tujuan di bagian pengikatan:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Device Perbarui bagian kelas dan pemetaan agar sesuai dengan skema Anda sendiri:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Anda sekarang dapat menguji kabel antara fungsi lokal dan database dengan penelusuran kesalahan (F5 di Visual Studio Code). Database SQL harus dapat dijangkau dari mesin Anda. SQL Server Management Studio dapat digunakan untuk memeriksa konektivitas. Kemudian, kirim permintaan POST ke titik akhir lokal. Permintaan dengan badan kosong harus mengembalikan http 204. Permintaan dengan muatan aktual harus dipertahankan di tabel tujuan (dalam mode ganti/perbarui). Berikut adalah payload sampel yang sesuai dengan skema yang digunakan dalam sampel ini:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Fungsi tersebut sekarang dapat diterbitkan ke Azure. Pengaturan aplikasi harus diatur untuk SqlConnectionString. Firewall Azure SQL Server harus memungkinkan layanan Azure masuk untuk fungsi langsung untuk mencapainya.

Fungsi ini kemudian dapat didefinisikan sebagai output dalam pekerjaan ASA, dan digunakan untuk mengganti catatan alih-alih memasukkannya.

Opsi 2: Gabungkan dengan penetapan majemuk (akumulasi) melalui kueri SQL kustom

Catatan

Setelah menghidupkan ulang dan pemulihan, ASA dapat mengirim kembali peristiwa output yang sudah dipancarkan. Ini adalah perilaku yang diharapkan yang dapat menyebabkan logika akumulasi gagal (menggandakan nilai individu). Untuk mencegah hal ini, disarankan untuk mengeluarkan data yang sama dalam tabel melalui Output ASA SQL asli. Tabel kontrol ini kemudian dapat digunakan untuk mendeteksi masalah dan menyinkronkan kembali akumulasi bila diperlukan.

Opsi ini menggunakan Microsoft.Data.SqlClient. Pustaka ini memungkinkan kami mengeluarkan kueri SQL apa pun ke SQL Database.

Sampel ini dibangun di:

Pertama, buat aplikasi fungsi HttpTrigger default dengan mengikuti tutorial ini. Informasi berikut digunakan:

  • Bahasa pemrogram: C#
  • Runtime bahasa umum: .NET 6 (di bawah fungsi/runtime bahasa umum v4)
  • Templat: HTTP trigger

Pasang pustaka klien Redis dengan menjalankan perintah berikut di terminal yang terletak di folder proyek:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Tambahkan item SqlConnectionString di bagian Values local.settings.json Anda, dengan mengisi string koneksi dari server tujuan:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Ganti seluruh fungsi (file .cs dalam proyek) dengan cuplikan kode berikut. Perbarui namespace layanan, nama kelas, dan nama fungsi Anda sendiri:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Perbarui bagian pembuatan perintah sqltext agar sesuai dengan skema Anda sendiri (perhatikan bagaimana akumulasi dicapai melalui operator += saat pembaruan):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Anda sekarang dapat menguji kabel antara fungsi lokal dan database dengan debugging (F5 di VS Code). Database SQL harus dapat dijangkau dari mesin Anda. SQL Server Management Studio dapat digunakan untuk memeriksa konektivitas. Kemudian, terbitkan permintaan POST ke titik akhir lokal. Permintaan dengan badan kosong harus mengembalikan http 204. Permintaan dengan muatan aktual harus dipertahankan di tabel tujuan (dalam mode akumulasi/gabung). Berikut adalah payload sampel yang sesuai dengan skema yang digunakan dalam sampel ini:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Fungsi tersebut sekarang dapat diterbitkan ke Azure. Pengaturan aplikasi harus diatur untuk SqlConnectionString. Firewall Azure SQL Server harus memungkinkan layanan Azure masuk untuk fungsi langsung untuk mencapainya.

Fungsi ini kemudian dapat didefinisikan sebagai output dalam pekerjaan ASA, dan digunakan untuk mengganti catatan alih-alih memasukkannya.

Alternatif

Di luar Azure Functions, ada beberapa cara untuk mencapai hasil yang diharapkan. Bagian ini menyediakan beberapa dari mereka.

Pasca-pemrosesan di SQL Database target

Tugas latar belakang beroperasi setelah data dimasukkan ke dalam database melalui output ASA SQL standar.

Untuk Azure SQL, INSTEAD OF pemicu DML dapat digunakan untuk mencegat perintah INSERT yang dikeluarkan oleh ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Untuk Synapse SQL, ASA dapat dimasukkan ke dalam tabel penahapan. Tugas berulang kemudian dapat mengubah data sesuai kebutuhan menjadi tabel perantara. Akhirnya data dipindahkan ke tabel produksi.

Pra-pemrosesan di Azure Cosmos DB

Azure Cosmos DB mendukung UPSERT secara asli. Di sini hanya memungkinkan tambah/ganti. Akumulasi harus dikelola sisi klien di Azure Cosmos DB.

Jika persyaratan cocok, opsinya adalah mengganti target SQL database dengan instans Azure Cosmos DB. Melakukan hal itu membutuhkan perubahan penting dalam arsitektur solusi secara keseluruhan.

Untuk Synapse SQL, Azure Cosmos DB dapat digunakan sebagai lapisan perantara melalui Azure Synapse Link untuk Azure Cosmos DB. Azure Synapse Link dapat digunakan untuk membuat penyimpanan analitik. Penyimpanan data ini kemudian dapat ditanyakan langsung di Synapse SQL.

Perbandingan alternatif

Setiap pendekatan menawarkan proposisi nilai dan kemampuan yang berbeda:

Jenis Opsi Mode Azure SQL Database Azure Synapse Analytics
Pasca-Pemrosesan
Pemicu Ganti, Akumulasikan + N/A, pemicu tidak tersedia di Synapse SQL
Staging Ganti, Akumulasikan + +
Pra-Pemrosesan
Azure Functions Ganti, Akumulasikan + - (row-by-row performance)
Penggantian Azure Cosmos DB Menggantikan T/A T/A
Azure Cosmos DB Azure Synapse Link Menggantikan T/A +

Dapatkan dukungan

Untuk bantuan lebih lanjut, coba halaman pertanyaan Microsoft Q&A untuk Azure Stream Analytics.

Langkah berikutnya