Delen via


WebAssembly (WASM) gebruiken met gegevensstroomgrafieken

Belangrijk

Deze pagina bevat instructies voor het beheren van Azure IoT Operations-onderdelen met behulp van Kubernetes-implementatiemanifesten, die zich in PREVIEW bevinden. Deze functie is voorzien van verschillende beperkingen en mag niet worden gebruikt voor productieworkloads.

Zie de Aanvullende Gebruiksvoorwaarden voor Microsoft Azure Previews voor juridische voorwaarden die van toepassing zijn op Azure-functies die in bèta, preview, of anderszins nog niet algemeen beschikbaar zijn.

Azure IoT Operations-gegevensstroomgrafieken ondersteunen WEBAssembly-modules (WASM) voor aangepaste gegevensverwerking aan de rand. U kunt aangepaste bedrijfslogica en gegevenstransformaties implementeren als onderdeel van uw gegevensstroompijplijnen.

WebAssembly (WASM) met gegevensstroomgrafieken is algemeen beschikbaar.

Aanbeveling

Wilt u AI in-band uitvoeren? Bekijk Uitvoeren van ONNX-inferentie in WebAssembly data flow grafieken om kleine ONNX-modellen in uw WASM-operatoren te verpakken en uit te voeren.

Belangrijk

Gegevensstroomgrafieken ondersteunen momenteel alleen MQTT-, Kafka- en OpenTelemetry-eindpunten. Andere eindpunttypen, zoals Data Lake, Microsoft Fabric OneLake, Azure Data Explorer en Lokale opslag, worden niet ondersteund. Zie Bekende problemen voor meer informatie.

Vereiste voorwaarden

Overzicht

Met WebAssembly-modules (WASM) in Azure IoT Operations-gegevensstroomgrafieken kunt u gegevens aan de rand verwerken met hoge prestaties en beveiliging. WASM wordt uitgevoerd in een sandbox-omgeving en ondersteunt Rust en Python.

Hoe WASM-gegevensstroomgrafieken werken

De implementatie van de WASM-gegevensstroom volgt deze werkstroom:

  1. WASM-modules ontwikkelen: Schrijf aangepaste verwerkingslogica in een ondersteunde taal en compileer deze naar de webassembly-componentmodelindeling.
  2. Grafiekdefinitie ontwikkelen: Definieer hoe gegevens door de modules worden verplaatst met behulp van YAML-configuratiebestanden. Zie WebAssembly-grafiekdefinities configureren voor gedetailleerde informatie.
  3. Artefacten opslaan in het register: push de gecompileerde WASM-modules naar een containerregister met behulp van OCI-compatibele hulpprogramma's zoals ORAS.
  4. Registereindpunten configureren: verificatie- en verbindingsgegevens instellen zodat Azure IoT Operations toegang heeft tot het containerregister.
  5. Gegevensstroomgrafiek maken: Gegevensbronnen, de naam van het artefact en de bestemmingen definiëren.
  6. Implementeren en uitvoeren: Azure IoT Operations haalt WASM-modules op uit het register en voert deze uit op basis van de grafiekdefinitie.

In de volgende voorbeelden ziet u hoe u WASM-gegevensstroomgrafieken instelt en implementeert voor algemene scenario's. In de voorbeelden worden vastgelegde waarden en vereenvoudigde configuraties gebruikt, zodat u snel aan de slag kunt.

Containerregister instellen

Azure IoT Operations heeft een containerregister nodig om WASM-modules en grafiekdefinities op te halen. U kunt Azure Container Registry (ACR) of een ander OCI-compatibel register gebruiken.

Zie Azure Container Registry implementeren om een Azure Container Registry te maken en te configureren.

ORAS CLI installeren

Gebruik de ORAS CLI om WASM-modules en grafiekdefinities naar uw containerregister te pushen. Zie ORAS installeren voor installatie-instructies.

Voorbeeldmodules ophalen uit het openbare register

Vooraf gemaakte voorbeeldmodules gebruiken:

# Pull sample modules and graphs
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-simple:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-complex:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/temperature:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/window:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/snapshot:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/format:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/humidity:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/collection:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/enrichment:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/filter:1.0.0

Modules naar uw register pushen

Zodra u de voorbeeldmodules en grafieken hebt, pusht u ze naar uw containerregister. Vervang door <YOUR_ACR_NAME> de naam van uw Azure Container Registry. Als u ervoor wilt zorgen dat de grafieken en modules zichtbaar zijn in de webinterface van de operations-ervaring, voegt u de --config en --artifact-type vlaggen toe, zoals wordt weergegeven in het volgende voorbeeld:

# Log in to your ACR
az acr login --name <YOUR_ACR_NAME>

# Push modules to your registry
oras push <YOUR_ACR_NAME>.azurecr.io/graph-simple:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-simple.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/graph-complex:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-complex.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/temperature:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm temperature-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/window:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm window-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/snapshot:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm snapshot-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/format:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm format-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/humidity:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm humidity-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/collection:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm collection-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/enrichment:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm enrichment-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/filter:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm filter-1.0.0.wasm:application/wasm

Aanbeveling

U kunt ook uw eigen modules pushen en aangepaste grafieken maken. Zie Configuratie van aangepaste gegevensstroomgrafieken.

Een registereindpunt maken

Een registereindpunt definieert de verbinding met uw containerregister. Gegevensstroomgrafieken maken gebruik van registereindpunten om WASM-modules en grafiekdefinities op te halen uit containerregisters. Zie Registereindpunten configureren voor gedetailleerde informatie over het configureren van registereindpunten met verschillende verificatiemethoden en registertypen.

