Use last known values in a data processor pipeline

Important

Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.

You'll need to deploy a new Azure IoT Operations installation when a generally available release is made available; you won't be able to upgrade a preview installation.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

Use the last known value (LKV) stage in a data processor pipeline to maintain an up-to-date and complete record of your data. The LKV stage tracks the latest values of key-value pairs for messages in the pipeline. The stage can then enrich messages by using the tracked LKV values. Last known value tracking and enrichment are important for downstream processes that rely on:

  • Multiple time-series data points at a specific timestamp.
  • Payloads that always have a value for a particular key.

In a data processing pipeline, the LKV stage is an optional stage. When you use the LKV stage, you can:

  • Add multiple LKV stages to a pipeline. Each LKV stage can track multiple values.
  • Enrich messages with the stored LKV values, ensuring the data remains complete and comprehensive.
  • Keep LKVs updated automatically with the latest values from the incoming messages.
  • Track LKVs separately for each logical partition. The LKV stage operates independently in each logical partition.
  • Configure the expiration time for each tracked LKV to manage how long it remains valid. This control helps to ensure that messages aren't enriched with stale values.

The LKV stage maintains chronological data integrity: a message with an earlier timestamp can't override or replace an LKV that was recorded from a message with a later timestamp.
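
For example, here's an illustrative sketch with assumed values that uses the message format shown later in this article. Suppose the stage has already recorded an LKV of 250 for .payload.temperature with a timestamp of 2023-01-11T10:02:07Z. If the following delayed message then arrives carrying an earlier timestamp, the stored LKV stays at 250 and keeps its 10:02:07 timestamp:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T09:58:00Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "temperature":248,
        "runningState": true
    }
}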

The LKV stage enriches incoming messages with the last known values it tracks. These enriched values represent previously recorded data, and aren't necessarily the current real-time values. Be sure that this behavior aligns with your data processing expectations.

Prerequisites

To configure and use a last known value pipeline stage, you need a deployed instance of Azure IoT Operations Preview that includes the optional data processor component.

Configure the stage

The LKV stage JSON configuration defines the details of the stage. To author the stage, you can either interact with the form-based UI, or provide the JSON configuration on the Advanced tab:

Field | Description | Required | Default | Example
Name | User-defined name for the stage. | Yes | - | lkv1
Description | User-defined description for the stage. | No | - | lkv1
Properties > Input path | The path of the key to track. | Yes | - | .payload.temperature
Properties > Output path | The path to the location in the output message to write the LKV. | Yes | - | .payload.temperature_lkv
Properties > Expiration time | Tracked LKVs are valid only for a user-defined time interval, after which the output message isn't enriched with the stored value. Expiration is tracked for each LKV key. | No | - | 10h
Properties > Timestamp path | The path to the location in the output message to write the timestamp of when the LKV was last updated. | No | False | -

Including the timestamp path helps you understand precisely when each LKV was recorded, which improves transparency and traceability.
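
As a rough sketch only, the fields in the table might map to the Advanced-tab JSON along these lines. Only inputPath and outputPath appear elsewhere in this article; the other key names (name, description, properties, expirationTime, timestampPath) are assumptions derived from the table, so confirm the exact keys against the stage schema in the portal:

{
    "name": "lkv1",
    "description": "Track the last known pump temperature",
    "properties": [
        {
            "inputPath": ".payload.temperature",
            "outputPath": ".payload.temperature_lkv",
            "expirationTime": "10h",
            "timestampPath": ".payload.temperature_lkv_timestamp"
        }
    ]
}

Modeling the properties as a list reflects the earlier point that a single LKV stage can track multiple values; each entry would define one tracked key.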

inputPath equals outputPath

The outgoing message contains either the actual message value, or the LKV if the tracked key is missing from the message payload. Any incoming value takes priority, and the stage doesn't override it with an LKV. To identify whether a value in the message is an LKV, use the timestamp path: the timestamp path is only included in the outgoing message when the value in the message is the tracked LKV.
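
For example, here's an illustrative sketch with assumed paths: if both the input path and the output path are set to .payload.temperature and the timestamp path is .payload.temperature_timestamp, a message that omits temperature leaves the stage with the stored LKV and its timestamp filled in:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T11:05:00Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "temperature":250,
        "temperature_timestamp":"2023-01-11T10:02:07Z",
        "runningState": true
    }
}

A message that does include a temperature value passes through with its own value, and no temperature_timestamp key is added.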

inputPath isn't equal to outputPath

The stage writes the LKV to the outputPath for all incoming messages. Use this configuration to track the difference between values in subsequent message payloads.

Sample configuration

The following example shows a sample input message for the LKV stage. The message arrives at 10:02 with a payload that contains a value for the tracked .payload.temperature key:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T10:02:07Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "humidity": 10,
        "temperature":250,
        "pressure":30,
        "runningState": true
    }
}

LKV configuration:

Field | Value
Input path | .payload.temperature
Output path | .payload.lkvtemperature
Expiration time | 10h
Timestamp path | .payload.lkvtemperature_timestamp

The tracked LKV values are:

  • .payload.temperature is 250.
  • The timestamp of the LKV is 2023-01-11T10:02:07Z.

For a message that arrives at 11:05 with a payload that doesn't have the temperature property, the LKV stage enriches the message with the tracked values:

Example input to LKV stage at 11:05:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T11:05:00Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "runningState": true
    }
}

Example output from LKV stage at 11:05:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T11:05:00Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "lkvtemperature":250,
        "lkvtemperature_timestamp":"2023-01-11T10:02:07Z",
        "runningState": true
    }
}
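
As an illustrative continuation of this example, assuming no further temperature readings arrive: the expiration time is 10h, so the LKV recorded at 10:02 is no longer used for enrichment after 20:02. A message that arrives at 21:00 without a temperature value therefore passes through the stage unenriched:

{
    "systemProperties":{
        "partitionKey":"pump",
        "partitionId":5,
        "timestamp":"2023-01-11T21:00:00Z"
    },
    "qos":1,
    "topic":"/assets/pump/#",
    "payload":{
        "runningState": true
    }
}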