JSON- és Avro-adatok feldolgozása Azure Stream Analyticsben

A Azure Stream Analytics szolgáltatás támogatja az események CSV, JSON és Avro adatformátumban történő feldolgozását. A JSON- és Avro-adatok is strukturálhatók, és összetett típusokat, például beágyazott objektumokat (rekordokat) és tömböket tartalmazhatnak.

Adattípusok rögzítése

A rekord adattípusok JSON- és Avro-tömbök ábrázolására szolgálnak, ha a bemeneti adatfolyamokban megfelelő formátumokat használnak. Ezek a példák egy mintaérzékelőt mutatnak be, amely JSON formátumban olvassa be a bemeneti eseményeket. Íme egy példa egyetlen eseményre:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Beágyazott mezők elérése ismert sémában

Pont jelöléssel (.) közvetlenül a lekérdezésből érheti el a beágyazott mezőket. Ez a lekérdezés például a Földrajzi szélesség és hosszúság koordinátákat választja ki az előző JSON-adatok Hely tulajdonsága alatt. A pont jelölésével több szinten is navigálhat az alábbi kódrészletben látható módon:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Az eredmény a következő:

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

Az összes tulajdonság kijelölése

A helyettesítő karakterrel kiválaszthatja egy beágyazott rekord összes tulajdonságát * . Vegyük a következő példát:

SELECT
    DeviceID,
    Location.*
FROM input

Az eredmény a következő:

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

Beágyazott mezők elérése, ha a tulajdonság neve változó

Használja a GetRecordPropertyValue függvényt, ha a tulajdonság neve változó. Ez a függvény segít dinamikus lekérdezések létrehozásában a tulajdonságnevek bekódolása nélkül.

Tegyük fel például, hogy a mintaadatfolyamot össze kell illeszteni az egyes eszközérzékelők küszöbértékeit tartalmazó referenciaadatokkal. Az ilyen referenciaadatok kódrészlete az alábbi kódrészletben jelenik meg.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

A cél az, hogy a cikk tetején lévő mintaadatkészletet összekapcsolja a referenciaadatokkal, és minden érzékelőmértékhez egy eseményt ad ki a küszöbértéke felett. Ez az összekapcsolás azt jelenti, hogy az egyetlen esemény több kimeneti eseményt is létrehozhat, ha több érzékelő meghaladja a megfelelő küszöbértékeket. Ha hasonló eredményeket szeretne elérni illesztés nélkül, tekintse meg a következő példát:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

A GetRecordPropertyValue a SensorReadings tulajdonságát választja ki, amely megfelel a referenciaadatokból származó tulajdonságnévnek. Ezután kinyeri a társított értéket a SensorReadingsből.

Az eredmény a következő:

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

Rekordmezők átalakítása külön eseményekké

A rekordmezők külön eseményekké alakításához használja az APPLY operátort a GetRecordProperties függvénnyel együtt.

Az eredeti mintaadatok használatával a következő lekérdezéssel kinyerheti a tulajdonságokat különböző eseményekbe:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Az eredmény a következő:

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

A WITH használatával ezeket az eseményeket különböző célhelyekre irányíthatja:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

JSON-rekord elemzése SQL-referenciaadatokban

Ha az Azure SQL Database-et használja referenciaadatként a munkájában, felvehet egy olyan oszlopot, amely JSON formátumban tartalmaz adatokat. Az alábbi példa ezt a formátumot mutatja be:

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

A JSON rekordot az Adat oszlopban egy egyszerű JavaScript-felhasználó által definiált függvény megírásával elemezheti.

function parseJson(string) {
return JSON.parse(string);
}

A JSON-rekordok mezőinek eléréséhez hozzon létre egy lépést a Stream Analytics-lekérdezésben az alábbi példában látható módon.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Tömb adattípusai

A tömb adattípusok az értékek rendezett gyűjteményei. Ez a szakasz a tömbértékek néhány tipikus műveletét ismerteti. Ezek a példák a GetArrayElement, a GetArrayElements, a GetArrayLength és az APPLY operátort használják.

Íme egy példa egy eseményre. CustomSensor03 SensorMetadata mindkettő tömb típusú:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Adott tömbelem kezelése

Válassza ki a tömbelemet egy adott indexben (válassza ki az első tömbelemet):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Az eredmény a következő:

|firstElement|
|-|
|12|

Tömb hosszának kiválasztása

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Az eredmény a következő:

|arrayLength|
|-|
|3|

Tömbelemek átalakítása különálló eseményekké

Jelölje ki az összes tömbelemet egyedi eseményként. Az APPLY operátor a GetArrayElements beépített függvényével együtt az összes tömbelemet egyedi eseményként nyeri ki:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Az eredmény a következő:

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Az eredmény a következő:

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

A kinyert mezők oszlopokban való megjelenítéséhez a WITH szintaxissal és a JOIN művelettel fordossa meg az adathalmazt. Ehhez az illesztéshez időhatár-feltétel szükséges, amely megakadályozza a duplikációt:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Az eredmény a következő:

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Adattípusok a Azure Stream Analytics