Delen via


Gegevensstromen configureren in Azure IoT-bewerkingen

Belangrijk

Azure IoT Operations Preview: ingeschakeld door Azure Arc is momenteel in preview. Gebruik deze preview-software niet in productieomgevingen.

U moet een nieuwe Azure IoT Operations-installatie implementeren wanneer er een algemeen beschikbare release beschikbaar wordt gesteld. U kunt geen preview-installatie upgraden.

Raadpleeg de Aanvullende voorwaarden voor Microsoft Azure-previews voor juridische voorwaarden die van toepassing zijn op Azure-functies die in bèta of preview zijn of die anders nog niet algemeen beschikbaar zijn.

Een gegevensstroom is het pad dat gegevens van de bron naar de bestemming halen met optionele transformaties. U kunt de gegevensstroom configureren door een aangepaste gegevensstroomresource te maken of met behulp van de Azure IoT Operations Studio-portal. Een gegevensstroom bestaat uit drie delen: de bron, de transformatie en de bestemming.

Diagram van een gegevensstroom met stroom van bron naar transformatie en bestemming.

Als u de bron en het doel wilt definiëren, moet u de eindpunten voor de gegevensstroom configureren. De transformatie is optioneel en kan bewerkingen bevatten, zoals het verrijken van de gegevens, het filteren van de gegevens en het toewijzen van de gegevens aan een ander veld.

U kunt de bewerkingservaring in Azure IoT-bewerkingen gebruiken om een gegevensstroom te maken. De bewerkingservaring biedt een visuele interface voor het configureren van de gegevensstroom. U kunt bicep ook gebruiken om een gegevensstroom te maken met behulp van een Bicep-sjabloonbestand of Kubernetes gebruiken om een gegevensstroom te maken met behulp van een YAML-bestand.

Lees verder voor meer informatie over het configureren van de bron, transformatie en bestemming.

Vereisten

U kunt gegevensstromen implementeren zodra u een exemplaar van Azure IoT Operations Preview hebt met behulp van het standaardgegevensstroomprofiel en -eindpunt. Het is echter mogelijk dat u gegevensstroomprofielen en -eindpunten wilt configureren om de gegevensstroom aan te passen.

Gegevensstroomprofiel

Het gegevensstroomprofiel geeft het aantal exemplaren op dat de gegevensstromen eronder moeten worden gebruikt. Als u niet meerdere groepen gegevensstromen met verschillende schaalinstellingen nodig hebt, kunt u het standaardgegevensstroomprofiel gebruiken. Zie Gegevensstroomprofielen configureren voor meer informatie over het configureren van een gegevensstroomprofiel.

Gegevensstroomeindpunten

Eindpunten voor gegevensstromen zijn vereist om de bron en het doel voor de gegevensstroom te configureren. Als u snel aan de slag wilt gaan, kunt u het standaardgegevensstroomeindpunt voor de lokale MQTT-broker gebruiken. U kunt ook andere typen gegevensstroomeindpunten maken, zoals Kafka, Event Hubs of Azure Data Lake Storage. Zie Eindpunten voor gegevensstromen configureren voor meer informatie over het configureren van elk type gegevensstroomeindpunt.

Aan de slag

Zodra u aan de vereisten voldoet, kunt u beginnen met het maken van een gegevensstroom.

Als u een gegevensstroom in de bewerkingservaring wilt maken, selecteert u Gegevensstroom> maken. Vervolgens ziet u de pagina waar u de bron, transformatie en bestemming voor de gegevensstroom kunt configureren.

Schermopname van bewerkingen om een gegevensstroom te maken.

Bekijk de volgende secties voor meer informatie over het configureren van de bewerkingstypen van de gegevensstroom.

Bron

Als u een bron voor de gegevensstroom wilt configureren, geeft u de eindpuntverwijzing en een lijst met gegevensbronnen voor het eindpunt op.

Asset als bron gebruiken