Als u registereindpunten wilt maken, kunt u de Manifesten van Azure Portal, Bicep of Kubernetes gebruiken. Nadat u een registereindpunt hebt gemaakt, zijn de grafieken die u naar het containerregister hebt gepusht klaar om te worden gebruikt in bewerkingen in gegevensstroomgrafieken.

Voor snelle installatie met Azure Container Registry maakt u een registereindpunt met door het systeem toegewezen beheerde identiteitverificatie:

U kunt Azure Portal gebruiken om registereindpunten te maken. Met de portal-ervaring kunt u hostgegevens van een ACR specificeren en desgewenst referenties opgeven. Voordat u begint, moet u ervoor zorgen dat u over de volgende informatie beschikt:

  • Registereindpuntnaam.
  • Een hostnaam voor de ACR.
  • Er worden vier typen verificatie ondersteund:
    • Anoniem
    • Door het systeem beheerde identiteit
    • Door de gebruiker beheerde identiteit
    • Artefactgeheim

Volg deze stappen om een registereindpunt te maken in Azure Portal.

Registereindpunten maken met anonieme verificatie

U kunt een nieuw registereindpunt maken door de hostgegevens van een Azure Container Registry (ACR) op te geven, anonieme toegang in te schakelen voor het ophalen van openbare installatiekopieën en de configuratie op te slaan voor hergebruik. Selecteer eerst het type verificatie dat u wilt gebruiken. In dit voorbeeld gebruiken we anonieme verificatie:

Schermopname van het selectie authenticatieformulier.

Schermopname van de voltooide configuratie voor anonieme verificatie voor het registereindpunt.

Registereindpunten maken met door het systeem beheerde identiteitverificatie

U kunt een nieuw registereindpunt maken door de hostgegevens van een ACR op te geven, te verifiëren met behulp van een door het systeem toegewezen beheerde identiteit voor beveiligde toegang en de configuratie op te slaan voor hergebruik.

Schermopname van de voltooide configuratie van door het systeem beheerde identiteitsverificatie voor registereindpunt.

Registereindpunten maken met door de gebruiker beheerde identiteit

U kunt een nieuw registereindpunt maken door de hostgegevens van een ACR op te geven, te verifiëren met behulp van een door de gebruiker toegewezen beheerde identiteit voor beveiligde toegang en de configuratie op te slaan voor hergebruik.

Opmerking

De client- en tenant-id's zijn vereist om door de gebruiker beheerde identiteit in te schakelen.

Schermopname van de voltooide configuratie van door de gebruiker beheerde identiteitsverificatie voor het registereindpunt.

Registereindpunten maken met artefactgeheimen

Artefactgeheimen worden gebruikt voor verificatie met privécontainerregisters, zoals ACR, Docker Hub of MCR, voor het ophalen van containerafbeeldingen. Geheimen zijn essentieel wanneer het register referenties vereist en de image niet openbaar beschikbaar is. Met dit scenario kunt u gegevensstroomgrafieken beheren in Azure IoT-bewerkingen en de bewerkingservaring. U kunt artefactgeheimen instellen vanuit Azure Key Vault door bestaande geheimen te selecteren.

U kunt een nieuw registereindpunt maken door de hostgegevens van een ACR op te geven, te verifiëren met artefactgeheimen voor beveiligde toegang en de configuratie op te slaan voor hergebruik:

Schermopname van de azure Key Vault-interface voor geheimselectie voor artefactgeheimen.

U stelt artefactgeheimen van Azure Key Vault in door nieuwe geheimen te maken en op te slaan in Azure Key Vault:

Schermopname van het nieuwe geheime formulier maken in Azure Key Vault voor artefactgeheimen.

Opmerking

U kunt registereindpunten opnieuw gebruiken in meerdere gegevensstroomgrafieken en andere Azure IoT Operations-onderdelen, zoals Akri-connectors.

Extensienaam ophalen

# Get extension name
az k8s-extension list \
  --resource-group <RESOURCE_GROUP> \
  --cluster-name <CLUSTER_NAME> \
  --cluster-type connectedClusters \
  --query "[?extensionType=='microsoft.iotoperations'].name" \
  --output tsv

De eerste opdracht retourneert de extensienaam (bijvoorbeeld azure-iot-operations-4gh3y).

Machtigingen voor beheerde identiteit configureren

Als u wilt dat Azure IoT Operations WASM-modules uit uw containerregister haalt, geeft u de beheerde identiteit de juiste machtigingen. De IoT Operations-extensie maakt gebruik van een door het systeem toegewezen beheerde identiteit die de AcrPull rol in uw Azure Container Registry nodig heeft. Zorg ervoor dat u aan de volgende vereisten voldoet:

  • Eigenaarsmachtigingen voor Azure Container Registry.
  • Het containerregister kan zich in een andere resourcegroep of een ander abonnement bevinden, maar moet zich in dezelfde tenant bevinden als uw IoT Operations-implementatie.

Voer deze opdrachten uit om de AcrPull rol toe te wijzen aan de beheerde identiteit van IoT Operations:

# Get the IoT Operations extension managed identity
export EXTENSION_OBJ_ID=$(az k8s-extension list --cluster-name $CLUSTER_NAME -g $RESOURCE_GROUP --cluster-type connectedClusters --query "[?extensionType=='microsoft.iotoperations'].identity.principalId" -o tsv)

# Get the application ID for the managed identity
export SYSTEM_ASSIGNED_MAN_ID=$(az ad sp show --id $EXTENSION_OBJ_ID --query "appId" -o tsv)

# Assign the AcrPull role to the managed identity
az role assignment create --role "AcrPull" --assignee $SYSTEM_ASSIGNED_MAN_ID --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.ContainerRegistry/registries/$ACR_NAME"

Zie Azure Container Registry-rollen en -machtigingen voor meer informatie over containerregisterrollen.

