Udostępnij za pośrednictwem


Agregowanie danych w potoku procesora danych

Ważne

Usługa Azure IoT Operations Preview — włączona przez usługę Azure Arc jest obecnie dostępna w wersji zapoznawczej. Nie należy używać tego oprogramowania w wersji zapoznawczej w środowiskach produkcyjnych.

Po udostępnieniu ogólnie dostępnej wersji należy wdrożyć nową instalację operacji usługi Azure IoT. Nie będzie można uaktualnić instalacji w wersji zapoznawczej.

Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.

Etap agregacji jest opcjonalnym, konfigurowalnym, pośrednim etapem potoku, który umożliwia uruchamianie operacji próbkowania w dół i przetwarzania wsadowego na danych czujnika przesyłania strumieniowego w oknach czasu zdefiniowanych przez użytkownika.

Użyj etapu agregacji, aby zgromadzić komunikaty w zdefiniowanym oknie i obliczyć wartości agregacji z właściwości w komunikatach. Etap emituje zagregowane wartości jako właściwości w jednym komunikacie na końcu każdego przedziału czasu.

  • Każda partycja potoku przeprowadza agregację niezależnie od siebie.
  • Dane wyjściowe etapu to pojedynczy komunikat zawierający wszystkie zdefiniowane właściwości agregujące.
  • Etap pomija wszystkie inne właściwości. Można jednak użyć funkcji Last, First lub Collect, aby zachować właściwości, które w przeciwnym razie zostaną porzucone przez etap podczas agregacji.
  • Aby etap agregacji działał, etap źródła danych w potoku powinien deserializować komunikat przychodzący.

Wymagania wstępne

Aby skonfigurować i użyć zagregowanego etapu potoku, potrzebne jest wdrożone wystąpienie procesora danych, które zawiera opcjonalny składnik procesora danych.

Konfigurowanie etapu

Konfiguracja JSON etapu agregacji definiuje szczegóły etapu. Aby utworzyć etap, możesz wchodzić w interakcje z interfejsem użytkownika opartym na formularzach lub podać konfigurację JSON na karcie Zaawansowane :

Pole Typ Opis Wymagani Wartość domyślna Przykład
Nazwisko String Nazwa wyświetlana w interfejsie użytkownika procesora danych. Tak - Calculate Aggregate
opis String Przyjazny dla użytkownika opis tego, co robi etap agregacji. Nie. Aggregation over temperature
Przedział czasu Czas trwania określający okres, w którym jest uruchamiana agregacja. Tak - 10s
Funkcja właściwości > Wyliczenie Funkcja agregacji do użycia. Tak - Sum
Właściwości > InputPath1 Ścieżka Ścieżka do właściwości w komunikacie przychodzącym w celu zastosowania funkcji. Tak - .payload.temperature
Właściwości > OutputPath2 Ścieżka Ścieżka do lokalizacji w komunikacie wychodzącym, aby umieścić wynik. Tak - .payload.temperature.average

W jednym zagregowanym etapie można zdefiniować wiele konfiguracji właściwości . Na przykład oblicz sumę temperatury i oblicz średnią ciśnienia.

1Ścieżka wejściowa:

  • Typ danych wartości właściwości ścieżki wejściowej musi być zgodny z typem zdefiniowanej funkcji .
  • Tę samą ścieżkę wejściową można podać w wielu konfiguracjach agregacji, aby obliczyć wiele funkcji na tej samej właściwości ścieżki wejściowej. Upewnij się, że ścieżki wyjściowe są inne, aby uniknąć zastępowania wyników.

2Ścieżka wyjściowa:

  • Ścieżki wyjściowe mogą być takie same jak lub inne niż ścieżka wejściowa. Użyj różnych ścieżek wyjściowych, jeśli obliczasz wiele agregacji na tej samej właściwości ścieżki wejściowej.
  • Skonfiguruj odrębne ścieżki wyjściowe, aby uniknąć zastępowania wartości agregujących.

Windows

Okno to przedział czasu, w którym etap gromadzi komunikaty. Na końcu okna etap stosuje skonfigurowaną funkcję do właściwości komunikatu. Następnie etap emituje jeden komunikat.

Obecnie etap obsługuje tylko okna wirowania .

Okna wirowania to seria stałych rozmiarów, nieprzezwalających się i kolejnych interwałów czasowych. Okno rozpoczyna się i kończy się w stałych punktach w czasie:

Diagram przedstawiający 10-sekundowe okna wirowania na etapie agregacji.

Rozmiar okna definiuje przedział czasu, w którym etap gromadzi komunikaty. Rozmiar okna można zdefiniować przy użyciu wspólnego wzorca Czasu trwania .

Funkcje

Etap agregacji obsługuje następujące funkcje, aby obliczyć wartości agregujące dla właściwości komunikatu zdefiniowanej w ścieżce wejściowej:

Function opis
Sum Oblicza sumę wartości właściwości w komunikatach wejściowych.
Średnia Oblicza średnią wartości właściwości w komunikatach wejściowych.
Count Zlicza liczbę wyświetleń właściwości w oknie.
Minimum Oblicza minimalną wartość wartości właściwości w komunikatach wejściowych.
Maksimum Oblicza maksymalną wartość wartości właściwości w komunikatach wejściowych.
Last Zwraca najnowszą wartość wartości właściwości w komunikatach wejściowych.
First Zwraca pierwszą wartość wartości właściwości w komunikatach wejściowych.
Collect Zwraca wszystkie wartości właściwości w komunikatach wejściowych.

W poniższej tabeli wymieniono typy danych komunikatów obsługiwane przez każdą funkcję:

Function Integer Liczba zmiennoprzecinkowa String Datetime Tablica Objekt Plik binarny
Sum
Średnia
Licznik
Min
Max
Last
First
Collect

Przykładowa konfiguracja

W poniższym przykładzie JSON przedstawiono pełną konfigurację etapu agregacji:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

Konfiguracja definiuje etap agregujący, który oblicza dziesięć sekund:

  • Średnia temperatura
  • Suma temperatury
  • Suma ciśnienia

Przykład

Ten przykład obejmuje dwa przykładowe komunikaty wejściowe i przykładowy komunikat wyjściowy wygenerowany przy użyciu poprzedniej konfiguracji:

Komunikat wejściowy 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Komunikat wejściowy 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Komunikat wyjściowy:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}