U kunt een asset als bron voor de gegevensstroom gebruiken. Het gebruik van een asset als bron is alleen beschikbaar in de bewerkingservaring.

  1. Selecteer Asset onder Brondetails.

  2. Selecteer de asset die u wilt gebruiken als het broneindpunt.

  3. Selecteer Doorgaan.

    Er wordt een lijst met gegevenspunten voor de geselecteerde asset weergegeven.

    Schermopname van het gebruik van bewerkingen om een asset te selecteren als broneindpunt.

  4. Selecteer Toepassen om de asset als broneindpunt te gebruiken.

Standaard MQTT-eindpunt gebruiken als bron

  1. Selecteer onder Brondetails de optie MQTT.

    Schermopname van de bewerkingservaring om MQTT te selecteren als het broneindpunt.

  2. Voer de volgende instellingen in voor de MQTT-bron:

    Instelling Beschrijving
    MQTT-onderwerp Het MQTT-onderwerpfilter waarop u zich wilt abonneren voor binnenkomende berichten. Zie MQTT- of Kafka-onderwerpen configureren.
    Berichtschema Het schema dat moet worden gebruikt om de binnenkomende berichten te deserialiseren. Zie Schema opgeven om gegevens te deserialiseren.
  3. Selecteer Toepassen.

Zie MQTT-eindpunt voor meer informatie over het standaard MQTT-eindpunt en het maken van een MQTT-eindpunt als gegevensstroombron.

Aangepast MQTT- of Kafka-gegevensstroomeindpunt als bron gebruiken

Als u een aangepast MQTT- of Kafka-gegevensstroomeindpunt hebt gemaakt (bijvoorbeeld voor gebruik met Event Grid of Event Hubs), kunt u dit gebruiken als bron voor de gegevensstroom. Houd er rekening mee dat eindpunten van het opslagtype, zoals Data Lake of Fabric OneLake, niet als bron kunnen worden gebruikt.

Gebruik Kubernetes YAML of Bicep om te configureren. Vervang tijdelijke aanduidingen door de naam en onderwerpen van uw aangepaste eindpunt.

Het gebruik van een aangepast MQTT- of Kafka-eindpunt als bron wordt momenteel niet ondersteund in de bewerkingservaring.

Gegevensbronnen (MQTT- of Kafka-onderwerpen) configureren

U kunt meerdere MQTT- of Kafka-onderwerpen in een bron opgeven zonder dat u de configuratie van het gegevensstroomeindpunt hoeft te wijzigen. Dit betekent dat hetzelfde eindpunt opnieuw kan worden gebruikt voor meerdere gegevensstromen, zelfs als de onderwerpen verschillen. Zie Eindpunten voor gegevensstromen opnieuw gebruiken voor meer informatie.

MQTT-onderwerpen

Wanneer de bron een MQTT-eindpunt (Opgenomen Event Grid) is, kunt u het MQTT-onderwerpfilter gebruiken om u te abonneren op binnenkomende berichten. Het onderwerpfilter kan jokertekens bevatten om u te abonneren op meerdere onderwerpen. Abonneert zich bijvoorbeeld thermostats/+/telemetry/temperature/# op alle telemetrieberichten van temperatuur van thermostaten. De MQTT-onderwerpfilters configureren:

Selecteer MQTT in de gegevensstroombrondetails van de bewerkingservaring en gebruik vervolgens het onderwerpveld MQTT om het MQTT-onderwerpfilter op te geven waarop u zich wilt abonneren voor binnenkomende berichten.

Notitie

Er kan slechts één MQTT-onderwerpfilter worden opgegeven in de bewerkingservaring. Als u meerdere MQTT-onderwerpfilters wilt gebruiken, gebruikt u Bicep of Kubernetes.

Kafka-onderwerpen

