Compartir vía


Adición de datos en una canalización de versión preliminar del procesador de datos de Azure IoT

Importante

Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en VERSIÓN PRELIMINAR. No se debería usar este software en versión preliminar en entornos de producción.

Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.

La fase de agregado es una fase de canalización opcional, configurable e intermedia que permite ejecutar operaciones de muestreo y procesamiento por lotes en los datos del sensor de streaming en función de los períodos de tiempo definidos por el usuario.

Use una fase de agregado para acumular mensajes a través de un período definido y calcule los valores de agregado de las propiedades de los mensajes. La fase emite los valores agregados como propiedades en un único mensaje al final de cada período de tiempo.

  • Cada partición de canalización lleva a cabo un proceso de agregación independiente.
  • La salida de la fase es un único mensaje que contiene todas las propiedades de agregado definidas.
  • La fase quita todas las demás propiedades. Sin embargo, puede usar Last, First, o Collect(funciones) para conservar las propiedades que, de lo contrario, se quitarían en la fase durante la agregación.
  • Para que la fase de agregado funcione, la fase de origen de datos de la canalización debe deserializar el mensaje entrante.

Requisitos previos

Para configurar y usar una fase de canalización de agregado, necesita una instancia implementada de Azure IoT Data Processor Preview que incluya el componente opcional del procesador de datos.

Configuración de la fase

La configuración JSON de la fase de agregado define sus detalles. Para crear la fase, puede interactuar con la interfaz de usuario basada en formularios o proporcionar la configuración JSON en la pestaña Opciones avanzadas:

Campo Tipo Descripción Necesario Valor predeterminado Ejemplo
Nombre String Un nombre para mostrar en la interfaz de usuario del procesador de datos. - Calculate Aggregate
Descripción String Una descripción sencilla de lo que hace la fase de agregado. No Aggregation over temperature
Período de tiempo Duración que especifica el período durante el que se ejecuta la agregación. - 10s
Propiedades y función Enum La función de agregado que se debe usar. - Sum
Propiedades > InputPath1 Path Ruta de acceso a la propiedad del mensaje entrante al que se va a aplicar la función. - .payload.temperature
Propiedades > OutputPath2 Path La ruta de acceso a la ubicación del mensaje saliente para colocar el resultado. - .payload.temperature.average

Puede definir varias configuraciones de Propiedades en una fase de agregado. Por ejemplo, calcule la suma de la temperatura y calcule el promedio de presión.

1ruta de acceso de entrada:

  • El tipo de datos del valor de la propiedad de ruta de acceso de entrada debe ser compatible con el tipo de función definida.
  • Puede proporcionar la misma ruta de acceso de entrada en varias configuraciones de agregación para calcular varias funciones a través de la misma propiedad de ruta de acceso de entrada. Asegúrese de que las rutas de acceso de salida son diferentes para evitar sobrescribir los resultados.

2ruta de acceso de salida:

  • Las rutas de acceso de salida pueden ser iguales o diferentes de la ruta de acceso de entrada. Use diferentes rutas de acceso de salida si va a calcular varias agregaciones en la misma propiedad de ruta de acceso de entrada.
  • Configure rutas de acceso de salida distintas para evitar sobrescribir valores agregados.

Windows

El período es el intervalo de tiempo en el que la fase acumula mensajes. Al final del período, la fase aplica la función configurada a las propiedades del mensaje. A continuación, la fase emite un único mensaje.

Actualmente, la fase solo admite una ventana de saltos de tamaño constante.

Las ventanas de saltos de tamaño constante representan una serie de intervalos de tiempo fijos, no superpuestos y consecutivos. La ventana se inicia y termina en puntos fijos en el tiempo:

Diagrama que muestra 10 segundos de ventanas de saltos de tamaño constante en la fase de agregado.

El tamaño de la ventana define el intervalo de tiempo en el que la fase acumula los mensajes. El tamaño de la ventana se define mediante el patrón común Duración.

Functions

La fase de agregado admite las siguientes funciones para calcular los valores agregados a través de la propiedad de mensaje definida en la ruta de acceso de entrada:

Función Descripción
Sum Calcula la suma de los valores de la propiedad en los mensajes de entrada.
Average Calcula el promedio de los valores de la propiedad en los mensajes de entrada.
Recuento Cuenta el número de veces que la propiedad aparece en la ventana.
Mín. Calcula el valor mínimo de los valores de la propiedad en los mensajes de entrada.
Máx. Calcula el valor máximo de los valores de la propiedad en los mensajes de entrada.
Último Devuelve el valor más reciente de los valores de la propiedad en los mensajes de entrada.
First Devuelve el primer valor de los valores de la propiedad en los mensajes de entrada.
Collect Devuelve todos los valores de la propiedad en los mensajes de entrada.

En la tabla siguiente se enumeran los tipos de datos de mensaje compatibles con cada función:

Función Entero Float String Datetime Matriz Object Binario
Sum
Average
Recuento
Mín.
Max
Último
First
Collect

Configuración de ejemplo

En el ejemplo JSON siguiente, se muestra una configuración de fase de agregado completa:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

La configuración define una fase de agregado que calcula, en una ventana de diez segundos:

  • Temperatura media
  • Suma de temperatura
  • Suma de presión

Ejemplo

En este ejemplo, se incluyen dos mensajes de entrada de ejemplo y un mensaje de salida de ejemplo generado mediante la configuración anterior:

Mensaje de entrada 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Mensaje de entrada 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Mensaje de salida:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}