Configure an MQ source stage in an Azure IoT Data Processor Preview pipeline

Important

Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

The source stage is the first and required stage in an Azure IoT Data Processor Preview pipeline. The source stage gets data into the data processing pipeline and prepares it for further processing. The MQ source stage lets you subscribe to messages from an MQTT topic. In the source stage, you define connection details to the MQ source and establish a partitioning configuration based on your specific data processing requirements.

Prerequisites

  • A functioning instance of Data Processor is deployed.
  • An instance of the Azure IoT MQ Preview broker with all necessary raw data available is operational and reachable.

Configure the MQ source

To configure the MQ source:

  • Provide connection details to the MQ source. This configuration includes the type of the MQ source, the MQTT broker URL, the Quality of Service (QoS) level, the session type, and the topics to subscribe to.
  • Specify the authentication method. Currently, the supported methods are username/password authentication and service account token (SAT).

The following table describes the MQ source configuration parameters:

Field | Description | Required | Default | Example
Name | A customer-visible name for the source stage. | Required | NA | asset-1broker
Description | A customer-visible description of the source stage. | Optional | NA | brokerforasset-1
Broker | The URL of the MQTT broker to connect to. | Required | NA | tls://aio-mq-dmqtt-frontend:8883
Authentication | The authentication method to connect to the broker. One of: None, Username/Password, and Service Account Token (SAT). | Required | Service Account Token (SAT) | Service Account Token (SAT)
Username/Password > Username | The username for the username/password authentication. | Yes | NA | myuser
Username/Password > Secret | Reference to the password stored in Azure Key Vault. | Yes | NA | AKV_USERNAME_PASSWORD
QoS | QoS level for message delivery. | Required | 1 | 0
Clean session | Set to FALSE for a persistent session. | Required | FALSE | FALSE
Topic | The topic to subscribe to for data acquisition. | Required | NA | contoso/site1/asset1, contoso/site1/asset2

To learn more about secrets, see Manage secrets for your Azure IoT Operations Preview deployment.
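
As a rough illustration of how the parameters in the table fit together, the following Python sketch models them as a small record. The `MqSourceConfig` class and its field names are hypothetical and are not the Data Processor configuration schema. The sketch only encodes the defaults from the table and the assumption that the username and secret are needed when Username/Password authentication is selected.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical model of the MQ source parameters. The names are illustrative
# and do not reflect the actual Data Processor schema.
AUTH_METHODS = {"None", "Username/Password", "Service Account Token (SAT)"}


@dataclass
class MqSourceConfig:
    name: str
    broker: str
    topics: List[str]
    description: Optional[str] = None
    authentication: str = "Service Account Token (SAT)"  # default from the table
    username: Optional[str] = None
    secret: Optional[str] = None   # reference to the stored password
    qos: int = 1                   # default from the table
    clean_session: bool = False    # False keeps a persistent session

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means nothing is missing."""
        problems = []
        if not self.name:
            problems.append("Name is required")
        if not self.broker:
            problems.append("Broker is required")
        if not self.topics:
            problems.append("At least one topic is required")
        if self.authentication not in AUTH_METHODS:
            problems.append(f"Unknown authentication method: {self.authentication}")
        if self.authentication == "Username/Password":
            # Assumption: both fields are needed only for this method.
            if not self.username:
                problems.append("Username is required for Username/Password")
            if not self.secret:
                problems.append("Secret is required for Username/Password")
        return problems
```

For example, `MqSourceConfig(name="asset-1broker", broker="tls://aio-mq-dmqtt-frontend:8883", topics=["contoso/site1/asset1"]).validate()` returns an empty list because the remaining fields fall back to the defaults.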

Data Processor doesn't reorder out-of-order data coming from the MQTT broker. If the data is received out of order from the broker, it remains so in the pipeline.

Select data format

In a Data Processor pipeline, the format field in the source stage specifies how to deserialize the incoming data. By default, the Data Processor pipeline uses the raw format, which means it doesn't convert the incoming data. To use many Data Processor features, such as Filter or Enrich stages, you must deserialize your data in the source stage. You can deserialize incoming data from JSON, jsonStream, MessagePack, CBOR, CSV, or Protobuf formats into a Data Processor readable message to use the full Data Processor functionality.

The following tables describe the different deserialization configuration options:

Field | Description | Required | Default | Value
Data Format | The type of the data format. | Yes | Raw | Raw, JSON, jsonStream, MessagePack, CBOR, CSV, Protobuf

The Data Format field is mandatory and its value determines the other required fields.

To deserialize CSV messages, you also need to specify the following fields:

Field | Description | Required | Value | Example
Header | Whether the CSV data includes a header line. | Yes | Yes, No | No
Name | Name of the column in CSV. | Yes | - | temp, asset
Path | The jq path in the message where the column information is added. | No | - | The default jq path is the column name
Data Type | The data type of the data in the column and how it's represented inside the Data Processor pipeline. | No | String, Float, Integer, Boolean, Bytes | Default: String
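
To make the interplay of the CSV fields more concrete, here is a minimal Python sketch of how a CSV payload could be mapped to messages. It is not Data Processor's implementation. The `deserialize_csv` function, the dictionary keys `name`, `path`, and `type`, and the text form accepted for Boolean values are all assumptions made for illustration, and the path is treated as a plain key rather than a real jq path.

```python
import csv
import io

# Illustrative converters for the Data Type values listed in the table.
CONVERTERS = {
    "String": str,
    "Float": float,
    "Integer": int,
    "Boolean": lambda s: s.strip().lower() == "true",  # assumed text form
    "Bytes": lambda s: s.encode("utf-8"),
}


def deserialize_csv(payload, columns, header=False):
    """Turn CSV text into a list of dicts, one per data row.

    columns: list of dicts with "name" and optional "path" and "type".
    """
    rows = list(csv.reader(io.StringIO(payload)))
    # Without a header line, assume the configured order is the column order.
    names = [c["name"] for c in columns]
    if header and rows:
        names, rows = rows[0], rows[1:]
    by_name = {c["name"]: c for c in columns}
    messages = []
    for row in rows:
        message = {}
        for col_name, raw in zip(names, row):
            col = by_name.get(col_name)
            if col is None:
                continue  # unconfigured columns are skipped in this sketch
            key = col.get("path", col_name)            # path defaults to the column name
            convert = CONVERTERS[col.get("type", "String")]  # type defaults to String
            message[key] = convert(raw)
        messages.append(message)
    return messages
```

Calling `deserialize_csv("temp,asset\n21.5,oven\n", [{"name": "temp", "type": "Float"}, {"name": "asset"}], header=True)` yields one message in which `temp` is a float and `asset` stays a string.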

To deserialize Protobuf messages, you also need to specify the following fields:

Field | Description | Required | Value | Example
Descriptor | The base64-encoded descriptor for the protobuf definition. | Yes | - | Zhf...
Message | The name of the message type that's used to format the data. | Yes | - | pipeline
Package | The name of the package in the descriptor where the type is defined. | Yes | - | schedulerv1

Note

Data Processor supports only one message type in each .proto file.
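
The Descriptor field takes base64 text rather than raw bytes. As a hedged sketch, the following Python helpers show one way to produce that text from a binary descriptor. It assumes you already have the descriptor as a binary file, for example one written by `protoc --descriptor_set_out`. Whether Data Processor expects exactly that file layout is an assumption, and the helper names are hypothetical.

```python
import base64


def encode_descriptor(descriptor_bytes: bytes) -> str:
    """Base64-encode binary descriptor bytes into text for the Descriptor field."""
    return base64.b64encode(descriptor_bytes).decode("ascii")


def encode_descriptor_file(path: str) -> str:
    """Read a binary descriptor file (path supplied by the caller) and encode it."""
    with open(path, "rb") as f:
        return encode_descriptor(f.read())
```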

Configure partitioning

Partitioning in a pipeline divides the incoming data into separate partitions. Partitioning enables data parallelism in the pipeline, which can improve throughput and reduce latency. Partitioning strategies affect how the data is processed in the other stages of the pipeline. For example, the last known value stage and aggregate stage operate on each logical partition.

To partition your data, specify a partitioning strategy and the number of partitions to use:

Field | Description | Required | Default | Example
Partition type | The type of partitioning to be used: Partition ID or Partition Key | Required | Key | Key
Partition expression | The jq expression to use on the incoming message to compute the partition ID or partition Key | Required | .topic | .topic
Number of partitions | The number of partitions in a Data Processor pipeline. | Required | 2 | 2

Data Processor adds metadata to the incoming message. See Data Processor message structure overview to understand how to correctly specify the partitioning expression that runs on the incoming message. By default, the partitioning expression is set to 0 with the Partition type as ID to send all the incoming data to a single partition.

For recommendations and to learn more, see What is partitioning?.
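
The difference between the two partition types can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Data Processor's algorithm: the `assign_partition` function is hypothetical, a Python callable stands in for the jq expression, the Key strategy is assumed to hash the key and take the result modulo the number of partitions, and the ID strategy is assumed to use the expression result directly as the partition index.

```python
import zlib


def assign_partition(message, partition_type, expression, num_partitions):
    """Return the partition index for a message.

    expression is a Python callable standing in for the jq expression.
    """
    value = expression(message)
    if partition_type == "ID":
        # Assumption: the expression result is used directly as the index.
        partition = int(value)
        if not 0 <= partition < num_partitions:
            raise ValueError(f"partition ID {partition} is out of range")
        return partition
    if partition_type == "Key":
        # Assumption: a stable hash of the key, modulo the partition count.
        # CRC32 is used here only because it gives the same result on every run.
        key_bytes = str(value).encode("utf-8")
        return zlib.crc32(key_bytes) % num_partitions
    raise ValueError(f"unknown partition type: {partition_type}")
```

With a Key strategy on the topic, every message from the same topic lands in the same partition, which is what lets stages such as last known value and aggregate operate per logical partition. CRC32 is chosen over Python's built-in `hash` because string hashes from `hash` vary between interpreter runs.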

Sample configuration

The following shows an example configuration for the stage:

Parameter | Value
Name | input data
Broker | tls://aio-mq-dmqtt-frontend:8883
Authentication | Service Account Token (SAT)
Topic | azure-iot-operations/data/opc-ua-connector-0/#
Data format | JSON

This configuration then generates messages that look like the following example:

{
    "Timestamp": "2023-08-10T00:54:58.6572007Z",
    "MessageType": "ua-deltaframe",
    "payload": {
        "temperature": {
            "SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
            "Value": 7109
        },
        "Tag 10": {
            "SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
            "Value": 7109
        }
    },
    "DataSetWriterName": "oven",
    "SequenceNumber": 4660
}
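
To show what JSON deserialization makes available to later stages, the following Python sketch parses a message shaped like the example and collects the value of each tag. The `tag_values` function is hypothetical, and the sketch works on the message body only. It doesn't model the metadata that Data Processor adds around the incoming message.

```python
import json

# Message body copied from the example above.
SAMPLE = """
{
    "Timestamp": "2023-08-10T00:54:58.6572007Z",
    "MessageType": "ua-deltaframe",
    "payload": {
        "temperature": {
            "SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
            "Value": 7109
        },
        "Tag 10": {
            "SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
            "Value": 7109
        }
    },
    "DataSetWriterName": "oven",
    "SequenceNumber": 4660
}
"""


def tag_values(message_text):
    """Parse the JSON text and return a mapping of tag name to its Value."""
    message = json.loads(message_text)
    return {tag: entry["Value"] for tag, entry in message["payload"].items()}


print(tag_values(SAMPLE))  # {'temperature': 7109, 'Tag 10': 7109}
```

With the raw format, the same bytes would pass through unparsed, so fields such as `payload.temperature.Value` wouldn't be addressable in later stages.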