在 Azure 串流分析中剖析 JSON 和 Avro 資料

Azure 串流分析可處理資料格式為 CSV、JSON 和 Avro 的事件。 JSON 和 Avro 資料都可進行結構化並包含一些複雜類型,例如巢狀物件 (記錄) 和陣列。

注意

由事件中樞擷取所建立的 AVRO 檔案是使用特定的格式,其會要求您使用「自訂還原序列化程式」功能。 如需詳細資訊,請參閱使用 .NET 自訂還原序列化程式來讀取任何格式的輸入 \(部分機器翻譯\)。

記錄資料類型

當輸入資料流中使用對應的格式時,記錄資料類型可用來代表 JSON 和 Avro 陣列。 這些範例會示範範例感應器,其會以 JSON 格式讀取輸入事件。 以下是單一事件的範例:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

存取已知結構描述中的巢狀欄位

使用點表示法 (.) 輕鬆地直接從查詢存取巢狀欄位。 例如,此查詢會選取上述 JSON 資料中 Location 屬性底下的緯度和經度座標。 點標記法可用來導覽多個層級,如下所示。

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

結果如下:

DeviceID Lat long 溫度 版本
12345 47 122 80 1.2.45

選取所有屬性

您可以使用 '*' 萬用字元來選取巢狀記錄的所有屬性。 請考慮下列範例:

SELECT
    DeviceID,
    Location.*
FROM input

結果如下:

DeviceID Lat long
12345 47 122

當屬性名稱為變數時,存取巢狀欄位

如果屬性名稱為變數,請使用 GetRecordPropertyValue \(英文\) 函式。 這可讓您在無需對屬性名稱進行硬式編碼的情況下建置動態查詢。

例如,假設範例資料流需要與包含每個裝置感應器閾值的參考資料聯結。 這類參考資料的程式碼片段如下所示。

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

這裡的目標是要將文章頂端的範例資料集與該參考資料聯結,並針對每個量值超過其閾值的感應器輸出單一事件。 那表示如果有多個感應器皆高於其對應的閾值,透過聯結便能使上面的單一事件產生多個輸出事件。 若要在不使用聯結的情況下達成類似結果,請參閱下列小節。

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue 會選取 SensorReadings 中的屬性,其會以名稱比對來自參考資料的屬性名稱。 接著會擷取來自 SensorReadings 的相關聯值。

結果如下:

DeviceID SensorName AlertMessage
12345 溼度 警告:感應器超過閾值

將記錄欄位轉換成個別的事件

若要將資料列欄位轉換成個別的事件,請搭配使用 APPLY 運算子和 GetRecordProperties 函式。

使用原始範例資料時,下列查詢可用來將屬性擷取為不同的事件。

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

結果如下:

DeviceID SensorName AlertMessage
12345 溫度 80
12345 溼度 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

透過使用 WITH \(英文\),便可以將那些事件路由到不同的目的地:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

剖析 SQL 參考資料中的 JSON 記錄

使用 Azure SQL Database 作為作業中的參考資料時,有可能會有包含 JSON 格式資料的資料行。 範例如下所示。

DeviceID 資料
12345 {"key": "value1"}
54321 {"key": "value2"}

您可以透過撰寫簡單的 JavaScript 使用者定義函式,來剖析 Data 資料行中的 JSON 記錄。

function parseJson(string) {
return JSON.parse(string);
}

您接著可以在串流分析查詢中建立步驟來存取 JSON 記錄中的欄位,如下所示。

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

陣列資料類型

陣列資料類型是已排序的值集合。 以下詳述陣列值的一些一般作業。 這些範例使用函式 GetArrayElementGetArrayElementsGetArrayLength,以及 APPLY 運算子。

以下是事件的範例。 CustomSensor03SensorMetadata 都是 array 類型:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

使用特定的陣列元素

選取指定索引處的陣列元素 (選取第一個陣列元素):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

結果如下:

firstElement
12

選取陣組長度

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

結果如下:

arrayLength
3

將陣列元素轉換成個別的事件

選取所有陣列元素做為個別事件。 APPLY 運算子和 GetArrayElements 內建函式一起擷取所有陣列元素且視為個別事件:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

結果如下:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 5-
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

結果如下:

DeviceId smKey smValue
12345 製造商 ABC
12345 版本 1.2.45

如果擷取的欄位必須出現在資料行中,您可以搭配 JOIN \(英文\) 作業使用 WITH \(英文\) 語法來對資料集進行樞紐分析。 該聯結需要能避免重複的時間界限條件:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

結果如下:

DeviceId Lat long smVersion smManufacturer
12345 47 122 1.2.45 ABC

另請參閱

Azure 串流分析中的資料類型 \(英文\)