Поделиться через


Анализ данных JSON и AVRO в Azure Stream Analytics

Служба Azure Stream Analytics поддерживает обработку событий в форматах данных CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.

Тип данных "запись"

Тип данных "запись" используется для представления массивов JSON и Avro, когда соответствующие форматы используются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Доступ к вложенным полям в известной схеме

Используйте точечную нотацию (.) для простого доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Нотация точек может использоваться для навигации по нескольким уровням, как показано в следующем фрагменте кода:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Результат:

Идентификатор устройства Шир. Длинный Температура Версия
12345 47 122 80 1.2.45

Выбор всех свойств

Все свойства вложенной записи можно выбрать с помощью подстановочного знака "*". Рассмотрим следующий пример:

SELECT
    DeviceID,
    Location.*
FROM input

Результат:

Идентификатор устройства Шир. Длинный
12345 47 122

Доступ к вложенным полям, если имя свойства является переменной

Используйте функцию GetRecordPropertyValue, если имя свойства является переменной. Он позволяет создавать динамические запросы без жесткого обозначения имен свойств.

Например, представьте пример потока данных, который нужно соединить с эталонными данными, содержащими пороговые значения для каждого датчика устройства. Фрагмент таких ссылочных данных показан в следующем фрагменте кода.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Цель состоит в том, чтобы присоединить наш пример набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это означает, что с помощью объединения одно событие может создать несколько выходных событий, если несколько датчиков превысили пороги. Чтобы добиться аналогичных результатов без соединения, см. следующий пример:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue выбирает свойство в SensorReadings, имя которого совпадает с именем свойства, поступающего от эталонных данных. Затем извлекается связанное значение из SensorReadings.

Результат:

Идентификатор устройства SensorName Сообщение об оповещении
12345 Влажность Оповещение: датчик выше порогового значения

Преобразование полей записей в отдельные события

Чтобы преобразовать поля записей в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties.

С исходными примерами данных можно использовать следующий запрос для извлечения свойств в различные события.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Результат:

Идентификатор устройства SensorName Сообщение об оповещении
12345 Температура 80
12345 Влажность 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 Метаданные датчиков [объект объекта]

Используя WITH, можно перенаправлять эти события в разные места назначения:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Анализ записи JSON в эталонных данных SQL

При использовании базы данных SQL Azure в качестве эталонных данных в задании может существовать столбец, содержащий данные в формате JSON. Пример показан в следующем примере:

Идентификатор устройства Данные
12345 {"key": "value1"}
54321 {"key": "value2"}

Можно выполнить синтаксический анализ записи JSON в столбце Данные, написав простую определяемую пользователем функцию JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Затем вы можете создать шаг в запросе Stream Analytics, как показано здесь, чтобы получить доступ к полям записей JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Тип данных "массив"

Тип данных "массив" представляет собой упорядоченную коллекцию значений. Ниже описаны некоторые типичные операции с значениями массива. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.

Ниже приведен пример события. CustomSensor03 и SensorMetadata принадлежат к типу array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Работа с конкретным элементом массива

Выберите элемент массива по указанному индексу (первый элемент массива):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Результат:

первый элемент
12

Выбор длины массива

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Результат:

длина массива
3

Преобразование элементов массива в отдельные события

Выберите все элементы массива как отдельные события. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива как отдельные события:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Результат:

DeviceId (Идентификатор устройства) Индекс массива ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Результат:

DeviceId (Идентификатор устройства) smKey smValue
12345 Производитель Эй-Би-Си
12345 Версия 1.2.45

Если извлеченные поля должны отображаться в столбцах, можно свести набор данных с помощью синтаксиса WITH в дополнение к операции JOIN . Для этого соединения требуется условие границ времени, которое предотвращает дублирование:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Результат:

DeviceId (Идентификатор устройства) Шир. Длинный smVersion производительSM
12345 47 122 1.2.45 Эй-Би-Си

Типы данных в Azure Stream Analytics