Wanneer de bron een Kafka-eindpunt (Opgenomen Event Hubs) is, geeft u de afzonderlijke Kafka-onderwerpen op waarop u zich wilt abonneren voor inkomende berichten. Jokertekens worden niet ondersteund, dus u moet elk onderwerp statisch opgeven.

Notitie

Wanneer u Event Hubs via het Kafka-eindpunt gebruikt, is elke afzonderlijke Event Hub binnen de naamruimte het Kafka-onderwerp. Als u bijvoorbeeld een Event Hubs-naamruimte met twee Event Hubs-hubs thermostats hebt en humidifiersu kunt elke Event Hub opgeven als een Kafka-onderwerp.

De Kafka-onderwerpen configureren:

Het gebruik van een Kafka-eindpunt als bron wordt momenteel niet ondersteund in de bewerkingservaring.

Schema opgeven om gegevens te deserialiseren

Als de brongegevens optionele velden of velden met verschillende typen bevatten, geeft u een deserialisatieschema op om consistentie te garanderen. De gegevens bevatten bijvoorbeeld velden die niet aanwezig zijn in alle berichten. Zonder het schema kan de transformatie deze velden niet verwerken, omdat ze lege waarden zouden hebben. Met het schema kunt u standaardwaarden opgeven of de velden negeren.

Het opgeven van het schema is alleen relevant wanneer u de MQTT- of Kafka-bron gebruikt. Als de bron een asset is, wordt het schema automatisch afgeleid van de assetdefinitie.

Het schema configureren dat wordt gebruikt om de binnenkomende berichten van een bron te deserialiseren:

Selecteer in bewerkingen gegevensstroombrondetails MQTT en gebruik het veld Berichtschema om het schema op te geven. U kunt de knop Uploaden gebruiken om eerst een schemabestand te uploaden. Zie Berichtschema's begrijpen voor meer informatie.

Gedeelde abonnementen

Als u gedeelde abonnementen wilt gebruiken met MQTT-bronnen, kunt u het onderwerp voor gedeelde abonnementen opgeven in de vorm van $shared/<GROUP_NAME>/<TOPIC_FILTER>.

Selecteer MQTT in operations experience dataflow Source details en gebruik het MQTT-onderwerpveld om de gedeelde abonnementsgroep en het onderwerp op te geven.

Notitie

Als het aantal exemplaren in het gegevensstroomprofiel groter is dan 1, wordt het voorvoegsel voor het gedeelde abonnementsonderwerp automatisch toegevoegd aan het onderwerpfilter.

Transformatie

Met de transformatiebewerking kunt u de gegevens van de bron transformeren voordat u deze naar de bestemming verzendt. Transformaties zijn optioneel. Als u geen wijzigingen in de gegevens hoeft aan te brengen, neemt u de transformatiebewerking niet op in de configuratie van de gegevensstroom. Meerdere transformaties worden in fasen gekoppeld, ongeacht de volgorde waarin ze zijn opgegeven in de configuratie. De volgorde van de fasen is altijd:

  1. Verrijken: voeg aanvullende gegevens toe aan de brongegevens op basis van een gegevensset en voorwaarde die overeenkomen.
  2. Filter: Filter de gegevens op basis van een voorwaarde.
  3. Kaart: Verplaats gegevens van het ene veld naar het andere met een optionele conversie.

Selecteer in de bewerkingservaring de optie Gegevensstroom>Transformatie toevoegen (optioneel).

Schermopname van bewerkingen om een transformatie toe te voegen aan een gegevensstroom.

Verrijken: Referentiegegevens toevoegen

Als u de gegevens wilt verrijken, kunt u de referentiegegevensset gebruiken in het DSS -archief (Gedistribueerde statusopslag) van Azure IoT Operations. De gegevensset wordt gebruikt om extra gegevens toe te voegen aan de brongegevens op basis van een voorwaarde. De voorwaarde wordt opgegeven als een veld in de brongegevens die overeenkomen met een veld in de gegevensset.

