Azure Stream Analytics サービスでは、CSV、JSON、Avro データ形式のイベントの処理をサポートしています。 JSON データと Avro データのどちらも、入れ子になったオブジェクト (レコード) や配列などの複合型を含む構造にすることができます。
レコード データ型
レコード データ型は、対応する形式が入力データ ストリームで使用される場合に、JSON 配列と Avro の配列を表すために使用されます。 これらの例は、JSON 形式の入力イベントを読み取るサンプル センサーを示しています。 1 つのイベントの例を以下に示します。
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
既知のスキーマの入れ子のフィールドにアクセスする
クエリから直接、入れ子になったフィールドに簡単にアクセスするには、ドット表記 (.) を使用します。 たとえば、このクエリでは、上記の JSON データの Location プロパティの緯度と経度の座標が選択されます。 次のスニペットに示したように、ドット表記を使用してさまざまなレベルに移動できます。
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
結果は次のとおりです。
デバイスID | Lat | 長い | 気温 | バージョン |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
すべてのプロパティを選択する
'*' ワイルドカードを使用すると、入れ子になったレコードのすべてのプロパティを選択できます。 次の例を確認してください。
SELECT
DeviceID,
Location.*
FROM input
結果は次のとおりです。
デバイスID | Lat | 長い |
---|---|---|
12345 | 47 | 122 |
プロパティ名が変数であるときに入れ子のフィールドにアクセスする
プロパティ名が変数の場合は、GetRecordPropertyValue 関数を使用します。 これにより、プロパティ名をハードコーディングすることなく、動的なクエリを作成できます。
たとえば、サンプル データ ストリームを、各デバイス センサーのしきい値を含む参照データと結合する必要があるとします。 そのような参照データのスニペットを次のスニペットに示します。
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
ここでの目的は、記事の先頭にあるサンプル データセットをその参照データに結合し、各センサー メジャーに対してしきい値を上回る 1 つのイベントを出力することです。 これは、結合により、複数のセンサーがそれぞれのしきい値を超えた場合に、上記の 1 つのイベントによって複数の出力イベントが生成される可能性があることを意味します。 結合せずに同様の結果を得るには、以下の例を参照してください。
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValueにより、 SensorReadingsのプロパティを選択します。この名前は参照データから取得したプロパティ名と一致します。 次に、SensorReadings の関連する値が抽出されます。
結果は次のとおりです。
デバイスID | SensorName | 警告メッセージ |
---|---|---|
12345 | 湿度 | アラート: しきい値を超えたセンサー |
レコード フィールドを個々のイベントに変換する
レコード フィールドを個々のイベントに変換するには、APPLY 演算子を GetRecordProperties 関数と組み合わせて使用します。
元のサンプルデータでは、次のクエリを使用して、さまざまなイベントにプロパティを抽出できます。
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
結果は次のとおりです。
デバイスID | SensorName | 警告メッセージ |
---|---|---|
12345 | 気温 | 80 |
12345 | 湿度 | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 九十九 |
12345 | センサーのメタデータ | [オブジェクト オブジェクト] |
WITHを使用すると、これらのイベントを異なる宛先にルーティングできます。
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
SQL 参照データの JSON レコードを解析する
ジョブで参照データとして Azure SQL Database を使用する場合、JSON 形式のデータを含む列を持つことができます。 例を次の例に示します。
デバイスID | データ |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
単純な JavaScript ユーザー定義関数を記述することで、Data 列の JSON レコードを解析できます。
function parseJson(string) {
return JSON.parse(string);
}
次に、ここに示すように Stream Analytics クエリのステップを作成して、JSON レコードのフィールドにアクセスできます。
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
配列データ型
配列データ型は、順序が付けられた値のコレクションです。 配列値のいくつかの一般的な操作の詳細をここに示します。 これらの例では、関数 GetArrayElement、GetArrayElements、GetArrayLength、および APPLY 演算子を使用しています。
イベントの例を次に示します。
CustomSensor03
と SensorMetadata
のどちらも 配列型です。
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
特定の配列要素を操作する
指定したインデックス位置にある配列の要素を選択します (配列の最初の要素を選択します)。
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
結果は次のとおりです。
最初の要素 |
---|
12 |
配列の長さを選択する
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
結果は次のとおりです。
配列の長さ |
---|
3 |
配列要素を個々のイベントに変換する
配列のすべての要素を個々のイベントとして選択します。 APPLY 演算子が GetArrayElements 組み込み関数と組み合わされて、配列のすべての要素を個々のイベントとして抽出します。
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
結果は次のとおりです。
deviceId | アレイインデックス | 配列値 |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
結果は次のとおりです。
deviceId | smKey | smValue |
---|---|---|
12345 | 生産者 | 甲乙丙 |
12345 | バージョン | 1.2.45 |
抽出されたフィールドを列に表示する必要がある場合は、JOIN 操作に加えて、WITH 構文を使用してデータセットをピボットすることができます。 この結合を行うには、重複を防ぐ時間的境界の条件が必要です。
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
結果は次のとおりです。
deviceId | Lat | 長い | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | 甲乙丙 |