Publish data to an MQTT broker using the data processor
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.
You will need to deploy a new Azure IoT Operations installation when a generally available release is made available, you won't be able to upgrade a preview installation.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
Use the MQ destination to publish processed messages to an MQTT broker, such as an MQTT broker instance, on the edge. The data processor connects to an MQTT broker by using MQTT v5.0. The destination publishes messages to the MQTT broker as the stage receives them. The MQ destination doesn't support batching.
Prerequisites
To configure and use a destination pipeline stage, you need a deployed instance of the data processor that includes the optional data processor component.
Configure the destination stage
The MQ destination stage JSON configuration defines the details of the stage. To author the stage, you can either interact with the form-based UI, or provide the JSON configuration on the Advanced tab:
Field | Type | Description | Required | Default | Example |
---|---|---|---|---|---|
Name | String | A name to show in the data processor UI. | Yes | - | MQTT broker output |
Description | String | A user-friendly description of what the stage does. | No | Write to topic default/topic1 |
|
Broker | String | The broker address. | Yes | - | mqtt://mqttEndpoint.cluster.local:1111 |
Authentication | String | The authentication details to connect to MQTT broker. None /Username/Password /Service account token (SAT) |
Yes | Service account token (SAT) |
Username/Password |
Username | String | The username to use when Authentication is set to Username/Password . |
No | - | myusername |
Password | String | The secret reference for the password to use when Authentication is set to Username/Password . |
No | - | mysecret |
Topic | Static/Dynamic | The topic definition. String if type is static, jq path if type is dynamic. | Yes | - | ".topic" |
Data Format1 | String | The format to serialize messages to. | Yes | - | Raw |
User properties | A list of key/value pairs | List of custom user properties to set on each MQTT message. Can include static information or data from each message. | No | [] |
| Retry | Retry | The retry policy to use. | No | default
| fixed
|
1Data format: Use the data processor's built-in serializer to serialize your messages to the following Formats before it publishes messages to the MQTT broker:
Raw
JSON
JSONStream
CSV
Protobuf
MessagePack
CBOR
Select Raw
when you don't require serialization. Raw sends the data to the MQTT broker in its current format.
Sample configuration
The following JSON example shows a complete MQ destination stage configuration that writes the entire message to the MQ pipelineOutput
topic:
{
"displayName": "MQ - 67e929",
"type": "output/mqtt@v1",
"viewOptions": {
"position": {
"x": 0,
"y": 992
}
},
"broker": "tls://aio-mq-dmqtt-frontend:8883",
"qos": 1,
"authentication": {
"type": "serviceAccountToken"
},
"topic": {
"type": "static",
"value": "pipelineOutput"
},
"format": {
"type": "json",
"path": "."
},
"userProperties": [],
"retry": {
"type": "fixed",
"interval": "20s",
"maxRetries": 4
}
}
The configuration defines that:
- Authentication is done by using service account token.
- The topic is a static string called
pipelineOutput
. - The output format is
JSON
. - The format path is
.
to ensure the entire data processor message is written to MQ. To write just the payload, change the path to ``.payload`.
Example
The following example shows a sample input message to the MQ destination stage:
{
"payload": {
"Batch": 102,
"CurrentTemperature": 7109,
"Customer": "Contoso",
"Equipment": "Boiler",
"IsSpare": true,
"LastKnownTemperature": 7109,
"Location": "Seattle",
"Pressure": 7109,
"Timestamp": "2023-08-10T00:54:58.6572007Z",
"assetName": "oven"
},
"qos": 0,
"systemProperties": {
"partitionId": 0,
"partitionKey": "quickstart",
"timestamp": "2023-11-06T23:42:51.004Z"
},
"topic": "quickstart"
}