Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şu anda Azure Stream Analytics (ASA) yalnızca SQL çıkışlarına satır eklemeyi (ekleme) destekler (Azure SQL Veritabanları ve Azure Synapse Analytics). Bu makalede, aracı katman olarak Azure İşlevleri kullanarak SQL veritabanlarında UPDATE, UPSERT veya MERGE etkinleştirmeye yönelik geçici çözümler açıklanır.
Azure İşlevleri alternatif seçenekleri sonunda sunulur.
Gereksinim
Aşağıdaki modlardan birini kullanarak tabloya veri yazabilirsiniz:
| Mod | Eşdeğer T-SQL deyimi | Gereksinimler |
|---|---|---|
| Ekle | INSERT | Hiçbiri |
| Değiştir | MERGE (UPSERT) | Benzersiz anahtar |
| Biriktirmek | MERGE (UPSERT) bileşik atama işleci ile (+=, -=...) |
Benzersiz anahtar ve biriktirici |
Farklılıkları göstermek için aşağıdaki iki kaydı alırken ne olacağını göz önünde bulundurun:
| Varış_Zamanı | Device_Id | Ölçüm_Değeri |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Ekleme modunda iki kayıt eklersiniz. Eşdeğer T-SQL deyimi şöyledir:
INSERT INTO [target] VALUES (...);
Sonuç olarak:
| Değiştirilme_Zamanı | Cihaz_Id | Ölçüm_Değeri |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Değiştirme modunda yalnızca anahtara göre son değeri alırsınız. Burada anahtar olarak Device_Id kullanırsınız. Eşdeğer T-SQL deyimi şöyledir:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Sonuç olarak:
| Değiştirilmiş_Zaman | Cihaz_Anahtarı | Ölçüm_Değeri |
|---|---|---|
| 10:05 | A | 20 |
Son olarak, biriktirme modunda, bileşik atama işleci += ile toplamanız Value gerekir. Burada anahtar olarak da Device_Id kullanırsınız:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Sonuç olarak:
| Değiştirilmiş_Zaman | Cihaz_Anahtarı | Ölçüm_Değeri |
|---|---|---|
| 10:05 | A | 21 |
Performansla ilgili dikkat edilmesi gerekenler için, ASA SQL veritabanı çıkış bağdaştırıcıları şu anda yalnızca yerel olarak ekleme modunu destekler. Bu bağdaştırıcılar, aktarım hızını en üst düzeye çıkarmak ve geri basıncı sınırlamak için toplu ekleme kullanır.
Bu makalede, ASA için Değiştir ve Biriktir modlarını uygulamak için Azure İşlevleri nasıl kullanılacağı gösterilmektedir. Bir işlevi aracı katman olarak kullandığınızda, olası yazma performansı akış işini etkilemez. Bu bağlamda, Azure İşlevleri kullanmak Azure SQL ile en iyi şekilde çalışır. Synapse SQL ile toplu deyimlerden satır satır deyimlerine geçiş yapmak daha büyük performans sorunlarına neden olabilir.
Azure İşlevleri çıktısı
Bu işte, ASA SQL çıkışını ASA Azure İşlevleri çıkışı ile değiştirirsiniz. işlevi UPDATE, UPSERT veya MERGE özelliklerini uygular.
Şu anda, iki seçeneği kullanarak bir işlevdeki SQL Veritabanına erişebilirsiniz. İlk seçenek Azure SQL çıkış bağlaması'dır. Şu anda C# ile sınırlıdır ve yalnızca değiştirme modu sunar. İkinci seçenek, uygun SQL driver (örneğin, .NET için Microsoft.Data.SqlClient) aracılığıyla göndermek üzere bir SQL sorgusu oluşturmaktır.
Aşağıdaki örneklerin her ikisi de aşağıdaki tablo şemasını varsayar. Bağlama seçeneği, hedef tabloda birincil anahtarın ayarlanmasını gerektirir. SQL sürücüsü kullanılırken gerekli değildir, ancak önerilir.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
ASA'dan çıktı olarak işlev kullanmak için işlevin aşağıdaki beklentileri karşılaması gerekir:
- Azure Stream Analytics, başarıyla işlediği toplu işler için İşlevler uygulamasından HTTP durumu 200 bekler.
- Azure Stream Analytics bir Azure işlevinden 413 ("http İstek Varlığı Çok Büyük") özel durumu aldığında, Azure İşlevine gönderdiği toplu işlemlerin boyutunu küçültür.
- Test bağlantısı sırasında Stream Analytics, Azure İşlevleri için boş bir toplu iş içeren bir POST isteği gönderir ve testi doğrulamak için HTTP durumunun 20 kat geri gelmesini bekler.
1. Seçenek: Azure İşlevi SQL Bağlaması ile anahtara göre güncelleştirme
Bu seçenek Azure İşlevi SQL Çıkış Bağlamasını kullanır. Bu uzantı, sql deyimi yazmak zorunda kalmadan tablodaki bir nesnenin yerini alabilir. Şu anda bileşik atama işleçlerini (birikmesi) desteklemez.
Bu örnek aşağıdakiler üzerine kurulmuştur:
- Azure İşlevleri çalışma zamanı sürüm 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Bağlama yaklaşımını daha iyi anlamak için bu öğreticiyi izleyin.
İlk olarak, bu öğreticiyi izleyerek varsayılan bir HttpTrigger işlev uygulaması oluşturun. Aşağıdaki bilgileri kullanın:
- Dil:
C# - Çalışma zamanı:
.NET 6(function/runtime v4 altında) - Şablon:
HTTP trigger
Proje klasöründe bulunan bir terminalde aşağıdaki komutu çalıştırarak bağlama uzantısını yükleyin:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
SqlConnectionString öğesini, hedef sunucunun bağlantı dizesini doldurarak Values bölümünde yer alan local.settings.json'e ekleyin.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
İşlevin tamamını (projedeki .cs dosyası) aşağıdaki kod parçacığıyla değiştirin. Ad alanını, sınıf adını ve işlev adını kendi ad alanınızla güncelleştirin:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Bağlama bölümünde hedef tablo adını güncelleştirin:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Device Sınıfı ve eşleme bölümünü kendi şemanızla eşleşecek şekilde güncelleştirin:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Artık hata ayıklama (Visual Studio Code'da F5) ile yerel işlev ile veritabanı arasındaki kablolama işlemini test edebilirsiniz. SQL veritabanına makinenizden erişilebilir olması gerekir. Bağlantıyı denetlemek için SSMS'yi kullanabilirsiniz. Ardından, yerel uç noktaya POST istekleri gönderin. Boş gövdeli bir istek HTTP 204 döndürmelidir. Gerçek yükü olan bir istek hedef tabloda kalıcı olmalıdır (değiştirme/güncelleştirme modunda). Aşağıda, bu örnekte kullanılan şemaya karşılık gelen bir örnek yük verilmişti:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
İşlev artık Azure'da yayımlanabilir.
SqlConnectionString için bir uygulama ayarı ayarlayın. Azure SQL Server güvenlik duvarı, canlı işlevin sunucuya erişebilmesi için Azure hizmetlerine izin vermelidir.
Ardından işlevi ASA işinde çıkış olarak tanımlayabilir ve bunları eklemek yerine kayıtları değiştirmek için kullanabilirsiniz.
2. Seçenek: Özel SQL sorgusu aracılığıyla bileşik atama ile birleştirme (birikme)
Not
Yeniden başlatma ve kurtarma sonrasında, ASA daha önce yaydığı çıkış olaylarını yeniden gönderebilir. Bu davranış, birikme mantığının başarısız olmasına (tek tek değerleri ikiye katlama) neden olabilir. Bu sorunu önlemek için, yerel ASA SQL Çıkışını kullanarak bir tablodaki aynı verileri çıktıya ekleyin. Sorunları algılamak ve gerektiğinde birikimi yeniden eşitlemek için bu denetim tablosunu kullanabilirsiniz.
Bu seçenek Microsoft.Data.SqlClient kullanır. Bu kitaplık, sql veritabanına tüm SQL sorgularını göndermenizi sağlar.
Bu örnek aşağıdakiler üzerine kurulmuştur:
İlk olarak, bu öğreticiyi izleyerek varsayılan bir HttpTrigger işlev uygulaması oluşturun. Aşağıdaki bilgiler kullanılır:
- Dil:
C# - Çalışma zamanı:
.NET 6(function/runtime v4 altında) - Şablon:
HTTP trigger
Proje klasöründe bulunan bir terminalde aşağıdaki komutu çalıştırarak SqlClient kitaplığını yükleyin:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
SqlConnectionString öğesini, hedef sunucunun bağlantı dizesini doldurarak Values bölümünde yer alan local.settings.json'e ekleyin.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
İşlevin tamamını (projedeki .cs dosyası) aşağıdaki kod parçacığıyla değiştirin. Ad alanını, sınıf adını ve işlev adını kendiniz güncelleştirin:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
sqltext Komut oluşturma bölümünü kendi şemanızla eşleşecek şekilde güncelleştirin (güncelleştirmede işleç aracılığıyla birikmeye nasıl ulaşıldığına += dikkat edin):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Artık hata ayıklayarak (VS Code'da F5) yerel işlev ile veritabanı arasındaki kablolama test edebilirsiniz. SQL veritabanına makinenizden erişilebilir olması gerekir. Bağlantıyı denetlemek için SSMS'yi kullanabilirsiniz. Ardından, yerel uç noktaya POST istekleri gönderin. Boş gövdeli bir istek HTTP 204 döndürmelidir. Gerçek yükü olan bir istek hedef tabloda kalıcı olmalıdır (birikme/birleştirme modunda). Aşağıda, bu örnekte kullanılan şemaya karşılık gelen bir örnek yük verilmişti:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
İşlev artık Azure'da yayımlanabilir. Uygulama ayarı SqlConnectionString için ayarlanmalıdır. Azure SQL Server güvenlik duvarı, canlı işlevin erişebilmesi için Azure hizmetlerine izin vermelidir.
İşlev daha sonra ASA işinde bir çıkış olarak tanımlanabilir ve bunları eklemek yerine kayıtları değiştirmek için kullanılabilir.
Alternatifler
Azure İşlevleri dışında, birden çok yöntem beklenen sonucu elde edebilir. Bu bölümde bu yöntemlerden bazıları açıklanmaktadır.
Hedef SQL Veritabanında sonradan işleme
Arka plan görevi, veriler standart ASA SQL çıkışları aracılığıyla veritabanına eklendikten sonra çalışır.
Azure SQL için INSTEAD OFDML tetikleyicilerini kullanarak ASA'nın INSERT komutlarını kesebilirsiniz.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Synapse SQL için, ASA bir hazırlama tablosuna ekleyebilir. Yinelenen bir görev daha sonra gerektiğinde verileri bir aracı tabloya dönüştürebilir. Son olarak , veriler üretim tablosuna taşınır.
Azure Cosmos DB'de ön işleme
Azure Cosmos DB , UPSERT'i yerel olarak destekler. Burada yalnızca ekleme veya değiştirme mümkündür. Azure Cosmos DB'de birikimleri istemci tarafında yönetmeniz gerekir.
Gereksinimler eşleşiyorsa, hedef SQL veritabanını bir Azure Cosmos DB örneğiyle değiştirebilirsiniz. Bu değişiklik, genel çözüm mimarisinde önemli bir değişiklik gerektirir.
Synapse SQL için, Azure Cosmos DB'yi, Azure Cosmos DB için Azure Synapse Link aracılığıyla bir ara katman olarak kullanabilirsiniz. Azure Synapse Link kullanarak analytical store oluşturun. Daha sonra bu veri deposunu doğrudan Synapse SQL'de sorgulayabilirsiniz.
Alternatiflerin karşılaştırması
Her yaklaşım farklı değer teklifleri ve özellikleri sunar:
| Tür | Seçenek | Modlar | Azure SQL Veritabanı | Azure Synapse Analytics |
|---|---|---|---|---|
| İşlem Sonrası | ||||
| Tetikleyiciler | Değiştir, Biriktir | + | Yok, tetikleyiciler Synapse SQL'de kullanılamaz | |
| Hazırlama | Değiştir, Biriktir | + | + | |
| Ön İşleme | ||||
| Azure İşlevleri | Değiştir, Biriktir | + | - (satır satır performans) | |
| Azure Cosmos DB'nin değiştirilmesi | Değiştir | Yok | Yok | |
| Azure Synapse Link'Azure Cosmos DB | Değiştir | Yok | + |
Destek alın
Daha fazla yardım için Azure Stream Analytics için Microsoft Soru-Cevap soru sayfasını deneyin.
Sonraki adımlar
- Azure Stream Analytics çıkışlarını anlama
- Azure SQL Veritabanı için Azure Stream Analytics çıkışı
- Azure Stream Analytics’ten Azure SQL Veritabanı aktarım hızı performansını artırma
- Azure Stream Analytics işinden Azure SQL Veritabanı veya Azure Synapse Analytics'e erişmek için yönetilen kimlikleri kullanma
- Azure Stream Analytics işi için SQL Veritabanı başvuru verilerini kullanma
- Azure Stream Analytics işlerinde Azure İşlevleri çalıştırma - Redis çıktısı öğreticisi
- Hızlı Başlangıç: Azure portalını kullanarak Stream Analytics işi oluşturma