Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment développer des modules WebAssembly (WASM) personnalisés et des définitions de graphiques pour les graphiques de flux de données Opérations Azure IoT. Créez des modules dans Rust ou Python pour implémenter une logique de traitement personnalisée. Définissez des configurations de graphique qui spécifient la façon dont vos modules se connectent à des flux de travail complets de traitement.
Important
Actuellement, les graphiques de flux de données prennent uniquement en charge les points de terminaison MQTT, Kafka et OpenTelemetry. D’autres types de points de terminaison tels qu’Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer et le stockage local ne sont pas pris en charge. Pour plus d’informations, consultez la section Problèmes connus.
Pour savoir comment développer des modules WASM à l’aide de l’extension VS Code, consultez Générer des modules WASM avec l’extension VS Code.
Pour en savoir plus sur les graphiques et WASM dans Azure IoT Operations, consultez :
- Utiliser un graphique de flux de données avec des modules WebAssembly
- Transformer des données entrantes avec des modules WebAssembly
Aperçu
Les graphiques de flux de données Opérations Azure IoT traitent les données de diffusion en continu via des opérateurs configurables implémentés comme modules WebAssembly. Chaque opérateur traite les données horodatées tout en conservant l’ordre temporel, ce qui permet une analytique en temps réel avec des résultats déterministes.
Principaux avantages
- Traitement en temps réel : gérer les données de diffusion en continu avec une faible latence cohérente
- Sémantique de l’heure des événements : traiter des données en fonction du moment où des événements se sont produits, et non quand ils sont traités
- Tolérance de panne : prise en charge intégrée de la gestion des défaillances et en veillant à la cohérence des données
- Scalabilité : distribuer le traitement sur plusieurs nœuds tout en conservant les garanties d’ordre
- Prise en charge multi-langage : développer dans Rust ou Python avec des interfaces cohérentes
Fondation de l’architecture
Les graphiques de flux de données reposent sur le modèle de calcul Flux de données en temps voulu, qui provient du projet Naiad de Microsoft Research. Cette approche assure ce qui suit :
- Traitement déterministe : la même entrée produit toujours la même sortie
- Suivi de la progression : le système sait quand les calculs sont terminés
- Coordination distribuée : plusieurs nœuds de traitement restent synchronisés
Pourquoi utiliser le flux de données en temps opportun ?
Les systèmes de traitement de flux traditionnels présentent plusieurs défis. Les données hors commande signifient que les événements peuvent arriver plus tard que prévu. Les résultats partiels rendent difficile de savoir le temps d’achèvement des calculs. Les problèmes de coordination se produisent lors de la synchronisation du traitement distribué.
Le flux de données en temps voulu résout ces problèmes via :
Horodateurs et suivi de la progression
Chaque élément de données porte un horodateur représentant son temps logique. Le système suit la progression via les horodateurs, ce qui active plusieurs fonctionnalités clés :
- Traitement déterministe : la même entrée produit toujours la même sortie
- Sémantique exactement une fois : aucun traitement dupliqué ou manqué
- Filigranes : savoir quand aucune autre donnée n’arrivera pendant une période donnée
Horloge logique hybride
Le mécanisme d’horodateur utilise une approche hybride :
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
L’approche d’horloge logique hybride assure plusieurs fonctionnalités :
- Ordre causal : les effets suivent les causes
- Garanties de progression : le système sait quand le traitement est terminé
- Coordination distribuée : plusieurs nœuds restent synchronisés
Découvrir les opérateurs et les modules
Comprendre la distinction entre les opérateurs et les modules est essentiel pour le développement WASM :
Opérateurs
Les opérateurs sont des unités de traitement fondamentales basées sur les Opérateurs de flux de données en temps voulu. Chaque type d’opérateur a un objectif spécifique :
- Carte : transformer chaque élément de données (par exemple, convertir des unités de température)
- Filtre : autoriser uniquement certains éléments de données à passer en fonction de conditions (telles que la suppression de lectures non valides)
- Branche : acheminer les données vers différents chemins en fonction des conditions (telles que la séparation des données de température et d’humidité)
- Accumulation : Collecter et agréger des données dans les fenêtres de temps (telles que les résumés statistiques informatiques)
- Concaténer : fusionner plusieurs flux de données tout en préservant l’ordre temporel
- Délai : contrôler le minutage en avançant les horodatages
Modules
Les modules constituent l’implémentation de la logique d’opérateur en tant que code WASM. Un seul module peut implémenter plusieurs types d’opérateurs. Par exemple, il est possible qu’un module de température fournissent :
- Un opérateur de carte pour la conversion d’unités
- Un opérateur de filtre pour la vérification de seuil
- Un opérateur de branche pour les décisions de routage
- Un opérateur d’accumulation pour l’agrégation statistique
La relation
La relation entre les définitions de graphiques, les modules et les opérateurs suit un modèle spécifique :
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Cette séparation permet :
- Une réutilisation de module : déployer le même module WASM dans différentes configurations de graphique
- Un contrôle de version indépendant : mettre à jour les définitions de graphiques sans regénérer les modules
- Une configuration dynamique : passer différents paramètres au même module pour différents comportements
Prerequisites
Choisissez votre langage de développement et configurez les outils requis :
La chaîne d’outils Rust fournit
cargo,rustcet la bibliothèque standard nécessaire pour compiler des opérateurs. Installer avec :curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yCible WASM
wasm32-wasip2requise pour générer les composants WASM d’Opérations Azure IoT. Ajouter avec :rustup target add wasm32-wasip2Les outils de génération fournissent des utilitaires utilisés par les générateurs et CI pour valider et empaqueter les artefacts WASM. Installer avec :
cargo install wasm-tools --version '=1.201.0' --locked
Configurer l’environnement de développement
Vous pouvez accéder au Kit de développement logiciel (SDK) WASM Rust via un registre Microsoft Azure DevOps personnalisé. Au lieu d’utiliser des variables d’environnement, configurez l’accès avec un fichier de configuration d’espace de travail :
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Cette configuration reflète la configuration d'exemple à samples/wasm/.cargo/config.toml et conserve les paramètres de registre dans le contrôle de version.
Créer un projet
Commencez par créer un répertoire de projet pour votre module d’opérateur. La structure du projet dépend de la langue que vous choisissez.
cargo new --lib temperature-converter
cd temperature-converter
Configurer Cargo.toml
Modifiez le fichier Cargo.toml pour inclure des dépendances pour le Kit de développement logiciel (SDK) WASM et d’autres bibliothèques :
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Remarques sur les versions et le Registre :
- La version du SDK (
=1.1.3) s’aligne sur les exemples actuels ; la maintenir fixée évite les changements cassants. -
registry = "aio-wg"correspond à l’entrée de Registre définie dans.cargo/config.toml.
Les dépendances clés expliquées :
-
wit-bindgen: génère des liaisons Rust à partir de définitions WIT (WebAssembly Interface Types), ce qui permet à votre code d’interface avec le runtime WASM. -
wasm_graph_sdk: Kit de développement logiciel (SDK) Azure IoT Operations fournissant des macros d’opérateur, telles que#[map_operator]et#[filter_operator], et des API hôtes pour la journalisation, les métriques et la gestion de l’état. -
serde+serde_json: bibliothèques de traitement JSON pour l’analyse et la génération de charges utiles de données.default-features = falseest optimisé pour les contraintes de taille de WASM. -
crate-type = ["cdylib"]: compile la bibliothèque Rust en tant que bibliothèque dynamique compatible C, qui est requise pour la génération de module WASM.
Créer un module simple
Créez un module simple qui convertit la température des degrés Celsius en Fahrenheit. Cet exemple montre la logique de base de structure et de traitement pour les implémentations Rust et Python.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Générer le module
Choisissez entre les builds de développement local ou les builds conteneurisées en fonction de vos besoins en matière de workflow de développement et d’environnement.
Build locale
Générez directement sur votre machine de développement pour une itération la plus rapide pendant le développement et quand vous avez besoin d’un contrôle total sur l’environnement de génération.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Docker build
Générez en utilisant des environnements conteneurisés avec toutes les dépendances et schémas préconfigurés. Ces images Docker fournissent des versions cohérentes dans différents environnements et sont idéales pour les pipelines CI/CD.
Le référentiel d’exemples Azure IoT Operations gère le générateur Docker Rust et inclut toutes les dépendances nécessaires. Pour obtenir une documentation détaillée, consultez Utilisation du générateur Docker Rust.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Options de version Docker :
-
--app-name: doit correspondre à votre nom de Crate Rust à partir deCargo.toml -
--build-mode: choisirrelease(par défaut) pour les versions optimisées oudebugpour les versions de développement avec des symboles
Autres exemples
Pour obtenir des exemples complets, consultez les Exemples Rust dans le référentiel d’exemples. Les implémentations complètes sont les suivantes :
- Opérateurs de carte : transformation des données et logique de conversion
- Opérateurs de filtre : traitement de données conditionnel et validation
- Opérateurs de branche : routage à chemins multiples en fonction du contenu des données
- Opérateurs d’accumulation : agrégation à fenêtres temporelles et traitement statistique
- Opérateurs de retard : contrôle de traitement basé sur le temps
Les exemples montrent les implémentations de travail qui présentent la structure complète de chaque type d’opérateur, notamment la gestion des erreurs et les modèles de journalisation appropriés.
Informations de référence et API du Kit de développement logiciel (SDK)
Le Kit de développement logiciel (SDK) WASM Rust fournit des outils de développement complets :
Macros d’opérateur
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Paramètres de configuration du module
Vos opérateurs WASM peuvent recevoir des paramètres de configuration d’exécution via le struct ModuleConfiguration passé à la fonction init. Vous définissez ces paramètres dans la définition de graphique, ce qui vous permet de personnaliser le runtime sans reconstruire les modules.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Pour découvrir plus d’informations sur la définition des paramètres de configuration dans les définitions de graphiques, consultez Paramètres de configuration du module.
API d’hôte
Utilisez le Kit de développement logiciel (SDK) pour utiliser les services distribués :
Magasin d’état pour données persistantes :
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Journalisation structurée :
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Ouvrir des métriques compatibles OpenTelemetry :
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
L'inférence ONNX avec WASM
Pour incorporer et exécuter de petits modèles ONNX à l’intérieur de vos modules pour l’inférence en bande, consultez Exécuter l’inférence ONNX dans les graphiques de flux de données WebAssembly. Cet article traite de l’empaquetage de modèles avec des modules, l’activation de la fonctionnalité wasi-nn dans les définitions de graphiques et les limitations.
Types d’interface WebAssembly (WIT)
Tous les opérateurs implémentent des interfaces standardisées définies en utilisant WebAssembly Interface Types (WIT). WIT fournit des définitions d’interface indépendantes du langage qui assurent la compatibilité entre les modules WASM et le runtime hôte.
Vous trouverez les schémas WIT complets pour les opérations Azure IoT dans le référentiel d’exemples. Ces schémas définissent toutes les interfaces, types et structures de données que vous utilisez lors du développement de modules WASM.
Modèle de données et interfaces
Tous les opérateurs WASM fonctionnent avec des modèles de données standardisés définis à l’aide de types d’interface WebAssembly (WIT) :
Modèle de données de base
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Définitions d’interface WIT
Chaque type d’opérateur implémente une interface WIT spécifique :
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Définitions de graphique et intégration de WASM
Les définitions de graphique montrent comment vos modules WASM se connectent au traitement des flux de travail. Ils spécifient les opérations, les connexions et les paramètres qui créent des pipelines de traitement de données complets.
Pour obtenir des informations complètes sur la création et la configuration de définitions de graphiques, notamment des exemples détaillés de workflows simples et complexes, consultez Configurer des définitions de graphique WebAssembly pour les graphiques de flux de données.
Rubriques clés abordées dans le guide sur les définitions de graphique :
- Structure de définition de graphique : présentation du schéma YAML et des composants requis
- Exemple de graphique simple : pipeline de conversion de température en trois étapes de base
- Exemple de graphique complexe : traitement à plusieurs capteurs avec branchement et agrégation
- Paramètres de configuration du module : personnalisation du runtime des opérateurs WASM
- Déploiement du Registre : packaging et stockage de définitions de graphiques en tant qu’artefacts OCI
Étapes suivantes
- Consultez des exemples complets et des modèles avancés dans le référentiel d’exemples Opérations Azure IoT WASM.
- Découvrez comment déployer vos modules dans Utiliser WebAssembly avec des graphiques de flux de données.
- Configurez vos points de terminaison de flux de données dans Configurer les points de terminaison de flux de données.