Crear un flujo de datos
Importante
Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en versión preliminar. No se debería usar este software en versión preliminar en entornos de producción.
Tendrá que implementar una nueva instalación de Operaciones de IoT de Azure cuando esté disponible una versión general. No podrá actualizar una instalación de versión preliminar.
Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.
Un flujo de datos es la ruta de acceso que los datos toman del origen al destino con transformaciones opcionales. Puede configurar el flujo de datos mediante el portal de Operaciones de IoT de Azure o mediante la creación de un recurso personalizado de flujo de datos. Antes de crear un flujo de datos, debe configurar puntos de conexión de flujo de datos para los orígenes y destinos de datos.
A continuación se muestra un ejemplo de una configuración de flujo de datos con un punto de conexión de origen MQTT, transformaciones y un punto de conexión de destino de Kafka:
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
spec:
profileRef: my-dataflow-profile
mode: enabled
operations:
- operationType: source
name: my-source
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
serializationFormat: json
- operationType: builtInTransformation
name: my-transformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
serializationFormat: json
- operationType: destination
name: my-destination
destinationSettings:
endpointRef: kafka
dataDestination: factory
Nombre | Descripción |
---|---|
profileRef |
Referencia al perfil de flujo de datos. |
mode |
Modo del flujo de datos: enabled o disabled . |
operations[] |
Operaciones realizadas por el flujo de datos. |
operationType |
Tipo de operación: source , destination o builtInTransformation . |
Revise las secciones siguientes para obtener información sobre cómo configurar los tipos de operación del flujo de datos.
Configuración del origen
Para configurar un origen para el flujo de datos, especifique la referencia del punto de conexión y el origen de datos. Puede especificar una lista de orígenes de datos para el punto de conexión. Por ejemplo, temas MQTT o Kafka. La siguiente definición es un ejemplo de una configuración de flujo de datos con un punto de conexión de origen y un origen de datos:
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: mq-to-kafka
namespace: azure-iot-operations
spec:
profileRef: example-dataflow
operations:
- operationType: source
sourceSettings:
endpointRef: mq-source
dataSources:
- azure-iot-operations/data/thermostat
Nombre | Descripción |
---|---|
operationType |
source |
sourceSettings |
Configuración de la operación de source . |
sourceSettings.endpointRef |
Referencia al punto de conexión de source . |
sourceSettings.dataSources |
Orígenes de datos para la operación de source . Se admite el uso de los caracteres comodín ( # y + ). |
Configuración de la transformación
La operación de transformación es donde puede transformar los datos del origen antes de enviarlos al destino. Las transformaciones son opcionales. Si no necesita realizar cambios en los datos, no incluya la operación de transformación en la configuración del flujo de datos. Varias transformaciones se encadenan en fases independientemente del orden en que se especifiquen en la configuración.
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
# ...
filter:
# ...
map:
# ...
Nombre | Descripción |
---|---|
operationType |
builtInTransformation |
name |
Nombre de la transformación. |
builtInTransformationSettings |
Configuración de la operación de builtInTransformation . |
builtInTransformationSettings.datasets |
Agregar otros datos a los datos de origen dados un conjunto de datos y una condición para que coincidan. |
builtInTransformationSettings.filter |
Filtro de los datos en función de una condición. |
builtInTransformationSettings.map |
Mover datos de un campo a otro con una conversión opcional. |
Enriquecimiento: adición de datos de referencia
Para enriquecer los datos, puede usar el conjunto de datos de referencia en el almacén de estado distribuido (DSS) de Operaciones de IoT de Azure. El conjunto de datos se usa para agregar datos adicionales a los datos de origen en función de una condición. La condición se especifica como un campo en los datos de origen que coincide con un campo del conjunto de datos.
Nombre | Descripción |
---|---|
builtInTransformationSettings.datasets.key |
Conjunto de datos usado para el enriquecimiento (clave en DSS). |
builtInTransformationSettings.datasets.expression |
Condición para la operación de enriquecimiento. |
Los nombres de clave del almacén de estado distribuido corresponden a un conjunto de datos en la configuración del flujo de datos.
Por ejemplo, puede usar el campo deviceId
de los datos de origen para que coincida con el campo asset
del conjunto de datos:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
Si el conjunto de datos tiene un registro con el campo asset
, similar a:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
Los datos del origen con el campo deviceId
que coincide con thermostat1
tienen los campos location
y manufacturer
disponibles en las fases filter
y map
.
Puede cargar datos de muestra en el DSS usando la herramienta de configuración de DSS de muestra.
Para más información sobre la sintaxis de las condiciones, consulte Enriquecimiento de datos mediante flujos de datos y Conversión de datos mediante flujos de datos.
Filtro: Filtrar datos en función de una condición
Para filtrar los datos en una condición, puede usar la fase filter
. La condición se especifica como un campo en los datos de origen que coincide con un valor.
Nombre | Descripción |
---|---|
builtInTransformationSettings.filter.inputs[] |
Entradas para evaluar una condición de filtro. |
builtInTransformationSettings.filter.expression |
Condición para la evaluación del filtro. |
Por ejemplo, podría usar el campo temperature
de los datos de origen para filtrar los datos:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
Si el campo temperature
es mayor que 20, los datos se pasan a la siguiente fase. Si el campo temperature
es menor o igual que 20, se filtran los datos.
Asignación: mover datos de un campo a otro
Para asignar los datos a otro campo con conversión opcional, puede usar la operación map
. La conversión se especifica como una fórmula que usa los campos de los datos de origen.
Nombre | Descripción |
---|---|
builtInTransformationSettings.map[].inputs[] |
Entradas para la operación de asignación |
builtInTransformationSettings.map[].output |
Campo de salida para la operación de asignación |
builtInTransformationSettings.map[].expression |
Fórmula de conversión para la operación de asignación |
Por ejemplo, podría usar el campo temperature
en los datos de origen para convertir la temperatura a Celsius y almacenarla en el campo temperatureCelsius
. También puede enriquecer los datos de origen con el campo location
del conjunto de datos de contextualización:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
output: temperatureCelsius
expression: "($1 - 32) * 5/9"
- inputs:
- $context(assetDataset).location
output: location
Para más información, consulte Asignación de datos mediante flujos de datos y Convertir datos mediante flujos de datos.
Configuración del destino
Para configurar un destino para el flujo de datos, debe especificar el punto de conexión y una ruta de acceso (tema o tabla) para el destino.
Nombre | Descripción |
---|---|
destinationSettings.endpointRef |
Referencia al punto de conexión de destination |
destinationSettings.dataDestination |
Destino de los datos |
Configuración de la referencia del punto de conexión de destino
Para configurar el punto de conexión para el destino, debe especificar la referencia de identificador y punto de conexión:
spec:
operations:
- operationType: destination
name: destination1
destinationSettings:
endpointRef: eventgrid
Configuración de la ruta de acceso de destino
Una vez que tenga el punto de conexión, puede configurar la ruta de acceso para el destino. Si el destino es un punto de conexión MQTT o Kafka, use la ruta de acceso para especificar el tema:
- operationType: destination
destinationSettings:
endpointRef: eventgrid
dataDestination: factory
Para los puntos de conexión de almacenamiento como Microsoft Fabric, use la ruta de acceso para especificar el nombre de la tabla:
- operationType: destination
destinationSettings:
endpointRef: adls
dataDestination: telemetryTable