Agregowanie danych w potoku procesora danych
Ważne
Usługa Azure IoT Operations Preview — włączona przez usługę Azure Arc jest obecnie dostępna w wersji zapoznawczej. Nie należy używać tego oprogramowania w wersji zapoznawczej w środowiskach produkcyjnych.
Po udostępnieniu ogólnie dostępnej wersji należy wdrożyć nową instalację operacji usługi Azure IoT. Nie będzie można uaktualnić instalacji w wersji zapoznawczej.
Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.
Etap agregacji jest opcjonalnym, konfigurowalnym, pośrednim etapem potoku, który umożliwia uruchamianie operacji próbkowania w dół i przetwarzania wsadowego na danych czujnika przesyłania strumieniowego w oknach czasu zdefiniowanych przez użytkownika.
Użyj etapu agregacji, aby zgromadzić komunikaty w zdefiniowanym oknie i obliczyć wartości agregacji z właściwości w komunikatach. Etap emituje zagregowane wartości jako właściwości w jednym komunikacie na końcu każdego przedziału czasu.
- Każda partycja potoku przeprowadza agregację niezależnie od siebie.
- Dane wyjściowe etapu to pojedynczy komunikat zawierający wszystkie zdefiniowane właściwości agregujące.
- Etap pomija wszystkie inne właściwości. Można jednak użyć funkcji Last, First lub Collect, aby zachować właściwości, które w przeciwnym razie zostaną porzucone przez etap podczas agregacji.
- Aby etap agregacji działał, etap źródła danych w potoku powinien deserializować komunikat przychodzący.
Wymagania wstępne
Aby skonfigurować i użyć zagregowanego etapu potoku, potrzebne jest wdrożone wystąpienie procesora danych, które zawiera opcjonalny składnik procesora danych.
Konfigurowanie etapu
Konfiguracja JSON etapu agregacji definiuje szczegóły etapu. Aby utworzyć etap, możesz wchodzić w interakcje z interfejsem użytkownika opartym na formularzach lub podać konfigurację JSON na karcie Zaawansowane :
Pole | Typ | Opis | Wymagani | Wartość domyślna | Przykład |
---|---|---|---|---|---|
Nazwisko | String | Nazwa wyświetlana w interfejsie użytkownika procesora danych. | Tak | - | Calculate Aggregate |
opis | String | Przyjazny dla użytkownika opis tego, co robi etap agregacji. | Nie. | Aggregation over temperature |
|
Przedział czasu | Czas trwania określający okres, w którym jest uruchamiana agregacja. | Tak | - | 10s |
|
Funkcja właściwości > | Wyliczenie | Funkcja agregacji do użycia. | Tak | - | Sum |
Właściwości > InputPath1 | Ścieżka | Ścieżka do właściwości w komunikacie przychodzącym w celu zastosowania funkcji. | Tak | - | .payload.temperature |
Właściwości > OutputPath2 | Ścieżka | Ścieżka do lokalizacji w komunikacie wychodzącym, aby umieścić wynik. | Tak | - | .payload.temperature.average |
W jednym zagregowanym etapie można zdefiniować wiele konfiguracji właściwości . Na przykład oblicz sumę temperatury i oblicz średnią ciśnienia.
1Ścieżka wejściowa:
- Typ danych wartości właściwości ścieżki wejściowej musi być zgodny z typem zdefiniowanej funkcji .
- Tę samą ścieżkę wejściową można podać w wielu konfiguracjach agregacji, aby obliczyć wiele funkcji na tej samej właściwości ścieżki wejściowej. Upewnij się, że ścieżki wyjściowe są inne, aby uniknąć zastępowania wyników.
2Ścieżka wyjściowa:
- Ścieżki wyjściowe mogą być takie same jak lub inne niż ścieżka wejściowa. Użyj różnych ścieżek wyjściowych, jeśli obliczasz wiele agregacji na tej samej właściwości ścieżki wejściowej.
- Skonfiguruj odrębne ścieżki wyjściowe, aby uniknąć zastępowania wartości agregujących.
Windows
Okno to przedział czasu, w którym etap gromadzi komunikaty. Na końcu okna etap stosuje skonfigurowaną funkcję do właściwości komunikatu. Następnie etap emituje jeden komunikat.
Obecnie etap obsługuje tylko okna wirowania .
Okna wirowania to seria stałych rozmiarów, nieprzezwalających się i kolejnych interwałów czasowych. Okno rozpoczyna się i kończy się w stałych punktach w czasie:
Rozmiar okna definiuje przedział czasu, w którym etap gromadzi komunikaty. Rozmiar okna można zdefiniować przy użyciu wspólnego wzorca Czasu trwania .
Funkcje
Etap agregacji obsługuje następujące funkcje, aby obliczyć wartości agregujące dla właściwości komunikatu zdefiniowanej w ścieżce wejściowej:
Function | opis |
---|---|
Sum | Oblicza sumę wartości właściwości w komunikatach wejściowych. |
Średnia | Oblicza średnią wartości właściwości w komunikatach wejściowych. |
Count | Zlicza liczbę wyświetleń właściwości w oknie. |
Minimum | Oblicza minimalną wartość wartości właściwości w komunikatach wejściowych. |
Maksimum | Oblicza maksymalną wartość wartości właściwości w komunikatach wejściowych. |
Last | Zwraca najnowszą wartość wartości właściwości w komunikatach wejściowych. |
First | Zwraca pierwszą wartość wartości właściwości w komunikatach wejściowych. |
Collect | Zwraca wszystkie wartości właściwości w komunikatach wejściowych. |
W poniższej tabeli wymieniono typy danych komunikatów obsługiwane przez każdą funkcję:
Function | Integer | Liczba zmiennoprzecinkowa | String | Datetime | Tablica | Objekt | Plik binarny |
---|---|---|---|---|---|---|---|
Sum | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Średnia | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Licznik | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Min | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Max | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Last | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
First | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Collect | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Przykładowa konfiguracja
W poniższym przykładzie JSON przedstawiono pełną konfigurację etapu agregacji:
{
"displayName":"downSample",
"description":"Calculate average for production tags",
"window":
{
"type":"tumbling",
"size":"10s"
},
"properties":
[
{
"function":"average",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_avg"
},
{
"function":"collect",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_all"
},
{
"function":"average",
"inputPath":".payload.pressure",
"outputPath":".payload.pressure"
},
{
"function":"last",
"inputPath":".systemProperties",
"outputPath": ".systemProperties"
}
]
}
Konfiguracja definiuje etap agregujący, który oblicza dziesięć sekund:
- Średnia temperatura
- Suma temperatury
- Suma ciśnienia
Przykład
Ten przykład obejmuje dwa przykładowe komunikaty wejściowe i przykładowy komunikat wyjściowy wygenerowany przy użyciu poprzedniej konfiguracji:
Komunikat wejściowy 1:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 10,
"temperature":250,
"pressure":30,
"runningState": true
}
}
Komunikat wejściowy 2:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 11,
"temperature":235,
"pressure":25,
"runningState": true
}
}
Komunikat wyjściowy:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"payload":{
"temperature_avg":242.5,
"temperature_all":[250,235],
"pressure":27.5
}
}