Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Datenflussdiagramme in Azure IoT Einsatz verarbeiten Telemetriedaten am Edge durch Routing über eine Reihe von Operatoren wie Maps, Filter und Verzweigungen. Sie verpacken Ihre benutzerdefinierte Verarbeitungslogik als WebAssembly(WASM)-Module und verbinden sie in einer Diagrammdefinition zusammen, sodass Sie Daten transformieren, filtern und anreichern können, ohne vollständige Dienste zu schreiben.
In diesem Artikel werden die Operatortypen, das zeitaufwendigen Datenflussmodell, die Modulkonfiguration, die Host-APIs und das WIT-Schema erläutert, das WASM-Module unterstützt. Informationen zum lokalen Erstellen, Testen und Debuggen von Modulen mit der VS Code-Erweiterung oder aio-dataflow der CLI finden Sie unter Build WASM-Module für Datenflüsse.
Operatoren und Module
Operatoren sind die Verarbeitungseinheiten in einem Datenflussdiagramm. Jeder Typ dient einem bestimmten Zweck:
| Bediener | Purpose | Rückgabetyp |
|---|---|---|
| Map | Transformieren jedes Datenelements (z. B. Konvertieren von Temperatureinheiten) | DataModel |
| Filter | Übergeben oder Ablegen von Elementen basierend auf einer Bedingung | bool |
| Filiale | Weiterleiten von Elementen an zwei verschiedene Pfade | bool |
| Ansammeln | Aggregieren von Elementen innerhalb von Zeitfenstern | DataModel |
| Concatenate | Zusammenführen mehrerer Datenströme beim Beibehalten der Reihenfolge | N/A |
| Verzögerung | Verschieben Sie Zeitstempel im Voraus, um die Zeitsteuerung zu regeln. | N/A |
Ein Modul ist die WASM-Binärdatei, die einen oder mehrere Operatoren implementiert. Beispielsweise kann ein einzelnes temperature.wasm Modul sowohl einen map Operator (für die Konvertierung) als auch einen filter Operator (für die Schwellenwertüberprüfung) bereitstellen.
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Mit dieser Trennung können Sie dasselbe Modul mit unterschiedlichen Diagrammkonfigurationen, Versionsmodulen unabhängig voneinander wiederverwenden und das Verhalten durch Konfigurationsparameter ändern, ohne dass es neu erstellt wird.
Zeitnahes Datenflussmodell
Datenflussdiagramme basieren auf dem Timely Dataflow Rechenmodell aus Microsoft Research's Naiad-Projekt. Jedes Datenelement enthält einen hybriden logischen Zeitstempel:
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
Dadurch erhalten Sie eine deterministische Verarbeitung (dieselbe Eingabe erzeugt immer dieselbe Ausgabe), Genau-einmal-Semantik und verteilte Koordination zwischen den Knoten. Das vollständige WIT-Schema finden Sie im Repository samples.
Informationen zum Entwickeln von WASM-Modulen mit der VS Code-Erweiterung finden Sie unter Build WASM-Module mit VS Code-Erweiterung.
Operatoren schreiben
Kartenoperator
Ein Kartenoperator transformiert jedes Datenelement und gibt eine geänderte Kopie zurück. Das Schnellstartbeispiel zeigt eine einfache Karte. Nachfolgend sehen Sie ein komplexeres Beispiel, in dem Konfigurationsparameter verwendet werden:
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Filteroperator
Ein Filter gibt true zurück, um Daten durchzulassen oder false zu verwerfen.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Branch-Operator
Eine Verzweigung leitet Daten an zwei Pfade weiter. Geben Sie false für den ersten Arm zurück und true für den zweiten.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Modulkonfigurationsparameter
Ihre Operatoren können Laufzeitkonfigurationsparameter über die init Funktion empfangen. Auf diese Weise können Sie das Verhalten anpassen, ohne das Modul neu zu erstellen.
Die init Funktion empfängt eine ModuleConfiguration Struktur:
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
Die init Funktion wird einmal aufgerufen, wenn das Modul geladen wird. Kehren Sie true zurück, um mit der Verarbeitung zu beginnen, oder false um einen Konfigurationsfehler zu signalisieren. Wenn initfalse zurückgibt, verarbeitet der Operator keine Daten, und der Datenfluss zeichnet einen Fehler auf.
Von Bedeutung
Wenn Ihr Operator von Konfigurationsparametern abhängt (z. B. Filtergrenzen oder Schwellenwerte), behandeln Sie immer den Fall, in dem sie nicht bereitgestellt werden. Verwenden Sie vernünftige Standardwerte oder geben Sie false von init zurück. Bei fehlenden Parametern sollten Sie unwrap() nicht aufrufen oder in Panik geraten, da dies den Operator zur Laufzeit ohne eindeutige Fehlermeldung abstürzen lässt.
Sie definieren die Parameter im Abschnitt der Diagrammdefinition moduleConfigurations :
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
Das name Feld muss mit dem Operatornamen im Abschnitt des Diagramms operations übereinstimmen. Weitere Informationen zur Diagrammdefinitionsstruktur finden Sie unter Konfigurieren von WebAssembly-Diagrammdefinitionen.
Modulgröße und Leistung
WASM-Module werden in einer Sandkastenumgebung mit begrenzten Ressourcen ausgeführt. Beachten Sie die folgenden Richtlinien:
- Minimieren Sie Abhängigkeiten. Verwenden Sie
default-features = falseaufserdeundserde_jsonin Rust, um die Binärgröße zu reduzieren. Vermeiden Sie das Ziehen großer Kisten. - Die Modulgröße ist wichtig. Kleinere Module laden schneller und verwenden weniger Arbeitsspeicher. Ein typischer Temperaturkonverter ist ~2 MB (Rust Release) oder ~5 MB (Python). Verwenden Sie Releasebuilds für die Produktion.
- Vermeiden Sie Blockierungsvorgänge. Die
processFunktion sollte schnell abgeschlossen werden. Rechenintensive Berechnungen verzögern die gesamte Datenflusspipeline. - Verwenden Sie
wasm-tools, um zu prüfen. Führen Siewasm-tools component wit your-module.wasmaus, um zu überprüfen, ob Ihr Modul die erwarteten Schnittstellen exportiert, bevor es an ein Register übertragen wird.
Versionsverwaltung und CI/CD
Verwenden Sie die semantische Versionsverwaltung für Ihre Module und Diagrammdefinitionen. Das Dataflowdiagramm verweist auf Artefakte nach Name und Tag (z. B. ), sodass Sie Module aktualisieren können, ohne Diagrammdefinitionen zu ändern, temperature:1.0.0indem Sie eine neue Version mit demselben Tag pushen.
Bei automatisierten Builds sieht eine typische Pipeline wie folgt aus:
- Erstellen Sie das WASM-Modul (verwenden Sie den Docker-Generator für Konsistenz).
- Führen Sie
wasm-tools component witaus, um die exportierten Schnittstellen zu überprüfen. - Führen Sie Komponententests mit Ihrer Kernlogik aus. Weitere Informationen finden Sie unter Testen von WASM-Modulen.
- Übertragen Sie mit ORAS in Ihr Register und markieren Sie es mit der Build-Version.
- (Optional) Aktualisieren Sie den Artefaktverweis der Diagrammdefinition und führen Sie ein Push durch.
Der Datenflussgraph erfasst automatisch neue Modulversionen, die an denselben Tag übertragen werden, ohne dass eine erneute Bereitstellung erforderlich ist. Siehe Aktualisieren eines Moduls in einem ausgeführten Diagramm.
Hosten von APIs
Ihre WASM-Module können Host-APIs für die Zustandsverwaltung, Protokollierung und Metriken verwenden.
Statusspeicher
Speichern von Daten über process Aufrufe hinweg mithilfe des verteilten Zustandsspeichers:
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Protokollierung
Strukturierte Protokollierung mit Schweregradstufen:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metriken
Mit OpenTelemetry kompatible Metriken:
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
ONNX-Ableitung
Informationen zum Einbetten und Ausführen kleiner ONNX-Modelle in Ihre Module zur In-Band-Inference finden Sie unter Ausführen von ONNX-Inference in WebAssembly-Datenflussdiagrammen.
WIT-Schemareferenz
Alle Operatoren implementieren standardisierte Schnittstellen, die mithilfe von WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen) definiert werden. Sie finden die vollständigen Schemas im Repository samples.
Operatorschnittstellen
Jeder Operator verfügt über eine init Funktion für die Konfiguration und eine process Funktion für die Datenverarbeitung:
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Datenmodell
Von processor.wit (wasm-graph:processor@1.1.0):
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Hinweis
Die meisten Operatoren arbeiten mit der message Variante. Überprüfen Sie diesen Typ am Anfang Ihrer process Funktion. Die Datenladung verwendet entweder ein Hostpufferhandle (buffer) für nullkopierende Lesevorgänge oder vom Modul verwaltete Bytes (bytes). Rufen Sie buffer.read() auf, um Hostbytes in den Speicher Ihres Moduls zu kopieren.
Verwandte Inhalte
- Erstellen von WASM-Modulen für Datenflüsse
- Erstellen zustandsbehafteter WASM-Diagramme mit dem Statusspeicher
- Verwenden der Schemaregistrierung mit WASM-Modulen
- Debuggen von WASM-Modulen
- TESTEN VON WASM-Modulen
- Konfigurieren von Diagrammdefinitionen
- Bereitstellen von Diagrammdefinitionen
- ONNX-Rückschluss in WASM-Modulen
- Verwenden von WASM in Datenflussdiagrammen