Analizowanie danych JSON i Avro w usłudze Azure Stream Analytics

Usługa Azure Stream Analytics obsługuje przetwarzanie zdarzeń w formatach danych CSV, JSON i Avro. Zarówno dane JSON, jak i Avro mogą być ustrukturyzowane i zawierają niektóre złożone typy, takie jak zagnieżdżone obiekty (rekordy) i tablice.

Uwaga

Pliki AVRO utworzone przez funkcję przechwytywania centrum zdarzeń używają określonego formatu, który wymaga użycia niestandardowej funkcji deserializatora . Aby uzyskać więcej informacji, zobacz Odczyt danych wejściowych w dowolnym formacie przy użyciu niestandardowych deserializacji platformy .NET.

Rejestrowanie typów danych

Typy danych rekordów są używane do reprezentowania tablic JSON i Avro, gdy odpowiednie formaty są używane w strumieniach danych wejściowych. W tych przykładach pokazano przykładowy czujnik, który odczytuje zdarzenia wejściowe w formacie JSON. Oto przykład pojedynczego zdarzenia:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Uzyskiwanie dostępu do zagnieżdżonych pól w znanym schemacie

Użyj notacji kropkowej (.), aby łatwo uzyskać dostęp do zagnieżdżonych pól bezpośrednio z zapytania. Na przykład to zapytanie wybiera współrzędne szerokości i długości geograficznej w ramach właściwości Location w poprzednich danych JSON. Notacja kropkowa może służyć do nawigowania po wielu poziomach, jak pokazano poniżej.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Wynik to:

Deviceid Lat Długi Temperatura Wersja
12345 47 122 80 1.2.45

Wybierz wszystkie właściwości

Możesz wybrać wszystkie właściwości zagnieżdżonego rekordu przy użyciu symbolu wieloznacznych "*". Rozpatrzmy następujący przykład:

SELECT
    DeviceID,
    Location.*
FROM input

Wynik to:

Deviceid Lat Długi
12345 47 122

Uzyskiwanie dostępu do zagnieżdżonych pól, gdy nazwa właściwości jest zmienną

Użyj funkcji GetRecordPropertyValue , jeśli nazwa właściwości jest zmienną. Umożliwia to tworzenie zapytań dynamicznych bez twardych nazw właściwości.

Załóżmy na przykład, że przykładowy strumień danych musi być połączony z danymi referencyjnymi zawierającymi progi dla każdego czujnika urządzenia. Poniżej przedstawiono fragment kodu z takimi danymi referencyjnymi.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Celem jest dołączenie naszego przykładowego zestawu danych z góry artykułu do tych danych referencyjnych i wyprowadzenie jednego zdarzenia dla każdej miary czujnika powyżej progu. Oznacza to, że nasze pojedyncze zdarzenie powyżej może wygenerować wiele zdarzeń wyjściowych, jeśli wiele czujników przekracza odpowiednie progi, dzięki sprzężeniu. Aby uzyskać podobne wyniki bez sprzężenia, zobacz sekcję poniżej.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

Polecenie GetRecordPropertyValue wybiera właściwość SensorReadings, która odpowiada nazwie właściwości pochodzącej z danych referencyjnych. Następnie wyodrębniona jest skojarzona wartość z sensorReadings .

Wynik to:

Deviceid SensorName AlertMessage
12345 Wilgotność Alert: Czujnik powyżej progu

Konwertowanie pól rekordów na oddzielne zdarzenia

Aby przekonwertować pola rekordów na oddzielne zdarzenia, użyj operatora APPLY razem z funkcją GetRecordProperties .

W przypadku oryginalnych przykładowych danych poniższe zapytanie może służyć do wyodrębniania właściwości do różnych zdarzeń.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Wynik to:

Deviceid SensorName AlertMessage
12345 Temperatura 80
12345 Wilgotność 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Za pomocą funkcji WITH możliwe jest kierowanie tych zdarzeń do różnych miejsc docelowych:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Analizowanie rekordu JSON w danych referencyjnych SQL

W przypadku używania Azure SQL Database jako danych referencyjnych w zadaniu można mieć kolumnę zawierającą dane w formacie JSON. Przykład przedstawiono poniżej.

Deviceid Dane
12345 {"key": "value1"}
54321 {"key": "value2"}

Rekord JSON można przeanalizować w kolumnie Dane , pisząc prostą funkcję zdefiniowaną przez użytkownika w języku JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Następnie możesz utworzyć krok w zapytaniu usługi Stream Analytics, jak pokazano poniżej, aby uzyskać dostęp do pól rekordów JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Typy danych tablicy

Typy danych tablicy to uporządkowana kolekcja wartości. Poniżej przedstawiono niektóre typowe operacje dotyczące wartości tablicy. W tych przykładach użyto funkcji GetArrayElement, GetArrayElements, GetArrayLength i operatora APPLY.

Oto przykład zdarzenia. Zarówno, jak CustomSensor03 i SensorMetadata są typu tablica:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Praca z określonym elementem tablicy

Wybierz element tablicy w określonym indeksie (wybierając pierwszy element tablicy):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Wynik to:

firstElement
12

Wybieranie długości tablicy

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Wynik to:

arrayLength
3

Konwertowanie elementów tablicy na oddzielne zdarzenia

Wybierz wszystkie elementy tablicy jako pojedyncze zdarzenia. Operator APPLY wraz z wbudowaną funkcją GetArrayElements wyodrębnia wszystkie elementy tablicy jako pojedyncze zdarzenia:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Wynik to:

DeviceId Arrayindex Arrayvalue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Wynik to:

DeviceId smKey smValue
12345 Producent ABC
12345 Wersja 1.2.45

Jeśli wyodrębnione pola muszą być wyświetlane w kolumnach, można przestawić zestaw danych przy użyciu składni WITH oprócz operacji JOIN . To sprzężenia wymaga warunku granicy czasu , który uniemożliwia duplikowanie:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Wynik to:

DeviceId Lat Długi smVersion smManufacturer
12345 47 122 1.2.45 ABC

Zobacz też

Typy danych w usłudze Azure Stream Analytics