Поделиться через


Анализ данных JSON и Avro в Azure Stream Analytics

Служба Azure Stream Analytics поддерживает обработку событий в форматах данных CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.

Тип данных "запись"

Тип данных "Record" используется для представления массивов JSON и Avro, когда указанные форматы применяются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Доступ к вложенным полям в известной схеме

Используйте нотацию точек (.) для доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Используйте нотацию точек для навигации по нескольким уровням, как показано в следующем фрагменте кода:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Результат:

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

Выбор всех свойств

Вы можете выбрать все свойства вложенной записи с помощью подстановочного * знака. Рассмотрим следующий пример:

SELECT
    DeviceID,
    Location.*
FROM input

Результат:

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

Доступ к вложенным полям, если имя свойства является переменной

Используйте функцию GetRecordPropertyValue , если имя свойства является переменной. Эта функция помогает создавать динамические запросы без жесткого определения имен свойств.

Например, представьте, что пример потока данных необходимо объединить с эталонными данными , содержащими пороговые значения для каждого датчика устройства. Фрагмент таких ссылочных данных показан в следующем фрагменте кода.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Цель состоит в том, чтобы присоединить образец набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это соединение означает, что одно событие может создавать несколько выходных событий, если несколько датчиков превышают соответствующие пороговые значения. Чтобы добиться аналогичных результатов без соединения, см. следующий пример:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue выбирает свойство в SensorReadings , которое соответствует имени свойства, исходящему из эталонных данных. Затем он извлекает связанное значение из SensorReadings.

Результат:

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

Преобразование полей записей в отдельные события

Чтобы преобразовать поля записи в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties .

Используя исходные примеры данных, можно использовать следующий запрос для извлечения свойств в различные события:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Результат:

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

С помощью WITH эти события можно направлять в разные места назначения:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Анализ записи JSON в эталонных данных SQL

При использовании Azure SQL Database в качестве эталонных данных в задании можно включить столбец, содержащий данные в формате JSON. В следующем примере показан следующий формат:

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

Вы можете проанализировать запись JSON в столбце данных , написав простую определяемую пользователем функцию JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Чтобы получить доступ к полям записей JSON, создайте шаг в запросе Stream Analytics, как показано в следующем примере.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Типы данных массивов

Тип данных "массив" представляет собой упорядоченную коллекцию значений. В этом разделе описаны некоторые типичные операции со значениями массива. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.

Ниже приведен пример события. Оба CustomSensor03 и SensorMetadata имеют тип массив:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Работа с определенным элементом массива

Выберите элемент массива по указанному индексу (выберите первый элемент массива):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Результат:

|firstElement|
|-|
|12|

Выбор длины массива

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Результат:

|arrayLength|
|-|
|3|

Преобразование элементов массива в отдельные события

Выберите все элементы массива в качестве отдельных событий. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива в виде отдельных событий:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Результат:

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Результат:

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

Чтобы отобразить извлеченные поля в столбцах, свести набор данных, используя синтаксис WITH и операцию JOIN. Для этого соединения требуется условие границ времени , которое предотвращает дублирование:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Результат:

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Data Types in Azure Stream Analytics