Aracılığıyla paylaş


Azure SQL Veritabanı'daki kayıtları Azure İşlevleri ile güncelleştirme veya birleştirme

Şu anda Azure Stream Analytics (ASA), SQL çıkışlarına (Azure SQL Veritabanı s ve Azure Synapse Analytics) yalnızca satır eklemeyi (eklemeyi) destekler. Bu makalede, SQL veritabanlarında UPDATE, UPSERT veya MERGE'i etkinleştirmeye yönelik geçici çözümler ve ara katman olarak Azure İşlevleri ele alınmaktadır.

Azure İşlevleri alternatif seçenekleri sonunda sunulur.

Gereksinim

Tabloya veri yazma işlemi genellikle aşağıdaki şekilde yapılabilir:

Mod Eşdeğer T-SQL deyimi Gereksinimler
Arkasına Ekle INSERT Hiçbiri
Replace MERGE (UPSERT) Benzersiz anahtar
Yığmak Bileşik atama işleci (, -=...) ile MERGE (+=UPSERT) Benzersiz anahtar ve biriktirici

Farklılıkları göstermek için aşağıdaki iki kaydı alırken neler olacağına bakın:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

Ekleme modunda iki kayıt ekleriz. Eşdeğer T-SQL deyimi şöyledir:

INSERT INTO [target] VALUES (...);

Sonuç olarak:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

Değiştirme modunda yalnızca anahtara göre son değeri elde ederiz. Burada anahtar olarak Device_Id kullanıyoruz . Eşdeğer T-SQL deyimi şöyledir:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Sonuç olarak:

Modified_Time Device_Key Measure_Value
10:05 A 20

Son olarak, birikmiş modda bileşik atama işleci (+=) ile toplama Value yapıyoruz. Burada anahtar olarak da Device_Id kullanırız:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Sonuç olarak:

Modified_Time Device_Key Measure_Value
10:05 A 21

Performansla ilgili dikkat edilmesi gerekenler için, ASA SQL veritabanı çıkış bağdaştırıcıları şu anda yalnızca yerel olarak ekleme modunu destekler. Bu bağdaştırıcılar, aktarım hızını en üst düzeye çıkarmak ve geri basıncı sınırlamak için toplu ekleme kullanır.

Bu makalede, ASA için Değiştir ve Biriktir modlarını uygulamak için Azure İşlevleri nasıl kullanılacağı gösterilmektedir. Bir işlevi aracı katman olarak kullandığınızda, olası yazma performansı akış işini etkilemez. Bu bağlamda, Azure İşlevleri kullanmak Azure SQL ile en iyi şekilde çalışır. Synapse SQL ile toplu deyimlerden satır satır deyimlerine geçiş yapmak daha büyük performans sorunlarına neden olabilir.

çıktıyı Azure İşlevleri

İşimizde ASA SQL çıkışını ASA Azure İşlevleri çıkışıyla değiştiriyoruz. UPDATE, UPSERT veya MERGE özellikleri işlevinde uygulanır.

Şu anda bir işlevdeki SQL Veritabanı erişmek için iki seçenek vardır. İlk olarak Azure SQL çıkış bağlaması bulunur. Şu anda C# ile sınırlıdır ve yalnızca değiştirme modu sunar. İkincisi, uygun SQL sürücüsü (.NET için Microsoft.Data.SqlClient ) aracılığıyla gönderilecek bir SQL sorgusu oluşturmaktır.

Aşağıdaki örneklerin her ikisi için de aşağıdaki tablo şemasını varsayıyoruz. Bağlama seçeneği, hedef tabloda birincil anahtarın ayarlanmasını gerektirir. SQL sürücüsü kullanılırken gerekli değildir, ancak önerilir.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Bir işlevin ASA çıkışı olarak kullanılması için aşağıdaki beklentileri karşılaması gerekir:

  • Azure Stream Analytics, başarıyla işlenen toplu işlemler için İşlevler uygulamasından HTTP durumu 200'ü bekler
  • Azure Stream Analytics bir Azure işlevinden 413 ("http İstek Varlığı Çok Büyük") özel durumu aldığında, Azure İşlevi'ne gönderdiği toplu işlemlerin boyutunu küçültür
  • Test bağlantısı sırasında Stream Analytics, Azure İşlevleri için boş bir toplu iş içeren bir POST isteği gönderir ve testi doğrulamak için HTTP durumunun 20 kat geri gelmesini bekler

