Analisar dados JSON e Avro no Azure Stream Analytics

O Azure Stream Analytics dá suporte ao processamento de eventos em formatos de dados CSV, JSON e Avro. JSON e Avro podem ser estruturados e conter alguns tipos complexos, como objetos aninhados (registros) e matrizes.

Observação

Os arquivos AVRO criados pela captura do Hub de Eventos usam um formato específico que exige que você use o recurso Desserializador personalizado. Para saber mais, confira Ler entradas em qualquer formato usando desserializadores .NET personalizados.

Tipos de dados do sistema

Tipos de dados de registro são usados para representar matrizes JSON e Avro quando formatos correspondentes são usados em fluxos de dados de entrada. Esses exemplos demonstram um sensor de exemplo, o que está lendo os eventos de entrada no formato JSON. Aqui está um exemplo de um único evento:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Acessar campos aninhados no esquema conhecido

Use a notação de ponto (.) para acessar facilmente campos aninhados diretamente da sua consulta. Por exemplo, esta consulta seleciona as coordenadas de Latitude e Longitude na propriedade local nos dados JSON anteriores. A notação de ponto pode ser usada para navegar por vários níveis, conforme mostrado abaixo.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

O resultado é:

DeviceID Lat long Temperatura Versão
12345 47 122 80 1.2.45

Selecione todas as propriedades

Você pode selecionar todas as propriedades de um registro aninhado usando o curinga'*'. Considere o exemplo a seguir:

SELECT
    DeviceID,
    Location.*
FROM input

O resultado é:

DeviceID Lat long
12345 47 122

Acessar os campos aninhados quando o nome da propriedade for uma variável

Use a função GetRecordPropertyValue se o nome da propriedade for uma variável. Isso permite a criação de consultas dinâmicas sem codificar nomes de propriedade.

Por exemplo, imagine que o fluxo de dados de exemplo precisa ser unida com dados de referência com limites para cada sensor do dispositivo. Um trecho desses dados de referência é mostrado abaixo.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

A meta aqui é unir nosso conjunto de dados de exemplo da parte superior do artigo aos dados de referência, e gerar um evento para cada medida de sensor acima do limite. Isso significa que nosso evento único acima pode gerar vários eventos de saída se vários sensores estiverem acima de seus respectivos limites, graças à junção. Para obter resultados similares sem uma junção, confira a seção abaixo.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue seleciona a propriedade em SensorReadings, qual nome corresponde ao nome da propriedade que vem dos dados de referência. Em seguida, o valor associado de SensorReadings é extraído.

O resultado é:

DeviceID SensorName AlertMessage
12345 Umidade Alerta: sensor acima do limite

Converter campos de registro em eventos separados

Para converter os campos de registro em eventos separados, use o operador APLICAR junto com a função GetRecordProperties.

Com os dados de exemplo originais, a consulta a seguir pode ser usada para extrair propriedades em diferentes eventos.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

O resultado é:

DeviceID SensorName AlertMessage
12345 Temperatura 80
12345 Umidade 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Usando WITH, é possível rotear esses eventos para diferentes destinos:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Analisar registro JSON nos dados de referência do SQL

Ao usar o Banco de Dados SQL do Azure como dado de referência em seu trabalho, é possível ter uma coluna com dados no formato JSON. Um exemplo é mostrado abaixo.

DeviceID Dados
12345 {"key": "value1"}
54321 {"key": "value2"}

Você pode analisar o registro JSON na coluna Dados escrevendo uma função simples definida pelo usuário do JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Em seguida, você pode criar uma etapa na consulta Stream Analytics, conforme mostrado abaixo, para acessar os campos dos seus registros JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Tipos de dados de matrizes

Tipos de dados de matriz são uma coleção ordenada de valores. Algumas operações típicas em valores da matriz são detalhadas abaixo. Esses exemplos usam as funções GetArrayElement, GetArrayElements, GetArrayLengthe o operador Aplicar.

Aqui está um exemplo de evento. CustomSensor03 e SensorMetadata são do tipo matriz:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Trabalhar com um elemento de matriz específico

Selecione o elemento da matriz em um índice especificado (selecionando o primeiro elemento da matriz):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

O resultado é:

firstElement
12

Selecionar o comprimento da matriz

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

O resultado é:

arrayLength
3

Converter elementos de matriz em eventos separados

Selecione todos os elementos de matriz como eventos individuais. O operador APLICAR junto com a função GetArrayElements interna extrai todos os elementos da matriz como eventos individuais:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

O resultado é:

deviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

O resultado é:

deviceId smKey smValue
12345 Fabricante ABC
12345 Versão 1.2.45

Se os campos extraídos precisarem aparecer em colunas, é possível dinamizar o conjunto de linhas usando a sintaxe WITH além da operação JOIN. Essa junção exige uma condição de limite de tempo que impede a duplicação:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

O resultado é:

deviceId Lat long smVersion smManufacturer
12345 47 122 1.2.45 ABC

Consulte Também

Tipos de dados no Azure Stream Analytics