تحديث السجلات أو دمجها في Azure SQL Database باستخدام Azure Functions

حاليا، يدعم Azure Stream Analytics (ASA) إدراج صفوف (إلحاق) فقط بمخرجات SQL (قواعد بيانات Azure SQL وAzure Synapse Analytics). تتناول هذه المقالة الحلول البديلة لتمكين UPDATE أو UPSERT أو MERGE في قواعد بيانات SQL، مع Azure Functions كطبقة وسيطة.

يتم تقديم الخيارات البديلة لـ Azure Functions في النهاية.

المتطلبات

يمكن كتابة البيانات في جدول بشكل عام بالطريقة التالية:

وضع عبارة T-SQL المكافئة المتطلبات
إلحاق إدراج بلا
الاستبدال MERGE (UPSERT) مفتاح فريد
جمع استخدام MERGE (UPSERT) مع عامل التشغيل operator (+=, -=...) مفتاح فريد ومجمع

لتوضيح الاختلافات، انظر إلى ما يحدث عند استيعاب السجلين التاليين:

Arrival_Time Device_Id Measure_Value
10:00 و 1
10:05 و 20

في وضع الإلحاق، نقوم بإدراج سجلين. عبارة T-SQL المكافئة هي:

INSERT INTO [target] VALUES (...);

مما ينتج عنه:

Modified_Time Device_Id Measure_Value
10:00 و 1
10:05 و 20

في وضع replace، نحصل فقط على آخر قيمة بالمفتاح. هنا نستخدم Device_Id كمفتاح. عبارة T-SQL المكافئة هي:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

مما ينتج عنه:

Modified_Time Device_Key Measure_Value
10:05 و 20

أخيراً، في وضع accumulate، نجمع Value مع عامل التعيين المركب (+=). هنا أيضا نستخدم Device_Id كمفتاح:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

مما ينتج عنه:

Modified_Time Device_Key Measure_Value
10:05 ش 21

لاعتبارات الأداء، تدعم محولات إخراج قاعدة بيانات ASA SQL حالياً وضع الإلحاق فقط. تستخدم هذه المحولات إدراجاً ضخماً لزيادة المعدل نقل والحد من الضغط الخلفي.

توضح هذه المقالة كيفية استخدام Azure Functions لتنفيذ وضعي الاستبدال والتراكم لـ ASA. عند استخدام دالة كطبقة وسيطة، لن يؤثر أداء الكتابة المحتمل على مهمة الدفق. في هذا الصدد، يعمل استخدام Azure Functions بشكل أفضل مع Azure SQL. باستخدام Synapse SQL، قد يؤدي التبديل من عبارات مجمعة إلى عبارات صف تلو صف إلى حدوث مشكلات أداء أكبر.

إخراج Azure Functions

في وظيفتنا، نستبدل إخراج ASA SQL بمخرجات ASA Azure Functions. يتم تنفيذ قدرات UPDATE أو UPSERT أو MERGE في الدالة .

يوجد حالياً خياران للوصول إلى SQL Database في إحدى الوظائف. الأول هو ربط إخراج Azure SQL. يقتصر حالياً على #C، ولا يوفر سوى وضع الاستبدال. الثاني هو إنشاء استعلام SQL لتقديمه عبر برنامج تشغيل SQL المناسب (Microsoft.Data.SqlClient لـ .NET).

لكلا النموذجين التاليين، نفترض مخطط الجدول التالي. يتطلب خيار الربط مفتاحاً أساسياً ليتم تعيينه في الجدول الهدف. إنه ليس ضرورياً، ولكن يوصى به، عند استخدام برنامج تشغيل SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

يجب أن تفي الوظيفة بالتوقعات التالية لاستخدامها كناتج من ASA:

  • تتوقع Azure Stream Analytics حالة HTTP 200 من تطبيق Functions للدُفعات التي تمت معالجتها بنجاح
  • عندما يتلقى Azure Stream Analytics استثناء 413 ونصه ("http Request Entity Too Large") من إحدى دوال Azure، فإنه يقلل من حجم الدُفعات التي يرسلها إلى Azure Function
  • أثناء اختبار الاتصال، يرسل Stream Analytics طلب POST مع دفعة فارغة إلى Azure Functions ويتوقع حالة HTTP 20x مرة أخرى للتحقق من صحة الاختبار

