Configure an MQ source stage in a data processor pipeline
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.
You'll need to deploy a new Azure IoT Operations installation when a generally available release becomes available; you won't be able to upgrade a preview installation.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
The source stage is the first and required stage in a data processor pipeline. The source stage gets data into the data processing pipeline and prepares it for further processing. The MQ source stage lets you subscribe to messages from an MQTT topic. In the source stage, you define connection details to the MQ source and establish a partitioning configuration based on your specific data processing requirements.
Prerequisites
- A deployed instance of Azure IoT Operations that includes the optional data processor component.
- An operational and reachable MQTT broker instance with all the necessary raw data available.
Configure the MQ source
To configure the MQ source:
- Provide connection details to the MQ source. This configuration includes the type of the MQ source, the MQTT broker URL, the Quality of Service (QoS) level, the session type, and the topics to subscribe to.
- Specify the authentication method. Currently limited to username/password-based authentication or service account token.
The following table describes the MQ source configuration parameters:
Field | Description | Required | Default | Example |
---|---|---|---|---|
Name | A customer-visible name for the source stage. | Required | NA | asset-1broker |
Description | A customer-visible description of the source stage. | Optional | NA | brokerforasset-1 |
Broker | The URL of the MQTT broker to connect to. | Required | NA | tls://aio-mq-dmqtt-frontend:8883 |
Authentication | The authentication method to connect to the broker. One of: None, Username/Password, or Service Account Token (SAT). | Required | Service Account Token (SAT) | Service Account Token (SAT) |
Username/Password > Username | The username for the username/password authentication. | Required | NA | myuser |
Username/Password > Secret | Reference to the password stored in Azure Key Vault. | Required | NA | AKV_USERNAME_PASSWORD |
QoS | QoS level for message delivery. | Required | 1 | 0 |
Clean session | Set to FALSE for a persistent session. | Required | FALSE | FALSE |
Topic | The topic to subscribe to for data acquisition. | Required | NA | contoso/site1/asset1 , contoso/site1/asset2 |
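Taken together, the parameters in the table might translate into a stage configuration like the following sketch. The field names mirror the table, but the exact JSON schema depends on your deployment, so treat this as illustrative rather than a definitive configuration:

```json
{
  "name": "asset-1broker",
  "description": "brokerforasset-1",
  "broker": "tls://aio-mq-dmqtt-frontend:8883",
  "authentication": { "type": "serviceAccountToken" },
  "qos": 1,
  "cleanSession": false,
  "topics": ["contoso/site1/asset1", "contoso/site1/asset2"]
}
```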
To learn more about secrets, see Manage secrets for your Azure IoT Operations Preview deployment.
The data processor doesn't reorder out-of-order data coming from the MQTT broker. If the data is received out of order from the broker, it remains so in the pipeline.
Select data format
In a data processor pipeline, the format field in the source stage specifies how to deserialize the incoming data. By default, the data processor pipeline uses the raw format, which means it doesn't convert the incoming data. To use many data processor features, such as Filter or Enrich stages in a pipeline, you must deserialize your data in the input stage. You can choose to deserialize your incoming data from JSON, jsonStream, MessagePack, CBOR, CSV, or Protobuf formats into a data processor readable message in order to use the full data processor functionality.
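The difference between the JSON and jsonStream formats can be sketched in plain Python. This is a simplified illustration, not the data processor's actual implementation; here jsonStream is modeled as newline-delimited JSON objects in a single payload:

```python
import json

def deserialize_json(payload: bytes) -> list:
    """A single JSON document becomes one message."""
    return [json.loads(payload)]

def deserialize_json_stream(payload: bytes) -> list:
    """Each newline-delimited JSON document becomes its own message."""
    return [json.loads(line) for line in payload.splitlines() if line.strip()]

# One payload, one message:
print(deserialize_json(b'{"temperature": 7109}'))
# One payload, two messages:
print(deserialize_json_stream(b'{"temperature": 7109}\n{"temperature": 7110}'))
```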
The following tables describe the different deserialization configuration options:
Field | Description | Required | Default | Value |
---|---|---|---|---|
Data Format | The type of the data format. | Yes | Raw | Raw, JSON, jsonStream, MessagePack, CBOR, CSV, Protobuf |
The Data Format field is mandatory and its value determines the other required fields.
To deserialize CSV messages, you also need to specify the following fields:
Field | Description | Required | Value | Example |
---|---|---|---|---|
Header | Whether the CSV data includes a header line. | Yes | Yes, No | No |
Name | The name of the column in the CSV data. | Yes | - | temp, asset |
Path | The jq path in the message where the column information is added. | No | - | The default jq path is the column name |
Data Type | The data type of the data in the column and how it's represented inside the data processor pipeline. | No | String, Float, Integer, Boolean, Bytes | Default: String |
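The column configuration above can be approximated in Python: each configured column has a name and a data type used to convert the raw string value. This is a simplified sketch of the idea, not the data processor's implementation, and the column names are illustrative:

```python
import csv
import io

# Hypothetical column configuration mirroring the table above.
COLUMNS = [
    {"name": "temp", "type": float},
    {"name": "asset", "type": str},
]

def deserialize_csv(payload: str, header: bool = True) -> list:
    """Turn CSV rows into typed messages keyed by column name."""
    rows = list(csv.reader(io.StringIO(payload)))
    if header:
        rows = rows[1:]  # skip the header line
    return [
        {col["name"]: col["type"](value) for col, value in zip(COLUMNS, row)}
        for row in rows
    ]

print(deserialize_csv("temp,asset\n42.5,oven"))
# → [{'temp': 42.5, 'asset': 'oven'}]
```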
To deserialize Protobuf messages, you also need to specify the following fields:
Field | Description | Required | Value | Example |
---|---|---|---|---|
Descriptor | The base64-encoded descriptor for the protobuf definition. | Yes | - | Zhf... |
Message | The name of the message type that's used to format the data. | Yes | - | pipeline |
Package | The name of the package in the descriptor where the type is defined. | Yes | - | schedulerv1 |
Note
The data processor supports only one message type in each .proto file.
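The Descriptor value is the base64 encoding of a compiled descriptor set. Assuming you've generated one with protoc (for example, `protoc --descriptor_set_out=scheduler.pb scheduler.proto`; file names here are illustrative), a small helper can produce the string to paste into the field:

```python
import base64

def encode_descriptor(path: str) -> str:
    """Read a compiled protobuf descriptor set and return it base64-encoded."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")
```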
Configure partitioning
Partitioning in a pipeline divides the incoming data into separate partitions. Partitioning enables data parallelism in the pipeline, which can improve throughput and reduce latency. Partitioning strategies affect how the data is processed in the other stages of the pipeline. For example, the last known value stage and aggregate stage operate on each logical partition.
To partition your data, specify a partitioning strategy and the number of partitions to use:
Field | Description | Required | Default | Example |
---|---|---|---|---|
Partition type | The type of partitioning to be used: Partition ID or Partition Key. | Required | Key | Key |
Partition expression | The jq expression to run on the incoming message to compute the partition ID or partition key. | Required | .topic | .topic |
Number of partitions | The number of partitions in a data processor pipeline. | Required | 2 | 2 |
The data processor adds additional metadata to the incoming message. See Data processor message structure overview to understand how to correctly specify the partitioning expression that runs on the incoming message. By default, the partitioning expression is set to 0 with the partition type as ID, which sends all the incoming data to a single partition.
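Conceptually, key-based partitioning works like the following sketch: a jq-style expression (here, extracting the topic) yields a key, and messages with the same key always land on the same partition. The hash function is illustrative; the data processor's internal hashing isn't documented here:

```python
import hashlib

NUM_PARTITIONS = 2  # matches the default in the table above

def partition_for(message: dict) -> int:
    """Map a message to a partition by hashing its topic (the `.topic` key)."""
    key = message["topic"]
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS

# Messages with the same key deterministically share a partition.
m = {"topic": "contoso/site1/asset1", "payload": {}}
print(partition_for(m))
```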
For recommendations and to learn more, see What is partitioning?.
Sample configuration
The following shows an example configuration for the stage:
Parameter | Value |
---|---|
Name | input data |
Broker | tls://aio-mq-dmqtt-frontend:8883 |
Authentication | Service Account Token (SAT) |
Topic | azure-iot-operations/data/opc-ua-connector-0/# |
Data format | JSON |
This configuration then generates messages that look like the following example:
```json
{
  "Timestamp": "2023-08-10T00:54:58.6572007Z",
  "MessageType": "ua-deltaframe",
  "payload": {
    "temperature": {
      "SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
      "Value": 7109
    },
    "Tag 10": {
      "SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
      "Value": 7109
    }
  },
  "DataSetWriterName": "oven",
  "SequenceNumber": 4660
}
```