Als u verificatiefouten krijgt met de Azure CLI, wijst u machtigingen toe in Azure Portal:

  1. Ga naar uw Azure Container Registry in Azure Portal.
  2. Selecteer Toegangsbeheer (IAM) in het menu.
  3. Selecteer Toevoegen>Roltoewijzing toevoegen.
  4. Kies de ingebouwde AcrPull-rol .
  5. Selecteer Gebruiker, groep of service-principal als de optie Toegang toewijzen.
  6. Zoek en selecteer de naam van uw IoT Operations-extensie (bijvoorbeeld azure-iot-operations-4gh3y).
  7. Selecteer Opslaan om de roltoewijzing te voltooien.

Zie Azure-rollen toewijzen met behulp van Azure Portal voor gedetailleerde instructies.

Voorbeeld 1: Basisimplementatie met één WASM-module

In dit voorbeeld worden temperatuurgegevens van Fahrenheit geconverteerd naar Celsius met behulp van een WASM-module. De broncode van de temperatuurmodule is beschikbaar op GitHub. Gebruik de vooraf gecompileerde versie graph-simple:1.0.0 die u naar het containerregister hebt gepusht.

Hoe het werkt

Met de grafiekdefinitie wordt een eenvoudige pijplijn met drie fasen gemaakt:

  1. Bron: ontvangt temperatuurgegevens van MQTT
  2. Kaart: Verwerkt gegevens met de TEMPERATUUR WASM-module
  3. Sink: hiermee worden geconverteerde gegevens terug naar MQTT verzonden

Zie voorbeeld 1: Eenvoudige grafiekdefinitie voor gedetailleerde informatie over de werking van de eenvoudige grafiekdefinitie en de bijbehorende structuur.

Invoerindeling:

{"temperature": {"value": 100.0, "unit": "F"}}

Uitvoerindeling:

{"temperature": {"value": 37.8, "unit": "C"}}

Met de volgende configuratie maakt u een gegevensstroomgrafiek die gebruikmaakt van deze pijplijn voor temperatuurconversie. De grafiek verwijst naar het graph-simple:1.0.0 artefact, dat de YAML-definitie bevat en haalt de temperatuurmodule op uit uw containerregister.

De gegevensstroomgrafiek configureren

Deze configuratie definieert drie knooppunten die de werkstroom voor temperatuurconversie implementeren: een bronknooppunt dat zich abonneert op binnenkomende temperatuurgegevens, een knooppunt voor grafiekverwerking waarmee de WASM-module wordt uitgevoerd en een doelknooppunt dat de geconverteerde resultaten publiceert.

De gegevensstroomgrafiekresource 'verpakt' het artefact voor de grafiekdefinitie en verbindt de abstracte bron-/sinkbewerkingen met concrete eindpunten:

  • De bewerking van source de grafiekdefinitie maakt verbinding met het bronknooppunt van de gegevensstroom (MQTT-onderwerp)
  • De bewerking van sink de grafiekdefinitie maakt verbinding met het doelknooppunt van de gegevensstroom (MQTT-onderwerp)
  • De verwerkingsbewerkingen van de grafiekdefinitie worden uitgevoerd in het knooppunt voor graafverwerking

Met deze scheiding kunt u dezelfde grafiekdefinitie implementeren met verschillende eindpunten in verschillende omgevingen, terwijl de verwerkingslogica ongewijzigd blijft.

  1. Als u een gegevensstroomgrafiek wilt maken in de bewerkingservaring, gaat u naar het tabblad Gegevensstroom .

  2. Selecteer de vervolgkeuzelijst naast + Maken en selecteer Een gegevensstroomgrafiek maken

    Schermopname van de interface voor bewerkingen waarin wordt getoond hoe u een gegevensstroomgrafiek maakt.

  3. Selekteer de tijdelijke naam nieuwe gegevensstroom om de eigenschappen van de gegevensstroom in te stellen. Voer de naam van de gegevensstroomgrafiek in en kies het gegevensstroomprofiel dat u wilt gebruiken.

  4. Selecteer bron in het gegevensstroomdiagram om het bronknooppunt te configureren. Selecteer onder Brondetailshet eindpunt van de asset of gegevensstroom.

    Schermopname van de interface voor bewerkingservaring waarin wordt getoond hoe u een bron voor de gegevensstroomgrafiek selecteert.

    1. Als u Asset selecteert, kiest u de asset waaruit u gegevens wilt ophalen en klikt u op Toepassen.

    2. Als u Het eindpunt van de gegevensstroom selecteert, voert u de volgende gegevens in en klikt u op Toepassen.

      Configuratie Description
      Eindpunt van gegevensstroom Selecteer de standaardwaarde om het standaardeindpunt van de MQTT-berichtbroker te gebruiken.
      Onderwerp Het onderwerpfilter waarop u zich wilt abonneren voor binnenkomende berichten. Gebruik onderwerp(en)> Rij toevoegen om meerdere onderwerpen toe te voegen.
      Berichtschema Het schema dat moet worden gebruikt om de binnenkomende berichten te deserialiseren.
  5. Selecteer in het gegevensstroomdiagram grafiektransformatie toevoegen (optioneel) om een knooppunt voor grafiekverwerking toe te voegen. Selecteer in het deelvenster Grafiekselectiegrafiek eenvoudig:1 en klik op Toepassen.

    Schermopname van de interface voor bewerkingen waarin wordt getoond hoe u een eenvoudige gegevensstroomgrafiek maakt.

    Belangrijk

    In dit voorbeeld wordt het graph-simple:1.0.0 artefact gebruikt dat u naar uw container registry hebt gepusht. U kunt uw aangepaste grafieken maken door uw eigen WASM-modules te ontwikkelen en naar uw containerregister te pushen. De grafieken die u naar het containerregister pusht, zijn beschikbaar in het deelvenster Grafiekselectie .

  6. U kunt bepaalde instellingen voor de grafiekoperator configureren door het grafiekknooppunt in het diagram te selecteren. U kunt bijvoorbeeld moduletemperatuur/kaartoperator selecteren en de waarde key2invoerenexample-value-2. Klik op Toepassen om de wijzigingen op te slaan.

    Schermopname van de interface voor bewerkingen waarin wordt getoond hoe u een eenvoudige gegevensstroomgrafiek configureert.

  7. Selecteer Bestemming in het gegevensstroomdiagram om het doelknooppunt te configureren.

  8. Selecteer Opslaan onder de naam van de gegevensstroomgrafiek om de gegevensstroomgrafiek op te slaan.