U kunt voorbeeldgegevens in de DSS laden met behulp van het voorbeeld van het hulpprogramma voor DSS-set. Sleutelnamen in het gedistribueerde statusarchief komen overeen met een gegevensset in de gegevensstroomconfiguratie.

Momenteel is de verrijkingsbewerking niet beschikbaar in de bewerkingservaring.

Zie Gegevens verrijken met behulp van gegevensstromen en gegevens converteren met behulp van gegevensstromen voor meer informatie over de syntaxis van voorwaarden.

Filter: Gegevens filteren op basis van een voorwaarde

Als u de gegevens op een voorwaarde wilt filteren, kunt u de filter fase gebruiken. De voorwaarde wordt opgegeven als een veld in de brongegevens die overeenkomen met een waarde.

  1. Selecteer Onder Transformeren (optioneel) de optie Filter>toevoegen.

  2. Kies de gegevenspunten die u wilt opnemen in de gegevensset.

  3. Voeg een filtervoorwaarde en beschrijving toe.

    Schermopname van bewerkingen om een filtertransformatie toe te voegen.

  4. Selecteer Toepassen.

U kunt bijvoorbeeld een filtervoorwaarde gebruiken, zoals temperature > 20 het filteren van gegevens kleiner dan of gelijk aan 20 op basis van het temperatuurveld.

Kaart: Gegevens van het ene veld naar het andere verplaatsen

Als u de gegevens wilt toewijzen aan een ander veld met optionele conversie, kunt u de map bewerking gebruiken. De conversie wordt opgegeven als een formule die gebruikmaakt van de velden in de brongegevens.

In de bewerkingservaring wordt toewijzing momenteel ondersteund met behulp van Compute-transformaties .

  1. Selecteer Onder Transformeren (optioneel) de optie Compute>Add.

  2. Voer de vereiste velden en expressies in.

    Schermopname van bewerkingen om een rekentransformatie toe te voegen.

  3. Selecteer Toepassen.

Zie Kaartgegevens met behulp van gegevensstromen en Gegevens converteren met behulp van gegevensstromen voor meer informatie.

Gegevens serialiseren volgens een schema

Als u de gegevens wilt serialiseren voordat u deze naar de bestemming verzendt, moet u een schema en serialisatie-indeling opgeven. Anders worden de gegevens geserialiseerd in JSON met de typen die zijn afgeleid. Voor opslageindpunten, zoals Microsoft Fabric of Azure Data Lake, is een schema vereist om gegevensconsistentie te garanderen. Ondersteunde serialisatie-indelingen zijn Parquet en Delta.

Het opgeven van het uitvoerschema en de serialisatie wordt momenteel niet ondersteund in de bewerkingservaring.

Zie Berichtschema's begrijpen voor meer informatie over het schemaregister.

Bestemming

Als u een bestemming voor de gegevensstroom wilt configureren, geeft u de eindpuntverwijzing en het gegevensdoel op. U kunt een lijst met gegevensbestemmingen voor het eindpunt opgeven.

Als u gegevens wilt verzenden naar een andere bestemming dan de lokale MQTT-broker, maakt u een gegevensstroomeindpunt. Zie Eindpunten voor gegevensstromen configureren voor meer informatie.

Belangrijk

Voor opslageindpunten is een schemareferentie vereist. Als u eindpunten voor opslagbestemmingen hebt gemaakt voor Microsoft Fabric OneLake, ADLS Gen 2, Azure Data Explorer en Lokale opslag, moet u schemaverwijzing opgeven.

  1. Selecteer het eindpunt van de gegevensstroom dat u als doel wilt gebruiken.

    Schermopname van het gebruik van de bewerkingservaring om het eindpunt van de Event Hubs-bestemming te selecteren.

  2. Selecteer Doorgaan om de bestemming te configureren.

  3. Voer de vereiste instellingen voor de bestemming in, inclusief het onderwerp of de tabel waarnaar de gegevens moeten worden verzonden. Zie Gegevensdoel configureren (onderwerp, container of tabel) voor meer informatie.

