Compartir por


Crear un flujo de datos

Importante

Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en versión preliminar. No se debería usar este software en versión preliminar en entornos de producción.

Tendrá que implementar una nueva instalación de Operaciones de IoT de Azure cuando esté disponible una versión general. No podrá actualizar una instalación de versión preliminar.

Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.

Un flujo de datos es la ruta de acceso que los datos toman del origen al destino con transformaciones opcionales. Puede configurar el flujo de datos mediante el portal de Operaciones de IoT de Azure o mediante la creación de un recurso personalizado de flujo de datos. Antes de crear un flujo de datos, debe configurar puntos de conexión de flujo de datos para los orígenes y destinos de datos.

A continuación se muestra un ejemplo de una configuración de flujo de datos con un punto de conexión de origen MQTT, transformaciones y un punto de conexión de destino de Kafka:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
spec:
  profileRef: my-dataflow-profile
  mode: enabled
  operations:
    - operationType: source
      name: my-source
      sourceSettings:
        endpointRef: mq
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
        serializationFormat: json
    - operationType: builtInTransformation
      name: my-transformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
        serializationFormat: json
    - operationType: destination
      name: my-destination
      destinationSettings:
        endpointRef: kafka
        dataDestination: factory
Nombre Descripción
profileRef Referencia al perfil de flujo de datos.
mode Modo del flujo de datos: enabled o disabled.
operations[] Operaciones realizadas por el flujo de datos.
operationType Tipo de operación: source, destination o builtInTransformation.

Revise las secciones siguientes para obtener información sobre cómo configurar los tipos de operación del flujo de datos.

Configuración del origen

Para configurar un origen para el flujo de datos, especifique la referencia del punto de conexión y el origen de datos. Puede especificar una lista de orígenes de datos para el punto de conexión. Por ejemplo, temas MQTT o Kafka. La siguiente definición es un ejemplo de una configuración de flujo de datos con un punto de conexión de origen y un origen de datos:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: mq-to-kafka
  namespace: azure-iot-operations
spec:
  profileRef: example-dataflow
  operations:
    - operationType: source
      sourceSettings:
        endpointRef: mq-source
        dataSources:
        - azure-iot-operations/data/thermostat
Nombre Descripción
operationType source
sourceSettings Configuración de la operación de source.
sourceSettings.endpointRef Referencia al punto de conexión de source.
sourceSettings.dataSources Orígenes de datos para la operación de source. Se admite el uso de los caracteres comodín ( # y + ).

Configuración de la transformación

La operación de transformación es donde puede transformar los datos del origen antes de enviarlos al destino. Las transformaciones son opcionales. Si no necesita realizar cambios en los datos, no incluya la operación de transformación en la configuración del flujo de datos. Varias transformaciones se encadenan en fases independientemente del orden en que se especifiquen en la configuración.

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
        # ...
      filter:
        # ...
      map:
        # ...
Nombre Descripción
operationType builtInTransformation
name Nombre de la transformación.
builtInTransformationSettings Configuración de la operación de builtInTransformation.
builtInTransformationSettings.datasets Agregar otros datos a los datos de origen dados un conjunto de datos y una condición para que coincidan.
builtInTransformationSettings.filter Filtro de los datos en función de una condición.
builtInTransformationSettings.map Mover datos de un campo a otro con una conversión opcional.

Enriquecimiento: adición de datos de referencia

Para enriquecer los datos, puede usar el conjunto de datos de referencia en el almacén de estado distribuido (DSS) de Operaciones de IoT de Azure. El conjunto de datos se usa para agregar datos adicionales a los datos de origen en función de una condición. La condición se especifica como un campo en los datos de origen que coincide con un campo del conjunto de datos.

Nombre Descripción
builtInTransformationSettings.datasets.key Conjunto de datos usado para el enriquecimiento (clave en DSS).
builtInTransformationSettings.datasets.expression Condición para la operación de enriquecimiento.

Los nombres de clave del almacén de estado distribuido corresponden a un conjunto de datos en la configuración del flujo de datos.

Por ejemplo, puede usar el campo deviceId de los datos de origen para que coincida con el campo asset del conjunto de datos:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
      - key: assetDataset
        inputs:
          - $source.deviceId # ------------- $1
          - $context(assetDataset).asset # - $2
        expression: $1 == $2

Si el conjunto de datos tiene un registro con el campo asset, similar a:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

Los datos del origen con el campo deviceId que coincide con thermostat1 tienen los campos location y manufacturer disponibles en las fases filter y map.

Puede cargar datos de muestra en el DSS usando la herramienta de configuración de DSS de muestra.

Para más información sobre la sintaxis de las condiciones, consulte Enriquecimiento de datos mediante flujos de datos y Conversión de datos mediante flujos de datos.

Filtro: Filtrar datos en función de una condición

Para filtrar los datos en una condición, puede usar la fase filter. La condición se especifica como un campo en los datos de origen que coincide con un valor.

Nombre Descripción
builtInTransformationSettings.filter.inputs[] Entradas para evaluar una condición de filtro.
builtInTransformationSettings.filter.expression Condición para la evaluación del filtro.

Por ejemplo, podría usar el campo temperature de los datos de origen para filtrar los datos:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      filter:
        - inputs:
          - temperature ? $last # - $1
          expression: "$1 > 20"

Si el campo temperature es mayor que 20, los datos se pasan a la siguiente fase. Si el campo temperature es menor o igual que 20, se filtran los datos.

Asignación: mover datos de un campo a otro

Para asignar los datos a otro campo con conversión opcional, puede usar la operación map. La conversión se especifica como una fórmula que usa los campos de los datos de origen.

Nombre Descripción
builtInTransformationSettings.map[].inputs[] Entradas para la operación de asignación
builtInTransformationSettings.map[].output Campo de salida para la operación de asignación
builtInTransformationSettings.map[].expression Fórmula de conversión para la operación de asignación

Por ejemplo, podría usar el campo temperature en los datos de origen para convertir la temperatura a Celsius y almacenarla en el campo temperatureCelsius. También puede enriquecer los datos de origen con el campo location del conjunto de datos de contextualización:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      map:
        - inputs:
          - temperature # - $1
          output: temperatureCelsius
          expression: "($1 - 32) * 5/9"
        - inputs:
          - $context(assetDataset).location  
          output: location

Para más información, consulte Asignación de datos mediante flujos de datos y Convertir datos mediante flujos de datos.

Configuración del destino

Para configurar un destino para el flujo de datos, debe especificar el punto de conexión y una ruta de acceso (tema o tabla) para el destino.

Nombre Descripción
destinationSettings.endpointRef Referencia al punto de conexión de destination
destinationSettings.dataDestination Destino de los datos

Configuración de la referencia del punto de conexión de destino

Para configurar el punto de conexión para el destino, debe especificar la referencia de identificador y punto de conexión:

spec:
  operations:
  - operationType: destination
    name: destination1
    destinationSettings:
      endpointRef: eventgrid

Configuración de la ruta de acceso de destino

Una vez que tenga el punto de conexión, puede configurar la ruta de acceso para el destino. Si el destino es un punto de conexión MQTT o Kafka, use la ruta de acceso para especificar el tema:

- operationType: destination
  destinationSettings:
    endpointRef: eventgrid
    dataDestination: factory

Para los puntos de conexión de almacenamiento como Microsoft Fabric, use la ruta de acceso para especificar el nombre de la tabla:

- operationType: destination
  destinationSettings:
    endpointRef: adls
    dataDestination: telemetryTable