مشاركة عبر


تحليل بيانات JSON وAvro في Azure Stream Analytics

تدعم خدمة Azure Stream Analytics معالجة الأحداث بتنسيقات بيانات CSV وJSON وAvro. يمكن هيكلة بيانات JSON وAvro وتحتوي على بعض الأنواع المعقدة مثل الكائنات (السجلات) والمصفوفات المتداخلة.

أنواع بيانات السجل

تُستخدم أنواع بيانات السجل لتمثيل مصفوفات JSON وAvro عند استخدام التنسيقات المقابلة في تدفقات بيانات الإدخال. توضح هذه الأمثلة مستشعر عينة، والذي يقرأ أحداث الإدخال بتنسيق JSON. فيما يلي مثال لحدث واحد:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

الوصول إلى الحقول المتداخلة في المخطط المعروف

استخدم تدوين النقطة (.) للوصول بسهولة إلى الحقول المتداخلة مباشرة من استعلامك. على سبيل المثال، يحدد هذا الاستعلام إحداثيات خطوط الطول والعرض ضمن خاصية الموقع في بيانات JSON السابقة. يمكن استخدام تدوين النقطة للتنقل بين مستويات متعددة كما هو موضح في القصاصة البرمجية التالية:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

النتيجة هي:

معرف الجهاز Lat طويل درجة الحرارة إصدار
12345 47 122 80 1.2.45

حدد جميع الخصائص

يمكنك تحديد جميع خصائص السجل المتداخل باستخدام حرف البدل "*". تأمل المثال التالي:

SELECT
    DeviceID,
    Location.*
FROM input

النتيجة هي:

معرف الجهاز Lat طويل
12345 47 122

الوصول إلى الحقول المتداخلة عندما يكون اسم الخاصية متغيرًا

استخدم الدالة GetRecordPropertyValue إذا كان اسم الخاصية متغير. يسمح بإنشاء استعلامات ديناميكية دون أسماء خصائص الترميز الثابت.

على سبيل المثال، تخيل أن تدفق بيانات العينة يحتاج إلى أن ينضم مع البيانات المرجعية التي تحتوي على حدود لكل جهاز استشعار. يتم عرض قصاصة برمجية من هذه البيانات المرجعية في القصاصة البرمجية التالية.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

الهدف هنا هو ضم عينة مجموعة البيانات من أعلى المقالة إلى تلك البيانات المرجعية، وإخراج حدث واحد لكل مقياس استشعار فوق حده. وهذا يعني أن حدثنا الفردي أعلاه يمكن أن يولد أحداث إخراج متعددة إذا كانت أجهزة الاستشعار المتعددة أعلى من الحدود الخاصة بها، وذلك بفضل الصلة. لتحقيق نتائج مماثلة دون الانضمام، راجع المثال التالي:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

يحدد GetRecordPropertyValue الخاصية في SensorReadings، والذي يطابق الاسم اسم الخاصية الوارد من البيانات المرجعية. ثم يتم استخراج القيمة المقترنة من SensorReadings.

النتيجة هي:

معرف الجهاز SensorName رسالة تنبيه
12345 الرطوبة تنبيه: مستشعر فوق الحد

تحويل حقول التسجيل إلى أحداث منفصلة

لتحويل حقول السجلات إلى أحداث منفصلة، استخدم عامل التشغيل APPLY باستخدام الدالة GetRecordProperties.

باستخدام بيانات العينة الأصلية، يمكن استخدام الاستعلام التالي لاستخراج الخصائص في أحداث مختلفة.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

النتيجة هي:

معرف الجهاز SensorName رسالة تنبيه
12345 درجة الحرارة 80
12345 الرطوبة 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [كائن الكائن]

باستخدام WITH، من الممكن بعد ذلك توجيه هذه الأحداث إلى وجهات مختلفة:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

تحليل سجل JSON في بيانات مرجع SQL

عند استخدام قاعدة بيانات Azure SQL كبيانات مرجعية في وظيفتك، من الممكن أن يكون لديك عمود يحتوي على بيانات بتنسيق JSON. يظهر مثال في المثال التالي:

معرف الجهاز بيانات
12345 {"key": "value1"}
54321 {"key": "value2"}

يمكنك تحليل سجل JSON في عمود البيانات عن طريق كتابة دالة JavaScript بسيطة معرفة بواسطة المستخدم.

function parseJson(string) {
return JSON.parse(string);
}

يمكنك بعد ذلك إنشاء خطوة في استعلام Stream Analytics كما هو موضح هنا للوصول إلى حقول سجلات JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

أنواع بيانات المصفوفة

أنواع بيانات المصفوفة هي مجموعة مرتبة من القيم. يتم تفصيل بعض العمليات النموذجية على قيم الصفيف هنا. تستخدم هذه الأمثلة الدالات GetArrayElement وGetArrayElements وGetArrayLength وعامل APPLY.

فيما يلي مثال على حدث. كل من CustomSensor03 وSensorMetadata من نوع الصفيف:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

العمل مع عنصر مصفوفة محدد

حدد عنصر مصفوفة في فهرس محدد (تحديد عنصر المصفوفة الأول):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

النتيجة هي:

العنصر الأول
12

تحديد طول المصفوفة

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

النتيجة هي:

arrayLength
3

تحويل عناصر المصفوفة إلى أحداث منفصلة

تحويل جميع عناصر المصفوفة كأحداث فردية. عامل APPLY باستخدام استخدام دالة المضمنة GetArrayElements يستخرج كافة عناصر الصفيف كأحداث فردية:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

النتيجة هي:

معرف الجهاز ArrayIndex قيمة الصفيف
12345 1 12
12345 1 -5
12345 2 1
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

النتيجة هي:

معرف الجهاز smKey قيمة smValue
12345 الشركة المصنعة ABC
12345 إصدار 1.2.45

إذا كانت الحقول المستخرجة بحاجة إلى الظهور في أعمدة، فمن الممكن تدوير مجموعة البيانات باستخدام بناء جملة WITH بالإضافة إلى عملية JOIN . تتطلب هذه الصلة شرط حد زمني يمنع التكرار:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

النتيجة هي:

معرف الجهاز Lat طويل smVersion smManufacturer
12345 47 122 1.2.45 ABC

أنواع البيانات في Azure Stream Analytics