Gegevensbestemming configureren (onderwerp, container of tabel)

Net als bij gegevensbronnen is de gegevensbestemming een concept dat wordt gebruikt om de eindpunten van de gegevensstroom herbruikbaar te houden voor meerdere gegevensstromen. In wezen vertegenwoordigt het de submap in de configuratie van het gegevensstroomeindpunt. Als het eindpunt voor de gegevensstroom bijvoorbeeld een opslageindpunt is, is de gegevensbestemming de tabel in het opslagaccount. Als het eindpunt van de gegevensstroom een Kafka-eindpunt is, is de gegevensbestemming het Kafka-onderwerp.

Eindpunttype Betekenis van gegevensbestemming Beschrijving
MQTT (of Event Grid) Onderwerp Het MQTT-onderwerp waar de gegevens worden verzonden. Alleen statische onderwerpen worden ondersteund, geen jokertekens.
Kafka (of Event Hubs) Onderwerp Het Kafka-onderwerp waar de gegevens worden verzonden. Alleen statische onderwerpen worden ondersteund, geen jokertekens. Als het eindpunt een Event Hubs-naamruimte is, is de gegevensbestemming de afzonderlijke Event Hub binnen de naamruimte.
Azure Data Lake Storage Container De container in het opslagaccount. Niet de tabel.
Microsoft Fabric OneLake Tabel of map Komt overeen met het geconfigureerde padtype voor het eindpunt.
Azure Data Explorer Tabel De tabel in de Azure Data Explorer-database.
Lokale opslag Map De map- of mapnaam in de permanente volumekoppeling van de lokale opslag.

De gegevensbestemming configureren:

Wanneer u de bewerkingservaring gebruikt, wordt het gegevensdoelveld automatisch geïnterpreteerd op basis van het eindpunttype. Als het eindpunt voor de gegevensstroom bijvoorbeeld een opslageindpunt is, wordt u op de pagina met doelgegevens gevraagd om de containernaam in te voeren. Als het eindpunt voor de gegevensstroom een MQTT-eindpunt is, wordt u op de pagina met doelgegevens gevraagd het onderwerp in te voeren, enzovoort.

Schermopname van de bewerkingservaring waarin de gebruiker wordt gevraagd een MQTT-onderwerp in te voeren op basis van het eindpunttype.

Opmerking

Het volgende voorbeeld is een gegevensstroomconfiguratie die gebruikmaakt van het MQTT-eindpunt voor de bron en het doel. De bron filtert de gegevens uit de MQTT-onderwerpen thermostats/+/telemetry/temperature/# en humidifiers/+/telemetry/humidity/#. De transformatie converteert de temperatuur naar Fahrenheit en filtert de gegevens waar de temperatuur kleiner is dan 100000. De bestemming verzendt de gegevens naar het MQTT-onderwerp factory.

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
  namespace: azure-iot-operations
spec:
  profileRef: default
  mode: Enabled
  operations:
    - operationType: Source
      sourceSettings:
        endpointRef: default
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
    - operationType: builtInTransformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
    - operationType: Destination
      destinationSettings:
        endpointRef: default
        dataDestination: factory

Controleren of een gegevensstroom werkt

Volg de zelfstudie: Bidirectionele MQTT-brug naar Azure Event Grid om te controleren of de gegevensstroom werkt.

Configuratie van gegevensstroom exporteren

Als u de configuratie van de gegevensstroom wilt exporteren, kunt u de bewerkingservaring gebruiken of door de aangepaste gegevensstroomresource te exporteren.

Selecteer de gegevensstroom die u wilt exporteren en selecteer Exporteren op de werkbalk.

Schermopname van het gebruik van bewerkingen om een gegevensstroom te exporteren.

Volgende stappen