De gegevensstroom testen

Als u de gegevensstroom wilt testen, verzendt u MQTT-berichten vanuit het cluster. Implementeer eerst de MQTT-clientpod door de instructies te volgen in Testconnectiviteit met MQTT-broker met MQTT-clients. De MQTT-client biedt de verificatietokens en certificaten om verbinding te maken met de broker. Voer de volgende opdracht uit om de MQTT-client te implementeren:

kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/samples/quickstarts/mqtt-client.yaml

Temperatuurberichten verzenden

Maak en voer in de eerste terminalsessie een script uit om temperatuurgegevens in Fahrenheit te verzenden:

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
# Create and run temperature.sh from within the MQTT client pod
while true; do
  # Generate a random temperature value between 0 and 6000 Fahrenheit
  random_value=$(shuf -i 0-6000 -n 1)
  payload="{\"temperature\":{\"value\":$random_value,\"unit\":\"F\"}}"

  echo "Publishing temperature: $payload"

  # Publish to the input topic
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$payload" \
    -t "sensor/temperature/raw" \
    -d \
    --cafile /var/run/certs/ca.crt \
    -D PUBLISH user-property __ts $(date +%s)000:0:df \
    -D CONNECT authentication-method 'K8S-SAT' \
    -D CONNECT authentication-data $(cat /var/run/secrets/tokens/broker-sat)

  sleep 1
done'

Opmerking

De MQTT-gebruikerseigenschap __ts wordt gebruikt om een tijdstempel toe te voegen aan de berichten om ervoor te zorgen dat berichten tijdig worden verwerkt met behulp van de HLC (Hybrid Logical Clock). Als u de tijdstempel hebt, kan de gegevensstroom bepalen of u het bericht wilt accepteren of verwijderen. De indeling van de eigenschap is <timestamp>:<counter>:<nodeid>. Het maakt de verwerking van de gegevensstroom nauwkeuriger, maar is niet verplicht.

Het script publiceert elke seconde willekeurige temperatuurgegevens naar het sensor/temperature/raw onderwerp. Dit moet er als volgt uitzien:

Publishing temperature: {"temperature":{"value":1234,"unit":"F"}}
Publishing temperature: {"temperature":{"value":5678,"unit":"F"}}

Laat het script actief om door te gaan met het publiceren van temperatuurgegevens.

Abonneren op verwerkte berichten

Abonneer u in de tweede terminalsessie (ook verbonden met de MQTT-clientpod) op het uitvoeronderwerp om de geconverteerde temperatuurwaarden te bekijken:

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "sensor/temperature/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

U ziet de temperatuurgegevens die zijn geconverteerd van Fahrenheit naar Celsius door de WASM-module.