1. Seçenek: Azure İşlevi SQL Bağlaması ile anahtara göre güncelleştirme

Bu seçenek Azure İşlevi SQL Çıkış Bağlamasını kullanır. Bu uzantı, SQL deyimi yazmak zorunda kalmadan tablodaki bir nesnenin yerini alabilir. Şu anda bileşik atama işleçlerini (birikmesi) desteklemez.

Bu örnek aşağıdakiler üzerine kurulmuştur:

Bağlama yaklaşımını daha iyi anlamak için bu öğreticiyi izlemenizi öneririz.

İlk olarak, bu öğreticiyi izleyerek varsayılan bir HttpTrigger işlev uygulaması oluşturun. Aşağıdaki bilgiler kullanılır:

  • Dil: C#
  • Çalışma zamanı: .NET 6 (function/runtime v4 altında)
  • Şablon: HTTP trigger

Proje klasöründe bulunan bir terminalde aşağıdaki komutu çalıştırarak bağlama uzantısını yükleyin:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

SqlConnectionString öğesini hedef local.settings.jsonsunucunun Values bağlantı dizesi doldurarak öğesini ekleyin:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

İşlevin tamamını (projedeki .cs dosyası) aşağıdaki kod parçacığıyla değiştirin. Ad alanını, sınıf adını ve işlev adını kendiniz güncelleştirin:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Bağlama bölümünde hedef tablo adını güncelleştirin:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Device Sınıfı ve eşleme bölümünü kendi şemanızla eşleşecek şekilde güncelleştirin:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Artık hata ayıklama (Visual Studio Code'da F5) ile yerel işlev ile veritabanı arasındaki kablolama işlemini test edebilirsiniz. SQL veritabanına makinenizden erişilebilir olması gerekir. SSMS , bağlantıyı denetlemek için kullanılabilir. Ardından Postman gibi bir araç, yerel uç noktaya POST istekleri göndermek için kullanılabilir. Boş bir gövdeye sahip istek http 204 döndürmelidir. Gerçek yükü olan bir istek hedef tabloda kalıcı olmalıdır (değiştirme/güncelleştirme modunda). Aşağıda, bu örnekte kullanılan şemaya karşılık gelen bir örnek yük verilmişti:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

İşlev artık Azure'da yayımlanabilir. için SqlConnectionStringbir uygulama ayarı ayarlanmalıdır. Azure SQL Server güvenlik duvarı, canlı işlevin erişmesi için azure hizmetlerine izin vermelidir.

İşlev daha sonra ASA işinde bir çıkış olarak tanımlanabilir ve bunları eklemek yerine kayıtları değiştirmek için kullanılabilir.

2. Seçenek: Özel sql sorgusu aracılığıyla bileşik atamayla birleştirme (birikme)

Not

Yeniden başlatma ve kurtarma sonrasında, ASA zaten yayılan çıkış olaylarını yeniden gönderebilir. Bu, birikme mantığının başarısız olmasına (tek tek değerleri ikiye katlama) neden olabilecek beklenen bir davranıştır. Bunu önlemek için, yerel ASA SQL Çıkışı aracılığıyla bir tabloda aynı verilerin çıkışının kullanılması önerilir. Bu denetim tablosu daha sonra sorunları algılamak ve gerektiğinde birikimi yeniden eşitlemek için kullanılabilir.

Bu seçenek Microsoft.Data.SqlClient kullanır. Bu kitaplık, bir SQL Veritabanı tüm SQL sorgularını vermemizi sağlar.

Bu örnek aşağıdakiler üzerine kurulmuştur:

İlk olarak, bu öğreticiyi izleyerek varsayılan bir HttpTrigger işlev uygulaması oluşturun. Aşağıdaki bilgiler kullanılır:

  • Dil: C#
  • Çalışma zamanı: .NET 6 (function/runtime v4 altında)
  • Şablon: HTTP trigger

Proje klasöründe bulunan bir terminalde aşağıdaki komutu çalıştırarak SqlClient kitaplığını yükleyin:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

SqlConnectionString öğesini hedef local.settings.jsonsunucunun Values bağlantı dizesi doldurarak öğesini ekleyin:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

İşlevin tamamını (projedeki .cs dosyası) aşağıdaki kod parçacığıyla değiştirin. Ad alanını, sınıf adını ve işlev adını kendiniz güncelleştirin:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

sqltext Komut oluşturma bölümünü kendi şemanızla eşleşecek şekilde güncelleştirin (güncelleştirmede işleç aracılığıyla birikmeye nasıl ulaşıldığına += dikkat edin):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Artık hata ayıklayarak (VS Code'da F5) yerel işlev ile veritabanı arasındaki kablolama test edebilirsiniz. SQL veritabanına makinenizden erişilebilir olması gerekir. SSMS , bağlantıyı denetlemek için kullanılabilir. Ardından Postman gibi bir araç, yerel uç noktaya POST istekleri göndermek için kullanılabilir. Boş bir gövdeye sahip istek http 204 döndürmelidir. Gerçek yükü olan bir istek hedef tabloda kalıcı olmalıdır (birikme/birleştirme modunda). Aşağıda, bu örnekte kullanılan şemaya karşılık gelen bir örnek yük verilmişti:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

İşlev artık Azure'da yayımlanabilir. için SqlConnectionStringbir uygulama ayarı ayarlanmalıdır. Azure SQL Server güvenlik duvarı, canlı işlevin erişmesi için azure hizmetlerine izin vermelidir.

İşlev daha sonra ASA işinde bir çıkış olarak tanımlanabilir ve bunları eklemek yerine kayıtları değiştirmek için kullanılabilir.

Alternatifler

Azure İşlevleri dışında, beklenen sonucu elde etmenin birden çok yolu vardır. Bu bölümde bunlardan bazıları sağlanır.

Hedef SQL Veritabanı işlem sonrası

Arka plan görevi, veriler standart ASA SQL çıkışları aracılığıyla veritabanına eklendikten sonra çalışır.

Azure SQL için DML tetikleyicileri, INSTEAD OF ASA tarafından verilen INSERT komutlarını kesmek için kullanılabilir:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Synapse SQL için, ASA bir hazırlama tablosuna ekleyebilir. Yinelenen bir görev daha sonra gerektiğinde verileri bir aracı tabloya dönüştürebilir. Son olarak veriler üretim tablosuna taşınır.

Azure Cosmos DB'de ön işleme

Azure Cosmos DB , UPSERT'i yerel olarak destekler. Burada yalnızca ekleme/değiştirme mümkündür. Birikmeli işlemlerin Azure Cosmos DB'de istemci tarafında yönetilmesi gerekir.

Gereksinimler eşleşiyorsa, hedef SQL veritabanını bir Azure Cosmos DB örneğiyle değiştirmek bir seçenektir. Bunu yapmak için genel çözüm mimarisinde önemli bir değişiklik yapılması gerekir.

Synapse SQL için Azure Cosmos DB, Azure Cosmos DB için Azure Synapse Link aracılığıyla aracı katman olarak kullanılabilir. Analiz deposu oluşturmak için Azure Synapse Link kullanılabilir. Bu veri deposu daha sonra doğrudan Synapse SQL'de sorgulanabilir.

Alternatiflerin karşılaştırması

Her yaklaşım farklı değer teklifi ve özellikleri sunar:

Tür Seçenek Modu Azure SQL Veritabanı Azure Synapse Analytics
İşlem Sonrası
Tetikleyiciler Değiştir, Biriktir + Yok, tetikleyiciler Synapse SQL'de kullanılamaz
Hazırlama Değiştir, Biriktir + +
Ön İşleme
Azure İşlevleri Değiştir, Biriktir + - (satır satır performans)
Azure Cosmos DB'nin değiştirilmesi Replace Yok Yok
Azure Cosmos DB Azure Synapse Link Replace Yok +

Destek alın

Daha fazla yardım için Azure Stream Analytics için Microsoft Soru-Cevap soru sayfamızı deneyin.

Sonraki adımlar