Configure dataflows in Azure IoT Operations
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in preview. You shouldn't use this preview software in production environments.
You'll need to deploy a new Azure IoT Operations installation when a generally available release becomes available. You won't be able to upgrade a preview installation.
For legal terms that apply to Azure features that are in beta, in preview, or otherwise not yet released into general availability, see the Supplemental Terms of Use for Microsoft Azure Previews.
A dataflow is the path that data takes from the source to the destination with optional transformations. You can configure the dataflow by creating a Dataflow custom resource or using the Azure IoT Operations Studio portal. A dataflow is made up of three parts: the source, the transformation, and the destination.
To define the source and destination, you need to configure the dataflow endpoints. The transformation is optional and can include operations like enriching the data, filtering the data, and mapping the data to another field.
Important
Each dataflow must have the Azure IoT Operations local MQTT broker default endpoint as either the source or destination.
You can use the operations experience in Azure IoT Operations to create a dataflow. The operations experience provides a visual interface to configure the dataflow. You can also use Bicep to create a dataflow using a Bicep template file, or use Kubernetes to create a dataflow using a YAML file.
Continue reading to learn how to configure the source, transformation, and destination.
Prerequisites
You can deploy dataflows as soon as you have an instance of Azure IoT Operations Preview using the default dataflow profile and endpoint. However, you might want to configure dataflow profiles and endpoints to customize the dataflow.
Dataflow profile
The dataflow profile specifies the number of instances for the dataflows under it to use. If you don't need multiple groups of dataflows with different scaling settings, you can use the default dataflow profile. To learn how to configure a dataflow profile, see Configure dataflow profiles.
Dataflow endpoints
Dataflow endpoints are required to configure the source and destination for the dataflow. To get started quickly, you can use the default dataflow endpoint for the local MQTT broker. You can also create other types of dataflow endpoints like Kafka, Event Hubs, or Azure Data Lake Storage. To learn how to configure each type of dataflow endpoint, see Configure dataflow endpoints.
Get started
Once you have the prerequisites, you can start to create a dataflow.
To create a dataflow in operations experience, select Dataflow > Create dataflow. Then, you see the page where you can configure the source, transformation, and destination for the dataflow.
Review the following sections to learn how to configure the operation types of the dataflow.
Source
To configure a source for the dataflow, specify the endpoint reference and a list of data sources for the endpoint.
Use asset as source
You can use an asset as the source for the dataflow. Using an asset as a source is only available in the operations experience.
Under Source details, select Asset.
Select the asset you want to use as the source endpoint.
Select Proceed.
A list of datapoints for the selected asset is displayed.
Select Apply to use the asset as the source endpoint.
When using an asset as the source, the asset definition is used to infer the schema for the dataflow. The asset definition includes the schema for the asset's datapoints. To learn more, see Manage asset configurations remotely.
Once configured, the data from the asset reached the dataflow via the local MQTT broker. So, when using an asset as the source, the dataflow uses the local MQTT broker default endpoint as the source in actuality.
Use default MQTT endpoint as source
Under Source details, select MQTT.
Enter the following settings for the MQTT source:
Setting Description MQTT topic The MQTT topic filter to subscribe to for incoming messages. See Configure MQTT or Kafka topics. Message schema The schema to use to deserialize the incoming messages. See Specify schema to deserialize data. Select Apply.
If the default endpoint isn't used as the source, it must be used as the destination. To learn more about, see Dataflows must use local MQTT broker endpoint.
Use custom MQTT or Kafka dataflow endpoint as source
If you created a custom MQTT or Kafka dataflow endpoint (for example, to use with Event Grid or Event Hubs), you can use it as the source for the dataflow. Remember that storage type endpoints, like Data Lake or Fabric OneLake, can't be used as source.
To configure, use Kubernetes YAML or Bicep. Replace placeholder values with your custom endpoint name and topics.
Using a custom MQTT or Kafka endpoint as a source is currently not supported in the operations experience.
Configure data sources (MQTT or Kafka topics)
You can specify multiple MQTT or Kafka topics in a source without needing to modify the dataflow endpoint configuration. This flexibility means the same endpoint can be reused across multiple dataflows, even if the topics vary. For more information, see Reuse dataflow endpoints.
MQTT topics
When the source is an MQTT (Event Grid included) endpoint, you can use the MQTT topic filter to subscribe to incoming messages. The topic filter can include wildcards to subscribe to multiple topics. For example, thermostats/+/telemetry/temperature/#
subscribes to all temperature telemetry messages from thermostats. To configure the MQTT topic filters:
In the operations experience dataflow Source details, select MQTT, then use the MQTT topic field to specify the MQTT topic filter to subscribe to for incoming messages.
Note
Only one MQTT topic filter can be specified in the operations experience. To use multiple MQTT topic filters, use Bicep or Kubernetes.
Shared subscriptions
To use shared subscriptions with MQTT sources, you can specify the shared subscription topic in the form of $shared/<GROUP_NAME>/<TOPIC_FILTER>
.
In operations experience dataflow Source details, select MQTT and use the MQTT topic field to specify the shared subscription group and topic.
If the instance count in the dataflow profile is greater than 1, shared subscription is automatically enabled for all dataflows that use MQTT source. In this case, the $shared
prefix is added and the shared subscription group name automatically generated. For example, if you have a dataflow profile with an instance count of 3, and your dataflow uses an MQTT endpoint as source configured with topics topic1
and topic2
, they are automatically converted to shared subscriptions as $shared/<GENERATED_GROUP_NAME>/topic1
and $shared/<GENERATED_GROUP_NAME>/topic2
. If you want to use a different shared subscription group ID, you can override it in the topic, like $shared/mygroup/topic1
.
Important
Dataflows requireing shared subscription when instance count is greater than 1 is important when using Event Grid MQTT broker as a source since it doesn't support shared subscriptions. To avoid missing messages, set the dataflow profile instance count to 1 when using Event Grid MQTT broker as the source. That is when the dataflow is the subscriber and receiving messages from the cloud.
Kafka topics
When the source is a Kafka (Event Hubs included) endpoint, specify the individual kafka topics to subscribe to for incoming messages. Wildcards are not supported, so you must specify each topic statically.
Note
When using Event Hubs via the Kafka endpoint, each individual event hub within the namespace is the Kafka topic. For example, if you have an Event Hubs namespace with two event hubs, thermostats
and humidifiers
, you can specify each event hub as a Kafka topic.
To configure the Kafka topics:
Using a Kafka endpoint as a source is currently not supported in the operations experience.
Specify schema to deserialize data
If the source data has optional fields or fields with different types, specify a deserialization schema to ensure consistency. For example, the data might have fields that aren't present in all messages. Without the schema, the transformation can't handle these fields as they would have empty values. With the schema, you can specify default values or ignore the fields.
Specifying the schema is only relevant when using the MQTT or Kafka source. If the source is an asset, the schema is automatically inferred from the asset definition.
To configure the schema used to deserialize the incoming messages from a source:
In operations experience dataflow Source details, select MQTT and use the Message schema field to specify the schema. You can use the Upload button to upload a schema file first. To learn more, see Understand message schemas.
Transformation
The transformation operation is where you can transform the data from the source before you send it to the destination. Transformations are optional. If you don't need to make changes to the data, don't include the transformation operation in the dataflow configuration. Multiple transformations are chained together in stages regardless of the order in which they're specified in the configuration. The order of the stages is always:
- Enrich, Rename, or add a New property: Add additional data to the source data given a dataset and condition to match.
- Filter: Filter the data based on a condition.
- Map or Compute: Move data from one field to another with an optional conversion.
In the operations experience, select Dataflow > Add transform (optional).
Enrich: Add reference data
To enrich the data, you can use the reference dataset in the Azure IoT Operations distributed state store (DSS). The dataset is used to add extra data to the source data based on a condition. The condition is specified as a field in the source data that matches a field in the dataset.
You can load sample data into the DSS by using the DSS set tool sample. Key names in the distributed state store correspond to a dataset in the dataflow configuration.
In the operations experience, the Enrich stage is currently supported using the Rename and New property transforms.
In the operations experience, select a dataflow then Add transform (optional).
Choose Rename or New property transforms then select Add.
If the dataset has a record with the asset
field, similar to:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
The data from the source with the deviceId
field matching thermostat1
has the location
and manufacturer
fields available in filter and map stages.
For more information about condition syntax, see Enrich data by using dataflows and Convert data using dataflows.
Filter: Filter data based on a condition
To filter the data on a condition, you can use the filter
stage. The condition is specified as a field in the source data that matches a value.
Under Transform (optional), select Filter > Add.
Choose the datapoints to include in the dataset.
Add a filter condition and description.
Select Apply.
For example, you could use a filter condition like temperature > 20
to filter data less than or equal to 20 based on the temperature field.
Map: Move data from one field to another
To map the data to another field with optional conversion, you can use the map
operation. The conversion is specified as a formula that uses the fields in the source data.
In the operations experience, mapping is currently supported using Compute transforms.
Under Transform (optional), select Compute > Add.
Enter the required fields and expressions.
Select Apply.
To learn more, see Map data by using dataflows and Convert data by using dataflows.
Serialize data according to a schema
If you want to serialize the data before sending it to the destination, you need to specify a schema and serialization format. Otherwise, the data is serialized in JSON with the types inferred. Storage endpoints like Microsoft Fabric or Azure Data Lake require a schema to ensure data consistency. Supported serialization formats are Parquet and Delta.
Currently, specifying the output schema and serialization isn't supported in the operations experience.
For more information about schema registry, see Understand message schemas.
Destination
To configure a destination for the dataflow, specify the endpoint reference and data destination. You can specify a list of data destinations for the endpoint.
To send data to a destination other than the local MQTT broker, create a dataflow endpoint. To learn how, see Configure dataflow endpoints. If the destination isn't the local MQTT broker, it must be used as a source. To learn more about, see Dataflows must use local MQTT broker endpoint.
Important
Storage endpoints require a schema reference. If you've created storage destination endpoints for Microsoft Fabric OneLake, ADLS Gen 2, Azure Data Explorer and Local Storage, you must specify schema reference.
Select the dataflow endpoint to use as the destination.
Select Proceed to configure the destination.
Enter the required settings for the destination, including the topic or table to send the data to. See Configure data destination (topic, container, or table) for more information.
Configure data destination (topic, container, or table)
Similar to data sources, data destination is a concept that is used to keep the dataflow endpoints reusable across multiple dataflows. Essentially, it represents the subdirectory in the dataflow endpoint configuration. For example, if the dataflow endpoint is a storage endpoint, the data destination is the table in the storage account. If the dataflow endpoint is a Kafka endpoint, the data destination is the Kafka topic.
Endpoint type | Data destination meaning | Description |
---|---|---|
MQTT (or Event Grid) | Topic | The MQTT topic where the data is sent. Only static topics are supported, no wildcards. |
Kafka (or Event Hubs) | Topic | The Kafka topic where the data is sent. Only static topics are supported, no wildcards. If the endpoint is an Event Hubs namespace, the data destination is the individual event hub within the namespace. |
Azure Data Lake Storage | Container | The container in the storage account. Not the table. |
Microsoft Fabric OneLake | Table or Folder | Corresponds to the configured path type for the endpoint. |
Azure Data Explorer | Table | The table in the Azure Data Explorer database. |
Local Storage | Folder | The folder or directory name in the local storage persistent volume mount. When using Azure Container Storage enabled by Azure Arc Cloud Ingest Edge Volumes, this must match the spec.path parameter for the subvolume you created. |
To configure the data destination:
When using the operations experience, the data destination field is automatically interpreted based on the endpoint type. For example, if the dataflow endpoint is a storage endpoint, the destination details page prompts you to enter the container name. If the dataflow endpoint is an MQTT endpoint, the destination details page prompts you to enter the topic, and so on.
Example
The following example is a dataflow configuration that uses the MQTT endpoint for the source and destination. The source filters the data from the MQTT topic azure-iot-operations/data/thermostat
. The transformation converts the temperature to Fahrenheit and filters the data where the temperature multiplied by the humiditiy is less than 100000. The destination sends the data to the MQTT topic factory
.
See Bicep or Kubernetes tabs for the configuration example.
To see more examples of dataflow configurations, see Azure REST API - Dataflow and the quickstart Bicep.
Verify a dataflow is working
Follow Tutorial: Bi-directional MQTT bridge to Azure Event Grid to verify the dataflow is working.
Export dataflow configuration
To export the dataflow configuration, you can use the operations experience or by exporting the Dataflow custom resource.
Select the dataflow you want to export and select Export from the toolbar.
Proper dataflow configuration
To ensure the dataflow is working as expected, verify the following:
- The default MQTT dataflow endpoint must be used as either the source or destination.
- The dataflow profile exists and is referenced in the dataflow configuration.
- Source is either an MQTT endpoint, Kafka endpoint, or an asset. Storage type endpoints can't be used as a source.
- When using Event Grid as the source, the dataflow profile instance count is set to 1 because Event Grid MQTT broker doesn't support shared subscriptions.
- When using Event Hubs as the source, each event hub in the namespace is a separate Kafka topic and must be specified as the data source.
- Transformation, if used, is configured with proper syntax, including proper escaping of special characters.
- When using storage type endpoints as destination, a schema is specified.