Analizzare dati JSON e Avro in Analisi di flusso di Azure
Analisi di flusso di Azure supporta l'elaborazione di eventi nei formati di dati CSV, JSON e Avro. Entrambi i dati JSON e Avro possono essere strutturati e possono contenere alcuni tipi complessi come oggetti annidati (record) e matrici.
Nota
I file AVRO creati da Acquisizione di Hub eventi si basano su un formato specifico che richiede l'uso della funzionalità del deserializzatore personalizzato. Per altre informazioni, vedere Leggere input in qualsiasi formato tramite deserializzatori personalizzati .NET.
Tipi di dati record
I tipi di dati record vengono usati per rappresentare le matrici JSON e Avro quando vengono usati formati corrispondenti nei flussi di dati di input. Questi esempi illustrano un sensore di esempio che legge gli eventi di input in formato JSON. Di seguito è riportato un esempio di un singolo evento:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Accedere ai campi annidati nello schema noto
Usare la notazione con il punto (.) per accedere facilmente ai campi annidati direttamente dalla query. Ad esempio, questa query seleziona le coordinate di latitudine e longitudine nella proprietà Location dei dati JSON precedenti. La notazione con il punto può essere usata per spostarsi tra più livelli, come illustrato di seguito.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Il risultato è:
DeviceID | Lat | long | Temperatura | Versione |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Selezionare tutte le proprietà
È possibile selezionare tutte le proprietà di un record annidato con il carattere jolly asterisco (*). Prendere in considerazione gli esempi seguenti:
SELECT
DeviceID,
Location.*
FROM input
Il risultato è:
DeviceID | Lat | long |
---|---|---|
12345 | 47 | 122 |
Accedere ai campi annidati quando il nome della proprietà è una variabile
Usare la funzione GetRecordPropertyValue se il nome della proprietà è una variabile. Ciò consente di creare query dinamiche senza nomi di proprietà hardcoded.
Si supponga che un flusso di dati di esempio debba essere unito tramite join a dati di riferimento contenenti soglie per ogni sensore. Un frammento di tali dati di riferimento è illustrato di seguito.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
L'obiettivo è creare un join tra il set di dati di esempio illustrato all'inizio dell'articolo e i dati di riferimento, quindi restituire un evento per ogni misura del sensore al di sopra della soglia. Questo significa che il singolo evento precedente può generare più eventi di output se più sensori superano le rispettive soglie, grazie al join. Per ottenere risultati simili senza un join, vedere la sezione seguente.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue seleziona la proprietà in SensorReadings, il cui nome corrisponde al nome della proprietà proveniente dai dati di riferimento. Quindi viene estratto il valore associato da SensorReadings.
Il risultato è:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Umidità | Alert : Sensor above threshold |
Convertire i campi dei record in eventi distinti
Per convertire i campi di record in eventi separati, usare l'operatore APPLY insieme alla funzione GetRecordProperties.
Con i dati di esempio originali, è possibile usare la query seguente per estrarre le proprietà in eventi diversi.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Il risultato è:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Temperatura | 80 |
12345 | Umidità | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Usando WITH, è quindi possibile instradare tali eventi a destinazioni diverse:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Analizzare il record JSON nei dati di riferimento SQL
Quando si usa Database SQL di Azure come dati di riferimento nel processo, è possibile che una colonna contenga dati in formato JSON. Di seguito è illustrato un esempio.
DeviceID | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
È possibile analizzare il record JSON nella colonna Data scrivendo una semplice funzione JavaScript definita dall'utente.
function parseJson(string) {
return JSON.parse(string);
}
È quindi possibile creare un passaggio nella query di Analisi di flusso, come illustrato di seguito, per accedere ai campi dei record JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Tipi di dati matrice
I tipi di dati matrice sono una raccolta ordinata di valori. Di seguito sono descritte alcune operazioni tipiche sui valori di matrice. Questi esempi usano le funzioni GetArrayElement, GetArrayElements, GetArrayLength e l'operatore APPLY.
Ecco un esempio di evento. Sia CustomSensor03
che SensorMetadata
sono di tipo matrice:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Uso di un elemento di matrice specifico
Selezionare un elemento della matrice in corrispondenza dell'indice specificato (selezione del primo elemento della matrice):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Il risultato è:
firstElement |
---|
12 |
Selezionare la lunghezza della matrice
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Il risultato è:
arrayLength |
---|
3 |
Convertire gli elementi di matrice in eventi distinti
Selezionare tutti gli elementi della matrice come singoli eventi. L'operatore APPLY, usato insieme alla funzione integrata GetArrayElements, estrae tutti gli elementi della matrice come singoli eventi:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Il risultato è:
deviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Il risultato è:
deviceId | smKey | smValue |
---|---|---|
12345 | Produttore | ABC |
12345 | Versione | 1.2.45 |
Se i campi estratti devono essere visualizzati in colonne, è possibile trasformare il set di dati usando la sintassi WITH in aggiunta all'operazione JOIN. Tale join richiede una condizione di limite temporale che impedisce la duplicazione:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Il risultato è:
deviceId | Lat | long | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |
Vedere anche
Data Types in Azure Stream Analytics (Tipi di dati in Analisi di flusso di Azure)