الخيار 1: التحديث بالمفتاح باستخدام ربط SQL للوظيفة Azure

يستخدم هذا الخيار ربط إخراج SQL الخاص بوظيفة Azure. يمكن أن يحل هذا الملحق محل عنصر في جدول، دون الحاجة إلى كتابة عبارة SQL. في الوقت الحالي، لا يدعم عوامل التخصيص المركبة (التراكمات).

تم بناء هذه العينة على:

لفهم أسلوب الربط بشكل أفضل، يوصى باتباع هذا البرنامج التعليمي.

أولاً، أنشئ تطبيق وظيفة HttpTrigger افتراضياً باتباع هذا البرنامج التعليمي. يتم استخدام المعلومات التالية:

  • اللغة: C#‎
  • وقت التشغيل: .NET 6 (ضمن الوظيفة/وقت التشغيل v4)
  • القالب: HTTP trigger

قم بتثبيت ملحق الربط عن طريق تشغيل الأمر التالي في محطة طرفية موجودة في مجلد المشروع:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

أضف عنصر SqlConnectionString في قسم Values في local.settings.json، وملء سلسلة الاتصال للخادم الوجهة:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

استبدل الوظيفة بأكملها (ملف cs. في المشروع) بقصاصة التعليمة البرمجية التالية. قم بتحديث مساحة الاسم واسم الفئة واسم الوظيفة بنفسك:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

قم بتحديث اسم الجدول الوجهة في قسم الربط:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

قم بتحديث فئة Device وقسم التعيين لمطابقة مخططك الخاص:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

يمكنك الآن اختبار الأسلاك بين الدالة المحلية وقاعدة البيانات عن طريق تصحيح الأخطاء (F5 في Visual Studio Code). يجب أن تكون قاعدة بيانات SQL قابلة للوصول من جهازك. يمكن استخدامManagement Studio للتحقق من الاتصال. ثم أرسل طلبات POST إلى نقطة النهاية المحلية. يجب أن يؤدي الطلب الذي يحتوي على نص فارغ إلى إرجاع http 204. يجب الاستمرار في طلب مع حمولة فعلية في الجدول الوجهة (في وضع الاستبدال/التحديث). فيما يلي نموذج حمولة يتوافق مع المخطط المستخدم في هذا النموذج:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

يمكن الآن نشر الوظيفة في Azure. يجب تعيين إعداد التطبيق لـ SqlConnectionString. يجب أن يسمح جدار الحماية Azure SQL Server بخدمات Azure للوظيفة المباشرة للوصول إليها.

يمكن بعد ذلك تعريف الوظيفة على أنها مخرجات في وظيفة ASA، واستخدامها لاستبدال السجلات بدلاً من إدراجها.

الخيار 2: الدمج مع التعيين المركب (التراكم) عبر استعلام SQL مخصص

إشعار

عند إعادة التشغيل والاسترداد، قد تقوم ASA بإعادة إرسال أحداث الإخراج التي تم إرسالها بالفعل. هذا سلوك متوقع يمكن أن يتسبب في فشل منطق التراكم (مضاعفة القيم الفردية). لمنع ذلك، يوصى بإخراج نفس البيانات في جدول عبر إخراج ASA SQL الأصلي. يمكن بعد ذلك استخدام جدول التحكم هذا لاكتشاف المشكلات وإعادة مزامنة التراكم عند الضرورة.

يستخدم هذا الخيار Microsoft.Data.SqlClient. تتيح لنا هذه المكتبة إصدار أي استعلامات SQL إلى SQL Database.

تم بناء هذه العينة على:

أولاً، أنشئ تطبيق وظيفة HttpTrigger افتراضياً باتباع هذا البرنامج التعليمي. يتم استخدام المعلومات التالية:

  • اللغة: C#‎
  • وقت التشغيل: .NET 6 (ضمن الوظيفة/وقت التشغيل v4)
  • القالب: HTTP trigger

قم بتثبيت مكتبة SqlClient عن طريق تشغيل الأمر التالي في محطة طرفية موجودة في مجلد المشروع:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

أضف عنصر SqlConnectionString في قسم Values في local.settings.json، وملء سلسلة الاتصال للخادم الوجهة:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