{"temperature":{"value":1292.2222222222222,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
{"temperature":{"value":203.33333333333334,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}

Voorbeeld 2: Een complexe grafiek implementeren

In dit voorbeeld ziet u een geavanceerde werkstroom voor gegevensverwerking die meerdere gegevenstypen verwerkt, waaronder temperatuur-, vochtigheids- en afbeeldingsgegevens. De complexe grafiekdefinitie organiseert meerdere WASM-modules om geavanceerde analyse en objectdetectie uit te voeren.

Hoe het werkt

De complexe grafiek verwerkt drie gegevensstromen en combineert deze in verrijkte sensoranalyses:

  • Temperatuurverwerking: Converteert Fahrenheit naar Celsius, filtert ongeldige metingen en berekent statistieken
  • Vochtigheidsverwerking: verzamelt vochtigheidsmetingen gedurende tijdsintervallen
  • Afbeeldingsverwerking: objectdetectie op cameramomentopnamen en indelingen van resultaten uitvoeren

Zie voorbeeld 2: Complexe grafiekdefinitie voor gedetailleerde informatie over de werking van de complexe grafiekdefinitie, de structuur en de gegevensstroom door meerdere verwerkingsfasen.

De grafiek maakt gebruik van gespecialiseerde modules van de Rust-operators.

De grafiek van de complexe gegevensstroom configureren

Met deze configuratie wordt de werkstroom voor multisensorverwerking geïmplementeerd met behulp van het graph-complex:1.0.0 artefact. U ziet hoe de implementatie van de gegevensstroomgrafiek vergelijkbaar is met voorbeeld 1: beide gebruiken hetzelfde patroon met drie knooppunten (bron, grafiekprocessor, doel), ook al is de verwerkingslogica anders.

Deze overeenkomst treedt op omdat de gegevensstroomgrafiekresource fungeert als een hostomgeving waarmee grafiekdefinities worden geladen en uitgevoerd. De werkelijke verwerkingslogica bevindt zich in het grafiekdefinitieartefact (graph-simple:1.0.0 vs graph-complex:1.0.0), dat de YAML-specificatie van bewerkingen en verbindingen tussen WASM-modules bevat. De gegevensstroomgrafiekresource biedt de runtime-infrastructuur voor het ophalen van het artefact, het instantiëren van de modules en het routeren van gegevens via de gedefinieerde werkstroom.

  1. Als u een gegevensstroomgrafiek wilt maken in de bewerkingservaring, gaat u naar het tabblad Gegevensstroom .

  2. Selecteer de vervolgkeuzelijst naast + Maken en selecteer Een gegevensstroomgrafiek maken

    Schermopname van de interface voor de bewerkingservaring waarin wordt getoond hoe u een complexe grafiek voor een gegevensstroom maakt.

  3. Selekteer de tijdelijke naam nieuwe gegevensstroom om de eigenschappen van de gegevensstroom in te stellen. Voer de naam van de gegevensstroomgrafiek in en kies het gegevensstroomprofiel dat u wilt gebruiken.

  4. Selecteer bron in het gegevensstroomdiagram om het bronknooppunt te configureren. Selecteer onder Brondetailshet eindpunt van de asset of gegevensstroom.

    Schermopname van de interface voor bewerkingservaring waarin wordt getoond hoe u een bron voor de gegevensstroomgrafiek selecteert.

    1. Als u Asset selecteert, kiest u de asset waaruit u gegevens wilt ophalen en klikt u op Toepassen.

    2. Als u Het eindpunt van de gegevensstroom selecteert, voert u de volgende gegevens in en klikt u op Toepassen.

      Configuratie Description
      Eindpunt van gegevensstroom Selecteer de standaardwaarde om het standaardeindpunt van de MQTT-berichtbroker te gebruiken.
      Onderwerp Het onderwerpfilter waarop u zich wilt abonneren voor binnenkomende berichten. Gebruik onderwerp(en)> Rij toevoegen om meerdere onderwerpen toe te voegen.
      Berichtschema Het schema dat moet worden gebruikt om de binnenkomende berichten te deserialiseren.
  5. Selecteer in het gegevensstroomdiagram grafiektransformatie toevoegen (optioneel) om een knooppunt voor grafiekverwerking toe te voegen. Selecteer grafiekcomplex:1 in het deelvenster Grafiekselectie en klik op Toepassen.

    Schermopname van de interface voor bewerkingen waarin wordt getoond hoe u een complexe gegevensstroomgrafiek maakt.

    Belangrijk

    In dit voorbeeld wordt het graph-complex:1.0.0 artefact gebruikt dat u naar uw container registry hebt gepusht. U kunt uw aangepaste grafieken maken door uw eigen WASM-modules te ontwikkelen en naar uw containerregister te pushen. De grafieken die u naar het containerregister pusht, zijn beschikbaar in het deelvenster Grafiekselectie .

  6. U kunt bepaalde instellingen voor de grafiekoperator configureren door het grafiekknooppunt in het diagram te selecteren.

    Schermopname van de interface voor bewerkingen waarin wordt getoond hoe u een complexe gegevensstroomgrafiek configureert.

    Operator Description
    module-snapshot/branch Hiermee configureert u de snapshot module om objectdetectie uit te voeren op afbeeldingen. U kunt de snapshot_topic configuratiesleutel instellen om het invoeronderwerp voor afbeeldingsgegevens op te geven.
    moduletemperatuur/kaart Transformeert key2 temperatuurwaarden naar een andere schaal.
  7. Klik op Toepassen om de wijzigingen op te slaan.

  8. Selecteer Bestemming in het gegevensstroomdiagram om het doelknooppunt te configureren.

  9. Selecteer Opslaan onder de naam van de gegevensstroomgrafiek om de gegevensstroomgrafiek op te slaan.

De complexe gegevensstroom testen

Voordat we de uitvoer kunnen zien, moeten we de brongegevens instellen.

RAW-afbeeldingsbestanden uploaden naar de mqtt-clientpod

De afbeeldingsbestanden zijn bedoeld voor de snapshot module om objecten in de afbeeldingen te detecteren. Ze bevinden zich in de map afbeeldingen op GitHub.

Kloon eerst de opslagplaats om toegang te krijgen tot de afbeeldingsbestanden:

git clone https://github.com/Azure-Samples/explore-iot-operations.git
cd explore-iot-operations

Als u RAW-afbeeldingsbestanden van de ./samples/wasm/images map naar de mqtt-client pod wilt uploaden, kunt u de volgende opdracht gebruiken:

kubectl cp ./samples/wasm/images azure-iot-operations/mqtt-client:/tmp

Controleer of de bestanden zijn geüpload:

kubectl exec -it mqtt-client -n azure-iot-operations -- ls /tmp/images

U ziet nu de lijst met bestanden in de /tmp/images map.

beaker.raw          laptop.raw          sunny2.raw
binoculars.raw      lawnmower.raw       sunny4.raw
broom.raw           milkcan.raw         thimble.raw
camera.raw          photocopier.raw     tripod.raw
computer_mouse.raw  radiator.raw        typewriter.raw
daisy3.raw          screwdriver.raw     vacuum_cleaner.raw
digital_clock.raw   sewing_machine.raw
hammer.raw          sliding_door.raw

Gesimuleerde temperatuur, vochtigheidsgegevens publiceren en afbeeldingen verzenden

U kunt de opdrachten voor het publiceren van temperatuur- en vochtigheidsgegevens en het verzenden van afbeeldingen combineren tot één script. Gebruik de volgende opdracht:

# Connect to the MQTT client pod and run the script
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
while true; do 
  # Generate a random temperature value between 0 and 6000
  temp_value=$(shuf -i 0-6000 -n 1)
  temp_payload="{\"temperature\":{\"value\":$temp_value,\"unit\":\"F\"}}"
  echo "Publishing temperature: $temp_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$temp_payload" \
    -t "sensor/temperature/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Generate a random humidity value between 30 and 90
  humidity_value=$(shuf -i 30-90 -n 1)
  humidity_payload="{\"humidity\":{\"value\":$humidity_value}}"
  echo "Publishing humidity: $humidity_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$humidity_payload" \
    -t "sensor/humidity/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Send an image every 2 seconds
  if [ $(( $(date +%s) % 2 )) -eq 0 ]; then
    file=$(ls /tmp/images/*.raw | shuf -n 1)
    echo "Sending file: $file"
    mosquitto_pub -h aio-broker -p 18883 \
      -f $file \
      -t "sensor/images/raw" \
      --cafile /var/run/certs/ca.crt \
      -D CONNECT authentication-method "K8S-SAT" \
      -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
      -D PUBLISH user-property __ts $(date +%s)000:0:df
  fi

  # Wait for 1 second before the next iteration
  sleep 1
done'

De uitvoer controleren

Abonneer u in een nieuwe terminal op het uitvoeronderwerp:

kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "analytics/sensor/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

De uitvoer moet er als volgt uitzien

{"temperature":[{"count":9,"max":2984.4444444444443,"min":248.33333333333337,"average":1849.6296296296296,"last":2612.222222222222,"unit":"C","overtemp":true}],"humidity":[{"count":10,"max":76.0,"min":30.0,"average":49.7,"last":38.0}],"object":[{"result":"milk can; broom; screwdriver; binoculars, field glasses, opera glasses; toy terrier"}]}
{"temperature":[{"count":10,"max":2490.5555555555557,"min":430.55555555555554,"average":1442.6666666666667,"last":1270.5555555555557,"unit":"C","overtemp":true}],"humidity":[{"count":9,"max":87.0,"min":34.0,"average":57.666666666666664,"last":42.0}],"object":[{"result":"broom; Saint Bernard, St Bernard; radiator"}]}

Hier bevat de uitvoer de temperatuur- en vochtigheidsgegevens, evenals de gedetecteerde objecten in de afbeeldingen.

Een module bijwerken in een actieve grafiek

U kunt een WASM-module bijwerken in een actieve grafiek zonder de grafiek te stoppen. Dit is handig als u de logica van een operator wilt bijwerken zonder de gegevensstroom te stoppen. Als u bijvoorbeeld de temperatuurconversiemodule wilt bijwerken van versie 1.0.0 naar 2.0.0, uploadt u de nieuwe versie als volgt:

oras push <YOUR_ACR_NAME>.azurecr.io/temperature:2.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm temperature-2.0.0.wasm:application/wasm

De gegevensstroomgrafiek haalt automatisch de nieuwe versie van de module op zonder extra configuratie. De grafiek blijft zonder onderbreking worden uitgevoerd en de nieuwe moduleversie wordt gebruikt voor de volgende gegevensverwerking.

Aangepaste WASM-modules ontwikkelen

Als u aangepaste logica voor gegevensverwerking voor uw gegevensstroomgrafieken wilt maken, ontwikkelt u WebAssembly-modules in Rust of Python. Met aangepaste modules kunt u gespecialiseerde bedrijfslogica, gegevenstransformaties en analyses implementeren die niet beschikbaar zijn in de ingebouwde operators.

Voor uitgebreide richtlijnen voor ontwikkeling, waaronder:

  • Uw ontwikkelomgeving instellen
  • Operators maken in Rust en Python
  • Informatie over het gegevensmodel en de interfaces
  • Uw modules bouwen en testen

Zie WebAssembly-modules ontwikkelen voor gegevensstroomgrafieken.

Zie WebAssembly-grafiekdefinities configureren voor gedetailleerde informatie over het maken en configureren van de YAML-grafiekdefinities die uw gegevensverwerkingswerkstromen definiëren.

Configuratie van aangepaste gegevensstroomgrafieken

Deze sectie bevat gedetailleerde informatie over het configureren van gegevensstroomgrafieken met WASM-modules. Hierin worden alle configuratieopties, gegevensstroomeindpunten en geavanceerde instellingen behandeld.

Overzicht van gegevensstroomgrafiek

Een gegevensstroomgrafiek definieert hoe gegevens stromen via WebAssembly-modules voor verwerking. Elke grafiek bestaat uit:

  • Modus waarmee wordt bepaald of de grafiek is ingeschakeld of uitgeschakeld
  • Profielreferentie die is gekoppeld aan een gegevensstroomprofiel dat schaalaanpassing en resource-instellingen definieert
  • Schijfpersistentie die optioneel permanente opslag voor grafiekstatus mogelijk maakt
  • Knooppunten die de bron-, verwerkings- en doelonderdelen definiëren
  • Knooppuntverbindingen die aangeven hoe gegevens stromen tussen knooppunten

Modusconfiguratie

De moduseigenschap bepaalt of de gegevensstroomgrafiek gegevens actief verwerkt. U kunt de modus instellen op Enabled of Disabled (niet hoofdlettergevoelig). Wanneer deze is uitgeschakeld, stopt de grafiek met het verwerken van gegevens, maar blijft de configuratie behouden.

Wanneer u een gegevensstroomgrafiek maakt of bewerkt, schakelt u in het deelvenster Eigenschappen van gegevensstroom de optie Gegevensstroom inschakelen in op Ja om de modus in Enabledte stellen op . Als u deze optie uitgeschakeld laat, wordt de modus ingesteld op Disabled.

Schermopname van de interface voor bewerkingservaring waarin wordt getoond hoe u de modusconfiguratie inschakelt of uitschakelt.

Profielreferentie

De profielreferentie verbindt uw gegevensstroomgrafiek met een gegevensstroomprofiel, waarmee schaalinstellingen, aantal exemplaren en resourcelimieten worden gedefinieerd. Als u geen profielreferentie opgeeft, moet u in plaats daarvan een Kubernetes-eigenaarsreferentie gebruiken. In de meeste scenario's wordt het standaardprofiel gebruikt dat wordt geleverd door Azure IoT Operations.

Wanneer u een gegevensstroomgrafiek maakt of bewerkt, selecteert u in het deelvenster Eigenschappen van de gegevensstroom het gegevensstroomprofiel. Het standaardprofiel voor gegevensstromen is standaard geselecteerd. Zie Gegevensstroomprofiel configureren voor meer informatie over gegevensstroomprofielen.

Belangrijk

U kunt alleen het gegevensstroomprofiel kiezen bij het maken van een gegevensstroomgrafiek. U kunt het gegevensstroomprofiel niet wijzigen nadat de gegevensstroomgrafiek is gemaakt. Als u het gegevensstroomprofiel van een bestaande gegevensstroomgrafiek wilt wijzigen, verwijdert u de oorspronkelijke gegevensstroomgrafiek en maakt u een nieuw gegevensstroomprofiel met het nieuwe gegevensstroomprofiel.

Schijfpersistentie aanvragen

Belangrijk

Er is een bekend probleem met schijfpersistentie voor gegevensstroomgrafieken. Deze functie werkt momenteel niet zoals verwacht. Zie Bekende problemen voor meer informatie.

Door schijfpersistentie aan te vragen, kunnen gegevensstroomgrafieken de status behouden bij opnieuw opstarten. Wanneer u deze functie inschakelt, kan de grafiek de verwerkingsstatus herstellen als de verbonden broker opnieuw wordt opgestart. Deze functie is handig voor stateful verwerkingsscenario's waarbij het verlies van tussenliggende gegevens problematisch zou zijn. Wanneer u schijfpersistentie aanvraagt, bewaart de broker de MQTT-gegevens, zoals berichten in de wachtrij voor abonnees, op schijf. Deze aanpak zorgt ervoor dat de gegevensbron van uw gegevensstroom geen gegevensverlies ondervindt tijdens stroomstoringen of dat broker opnieuw wordt opgestart. De broker onderhoudt optimale prestaties omdat persistentie per gegevensstroom is geconfigureerd, dus alleen de gegevensstromen die persistentie nodig hebben, gebruiken deze functie.

De gegevensstroomgrafiek doet deze persistentieaanvraag tijdens het abonnement met behulp van een MQTTv5-gebruikerseigenschap. Deze functie werkt alleen wanneer:

  • De gegevensstroom gebruikt de MQTT-broker als bron (bronknooppunt met MQTT-eindpunt)
  • De MQTT-broker heeft persistentie ingeschakeld met dynamische persistentiemodus ingesteld op Enabled voor het gegevenstype, zoals abonneewachtrijen

Met deze configuratie kunnen MQTT-clients, zoals gegevensstroomgrafieken, schijfpersistentie aanvragen voor hun abonnementen met behulp van MQTTv5-gebruikerseigenschappen. Zie MQTT-brokerpersistentie configureren voor gedetailleerde configuratie van MQTT-brokerpersistentie.

De instelling accepteert Enabled of Disabled, met Disabled als standaardwaarde.

Wanneer u een gegevensstroomgrafiek maakt of bewerkt, kunt u in het eigenschappenvenster Gegevensstroom controleren op Ja om de persistentie van de aanvraagschijf in te stellen op Enabled. Als u het ongecontroleerd laat, is de instelling Disabled.

Knooppuntconfiguratie

Knooppunten zijn de bouwstenen van een gegevensstroomgrafiek. Elk knooppunt heeft een unieke naam in de grafiek en voert een specifieke functie uit. Er zijn drie typen knooppunten:

Bronknooppunten

Bronknooppunten definiëren waar gegevens de grafiek invoeren. Ze maken verbinding met eindpunten voor gegevensstromen die gegevens ontvangen van MQTT-brokers of Kafka-onderwerpen. Elk bronknooppunt moet het volgende opgeven:

  • Eindpuntreferentie die verwijst naar een geconfigureerd gegevensstroomeindpunt
  • Gegevensbronnen als een lijst met MQTT-onderwerpen of Kafka-onderwerpen waarop u zich wilt abonneren
  • Assetreferentie (optioneel) die is gekoppeld aan een Azure Device Registry-asset voor schemadeductie

Met de matrix gegevensbronnen kunt u zich abonneren op meerdere onderwerpen zonder de configuratie van het eindpunt te wijzigen. Dankzij deze flexibiliteit kunnen eindpunten opnieuw worden gebruikt in verschillende gegevensstromen.

Opmerking

Op dit moment worden alleen MQTT- en Kafka-eindpunten ondersteund als gegevensbronnen voor gegevensstroomgrafieken. Zie Eindpunten voor gegevensstromen configureren voor meer informatie.

Selecteer bron in het gegevensstroomdiagram om het bronknooppunt te configureren. Selecteer onder Brondetailshet eindpunt van de gegevensstroom en gebruik vervolgens het veld Onderwerp(en) om de MQTT-onderwerpfilters op te geven waarop u zich wilt abonneren voor binnenkomende berichten. U kunt meerdere MQTT-onderwerpen toevoegen door rij toevoegen te selecteren en een nieuw onderwerp in te voeren.

Knooppunten voor grafiekverwerking

Knooppunten voor grafiekverwerking bevatten de WebAssembly-modules waarmee gegevens worden getransformeerd. Met deze knooppunten worden WASM-artefacten opgehaald uit containerregisters en uitgevoerd met opgegeven configuratieparameters. Voor elk grafiekknooppunt is het volgende vereist:

  • Registereindpuntverwijzing die verwijst naar een registereindpunt voor het ophalen van artefacten
  • Artefactspecificatie die de modulenaam en versie definieert die moet worden opgehaald
  • Configuratieparameters als sleutel-waardeparen die worden doorgegeven aan de WASM-module

Met de configuratiematrix kunt u het modulegedrag aanpassen zonder het WASM-artefact opnieuw te bouwen. Algemene configuratieopties zijn onder andere verwerkingsparameters, drempelwaarden, conversie-instellingen en functievlagmen.

Selecteer in het gegevensstroomdiagram grafiektransformatie toevoegen (optioneel) om een knooppunt voor grafiekverwerking toe te voegen. Selecteer in het deelvenster Grafiekselectie het gewenste grafiekartefact, ofwel een eenvoudige of complexe grafiek, en klik op Toepassen. U kunt bepaalde instellingen voor de grafiekoperator configureren door het grafiekknooppunt in het diagram te selecteren.

De configuratiesleutel-waardeparen worden tijdens runtime doorgegeven aan de WASM-module. De module heeft toegang tot deze waarden om het gedrag ervan aan te passen. Met deze aanpak kunt u het volgende doen:

  • Dezelfde WASM-module implementeren met verschillende configuraties
  • Verwerkingsparameters aanpassen zonder modules opnieuw te bouwen
  • Functies in- of uitschakelen op basis van implementatievereisten
  • Omgevingsspecifieke waarden instellen, zoals drempelwaarden of eindpunten

Doelknooppunten

Doelknooppunten definiëren waar verwerkte gegevens worden verzonden. Ze maken verbinding met eindpunten voor gegevensstromen die gegevens verzenden naar MQTT-brokers, cloudopslag of andere systemen. Elk doelknooppunt geeft het volgende op:

  • Eindpuntreferentie die verwijst naar een geconfigureerd gegevensstroomeindpunt
  • Gegevensbestemming als het specifieke onderwerp, pad of locatie voor uitvoergegevens
  • Uitvoerschema-instellingen (optioneel) die de serialisatie-indeling en schemavalidatie definiëren

Voor opslagbestemmingen zoals Azure Data Lake of Fabric OneLake kunt u uitvoerschema-instellingen opgeven om te bepalen hoe gegevens worden geserialiseerd en gevalideerd.

Opmerking

Momenteel worden alleen MQTT-, Kafka- en OpenTelemetry-eindpunten ondersteund als gegevensbestemmingen voor gegevensstroomgrafieken. Zie Eindpunten voor gegevensstromen configureren voor meer informatie.

  1. Selecteer in het gegevensstroomdiagram het doelknooppunt .
  2. Selecteer het gewenste gegevensstroomeindpunt in de vervolgkeuzelijst Details van gegevensstroomeindpunt .
  3. Selecteer Doorgaan om de bestemming te configureren.
  4. Voer de vereiste instellingen voor de bestemming in, inclusief het onderwerp of de tabel waarnaar de gegevens moeten worden verzonden. Het gegevensdoelveld wordt automatisch geïnterpreteerd op basis van het eindpunttype. Als het eindpunt van de gegevensstroom bijvoorbeeld een MQTT-eindpunt is, wordt u op de pagina met doelgegevens gevraagd om het onderwerp in te voeren.

Knooppuntverbindingen

Knooppuntverbindingen definiëren het gegevensstroompad tussen knooppunten. Elke verbinding geeft een bronknooppunt en doelknooppunt op, waardoor de verwerkingspijplijn wordt gemaakt. Verbindingen kunnen eventueel schemavalidatie bevatten om de gegevensintegriteit tussen verwerkingsfasen te garanderen.

Wanneer u schemavalidatie opgeeft, valideert het systeem de gegevensindeling en -structuur terwijl deze tussen knooppunten stroomt. De validatie helpt bij het vroegtijdig vangen van inconsistenties van gegevens en zorgt ervoor dat WASM-modules gegevens ontvangen in de verwachte indeling.

De bewerkingservaring maakt automatisch knooppuntverbindingen wanneer u het knooppunt voor grafiekverwerking selecteert. U kunt de verbindingen niet wijzigen nadat de grafiek is gemaakt.

Eindpunten voor gegevensstroom

Gegevensstroomgrafieken maken verbinding met externe systemen via gegevensstroomeindpunten. Het type eindpunt bepaalt of het kan worden gebruikt als bron, bestemming of beide:

MQTT-eindpunten

MQTT-eindpunten kunnen fungeren als zowel bronnen als bestemmingen. Ze maken verbinding met MQTT-brokers, waaronder:

  • Lokale MQTT-broker van Azure IoT Operations (vereist in elke gegevensstroom)
  • Azure Event Grid MQTT
  • Aangepaste MQTT-brokers

Zie MQTT-gegevensstroomeindpunten configureren voor gedetailleerde configuratie-informatie.

Kafka-eindpunten

Kafka-eindpunten kunnen fungeren als zowel bronnen als bestemmingen. Ze maken verbinding met kafka-compatibele systemen, waaronder:

  • Azure Event Hubs (kafka-compatibel)
  • Apache Kafka-clusters
  • Confluent Cloud

Zie Azure Event Hubs en Kafka-gegevensstroomeindpunten configureren voor gedetailleerde configuratie-informatie.

Storage-eindpunten

Opslageindpunten kunnen alleen fungeren als bestemmingen. Ze maken verbinding met cloudopslagsystemen voor langetermijnretentie en analyses van gegevens:

  • Azure Data Lake Storage
  • Microsoft Fabric OneLake
  • Lokale opslag

Voor opslageindpunten zijn doorgaans uitvoerschema-instellingen vereist om de indeling voor gegevensserialisatie te definiëren.

Registersysteemeindpunten

Registereindpunten bieden toegang tot containerregisters voor het ophalen van WASM-modules en grafiekdefinities. Ze worden niet rechtstreeks in de gegevensstroom gebruikt, maar knooppunten voor grafiekverwerking verwijzen ernaar.

Zie Registereindpunten configureren voor gedetailleerde configuratie-informatie.