Configure an InfluxDB v2 source stage
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.
When a generally available release is made available, you'll need to deploy a new Azure IoT Operations installation; you won't be able to upgrade a preview installation.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
The source stage is the first and required stage in a data processor pipeline. The source stage gets data into the data processing pipeline and prepares it for further processing. The InfluxDB source stage lets you read data from an InfluxDB v2 database at a user-defined interval.
In the source stage, you define:
- Connection details for InfluxDB v2.
- The interval at which to query the InfluxDB database. The stage waits for a result before it resets the interval timer.
- A partitioning configuration based on your specific data processing requirements.
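The query-interval behavior described above can be sketched as sequential polling; this is a minimal illustration of the timing semantics (the timer restarts only after a result arrives), not the data processor's actual implementation:

```python
import time

def poll_influx(run_query, interval_seconds, cycles):
    """Sequential polling: the interval timer restarts only after each
    query returns, so queries never overlap."""
    results = []
    for _ in range(cycles):
        results.append(run_query())   # blocks until a result arrives
        time.sleep(interval_seconds)  # then the interval timer restarts
    return results

# With a stubbed query, three cycles yield three results in order.
rows = poll_influx(lambda: {"_measurement": "stat"}, 0, 3)
```

Because the wait starts after each result, a slow query stretches the effective period beyond the configured interval rather than causing overlapping queries.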
Prerequisites
- A deployed instance of Azure IoT Operations Preview that includes the optional data processor component.
- An operational, reachable InfluxDB v2 database that contains all the necessary raw data.
Prepare the InfluxDB database
To connect to the InfluxDB database, you need to:
- Create an access token that grants the pipeline read access to the InfluxDB database. To learn more, see Manage API tokens.
- Create a secret in Azure Key Vault that contains the access token. To learn more, see Manage secrets for your Azure IoT Operations deployment.
Configure the InfluxDB source
To configure the InfluxDB source:
- Provide details of the InfluxDB database. This configuration includes the server name and a query to retrieve the data.
- Specify the authentication method. Currently, you can only use access token authentication.
The following table describes the InfluxDB source configuration parameters:

Field | Type | Description | Required? | Default | Example
---|---|---|---|---|---
Name | String | A customer-visible name for the source stage. | Required | NA | `erp-database`
Description | String | A customer-visible description of the source stage. | Optional | NA | `Enterprise database`
Database URL | String | The URL of the InfluxDB database. | Required | NA | `https://contoso.com/some/url/path`
Database port | Integer | The InfluxDB database port. | Optional | 443 | 443
Organization | String | The organization that holds the bucket to query. | Required | NA | `test-org`
Authentication | Authentication type | The authentication method for connecting to the server. Supports the `accessToken` type only. | Required | `{"type": "accessToken"}` | `{"type": "accessToken"}`
Secret | String | Reference to the token stored in Azure Key Vault. | Required | NA | `AKV_ACCESS_TOKEN`
Flux query | String | The InfluxDB query. | Required | NA | `{"expression": "from(bucket:\"test-bucket\") \|> range(start: -1h) \|> filter(fn: (r) => r._measurement == \"stat\")"}`
Query interval | Duration | String representation of the time to wait before the next API call. | Required | `24h` | `24h`
Data format | Format | The stage applies the format to individual rows retrieved by the query. Only the `json` format is supported. The top-level path isn't supported. | Required | `{"type": "json"}` | `{"type": "json"}`
Partitioning | Partitioning | Partitioning configuration for the source stage. | Required | NA | See partitioning
Configure partitioning
Partitioning in a pipeline divides the incoming data into separate partitions. Partitioning enables data parallelism in the pipeline, which can improve throughput and reduce latency. Partitioning strategies affect how the data is processed in the other stages of the pipeline. For example, the last known value stage and aggregate stage operate on each logical partition.
To partition your data, specify a partitioning strategy and the number of partitions to use:
Field | Description | Required | Default | Example
---|---|---|---|---
Partition type | The type of partitioning to be used: partition ID or partition key. | Required | ID | ID
Partition expression | The jq expression to use on the incoming message to compute the partition ID or partition key. | Required | 0 | `.payload.header`
Number of partitions | The number of partitions in a data processor pipeline. | Required | 1 | 1
The data processor adds metadata to the incoming message. See Data processor message structure overview to understand how to correctly specify the partitioning expression that runs on the incoming message. By default, the partitioning expression is set to `0` with the partition type set to ID, which sends all the incoming data to a single partition.
For recommendations and to learn more, see What is partitioning?.
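To make the partitioning behavior concrete, the sketch below mimics a jq path expression such as `.payload.header` with a plain dictionary walk and maps the resulting key onto one of the configured partitions. This is an illustration of the concept only; the function name and hashing scheme are hypothetical, not the data processor's internals:

```python
import hashlib

def partition_for(message, key_path, partition_count):
    """Mimic a jq path expression (e.g. '.payload.header') with a dict
    walk, then hash the extracted key onto one of partition_count partitions."""
    value = message
    for part in key_path.lstrip(".").split("."):
        value = value[part]
    digest = hashlib.sha256(str(value).encode()).hexdigest()
    return int(digest, 16) % partition_count

# Hypothetical incoming message shape for illustration.
msg = {"payload": {"header": "line-a"}}
p = partition_for(msg, ".payload.header", 4)
```

Messages with the same key always land in the same partition, which is what lets stages such as last known value and aggregate operate per logical partition.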
Sample configuration
The following JSON example shows a complete InfluxDB source stage configuration:
```json
{
  "displayName": "InfluxDB v2 - ec8750",
  "type": "input/influxdbv2@v1",
  "query": {
    "expression": "from(bucket:\"test-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"stat\")"
  },
  "url": "https://contoso.com/some/url/path",
  "interval": "5s",
  "port": 443,
  "organization": "test-org",
  "format": {
    "type": "json"
  },
  "partitionCount": 1,
  "partitionStrategy": {
    "type": "id",
    "expression": "0"
  },
  "authentication": {
    "type": "accessToken",
    "accessToken": "AKV_ACCESS_TOKEN"
  },
  "description": "Example InfluxDB source stage"
}
```
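A quick way to catch malformed stage configurations before deployment is to parse them as JSON and check for the required fields. The sketch below does this with the Python standard library; the `REQUIRED` set is derived from the required rows of the parameter table above (using the key names that appear in the sample), and is an assumption rather than an official schema:

```python
import json

# The sample configuration from above, embedded as a raw string so the
# escaped quotes inside the Flux query survive intact.
sample = r'''
{
  "displayName": "InfluxDB v2 - ec8750",
  "type": "input/influxdbv2@v1",
  "query": {
    "expression": "from(bucket:\"test-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"stat\")"
  },
  "url": "https://contoso.com/some/url/path",
  "interval": "5s",
  "port": 443,
  "organization": "test-org",
  "format": {"type": "json"},
  "partitionCount": 1,
  "partitionStrategy": {"type": "id", "expression": "0"},
  "authentication": {"type": "accessToken", "accessToken": "AKV_ACCESS_TOKEN"}
}
'''

# Assumed required keys, taken from the parameter table's "Required" rows.
REQUIRED = {"type", "query", "url", "organization", "interval",
            "format", "partitionCount", "partitionStrategy", "authentication"}

config = json.loads(sample)          # fails fast on invalid JSON escaping
missing = REQUIRED - config.keys()   # empty set when the config is complete
```

Parsing also verifies the quote escaping in the Flux query: `\"` is the correct JSON escape, so `json.loads` recovers the literal quotes around `test-bucket` and `stat`.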