استبدل الوظيفة بأكملها (ملف cs. في المشروع) بقصاصة التعليمة البرمجية التالية. قم بتحديث مساحة الاسم واسم الفئة واسم الوظيفة بنفسك:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

قم بتحديث قسم بناء الأوامر sqltext لمطابقة مخططك الخاص (لاحظ كيف يتم تحقيق التراكم عبر عامل التشغيل += عند التحديث):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

يمكنك الآن اختبار الأسلاك بين الوظيفة المحلية وقاعدة البيانات عن طريق تصحيح الأخطاء (F5 في VS Code). يجب أن تكون قاعدة بيانات SQL قابلة للوصول من جهازك. يمكن استخدامManagement Studio للتحقق من الاتصال. ثم قم بإصدار طلبات POST إلى نقطة النهاية المحلية. يجب أن يؤدي الطلب الذي يحتوي على نص فارغ إلى إرجاع http 204. يجب الاستمرار في طلب حمولة فعلية في الجدول الوجهة (في وضع التجميع/الدمج). فيما يلي نموذج حمولة يتوافق مع المخطط المستخدم في هذا النموذج:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

يمكن الآن نشر الوظيفة في Azure. يجب تعيين إعداد التطبيق لـ SqlConnectionString. يجب أن يسمح جدار الحماية Azure SQL Server بخدمات Azure للوظيفة المباشرة للوصول إليها.

يمكن بعد ذلك تعريف الوظيفة على أنها مخرجات في وظيفة ASA، واستخدامها لاستبدال السجلات بدلاً من إدراجها.

البدائل

خارج Azure Functions، توجد طرق متعددة لتحقيق النتيجة المتوقعة. يوفر هذا القسم بعضها.

المعالجة اللاحقة في SQL Database الهدف

تعمل مهمة الخلفية بمجرد إدراج البيانات في قاعدة البيانات عبر مخرجات ASA SQL القياسية.

بالنسبة إلى Azure SQL، INSTEAD OF يمكن استخدام مشغلات DML لاعتراض أوامر INSERT الصادرة عن ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

بالنسبة لـ Synapse SQL، يمكن لـ ASA الإدراج في جدول مرحلي. يمكن للمهمة المتكررة بعد ذلك تحويل البيانات حسب الحاجة إلى جدول وسيط. أخيراً، يتم نقل البيانات إلى جدول الإنتاج.

المعالجة المسبقة في Azure Cosmos DB

Azure Cosmos DB يدعم UPSERT في الأصل. هنا فقط يمكن إلحاق/استبدال. يجب إدارة التراكمات من جانب العميل في Azure Cosmos DB.

إذا كانت المتطلبات متطابقة، فإن أحد الخيارات هو استبدال قاعدة بيانات SQL المستهدفة بمثيل Azure Cosmos DB. يتطلب القيام بذلك تغييراً مهماً في بنية الحل الشاملة.

بالنسبة إلى Synapse SQL، يمكن استخدام Azure Cosmos DB كطبقة وسيطة عبر Azure Synapse Link ل Azure Cosmos DB. يمكن استخدام Azure Synapse Link لإنشاء مخزن تحليلي. يمكن بعد ذلك الاستعلام عن مخزن البيانات هذا مباشرة في Synapse SQL.

مقارنة البدائل

يقدم كل نهج عرض قيمة وإمكانيات مختلفة:

نوع خيار الأوضاع قاعدة بيانات Azure SQL Azure Synapse Analytics
مرحلة ما بعد المعالجة
أزرار التشغيل استبدال، تراكم + غير متاح، لا تتوفر المشغلات في Synapse SQL
التدريج استبدال، تراكم + +
المعالجة المسبقة
دالات Azure استبدال، تراكم + - (أداء صف تلو الآخر)
استبدال Azure Cosmos DB الاستبدال ‏‫غير متوفر‬ ‏‫غير متوفر‬
Azure Cosmos DB Azure Synapse Link الاستبدال ‏‫غير متوفر‬ +

الحصول على الدعم

لمزيد من المساعدة، جرب صفحة سؤال Microsoft Q&A الخاصة بنا ل Azure Stream Analytics.

الخطوات التالية