Freigeben über


Grundlegendes zu WebAssembly(WASM)-Modulen und -Diagrammdefinitionen für Datenflussdiagramme

Datenflussdiagramme in Azure IoT Einsatz verarbeiten Telemetriedaten am Edge durch Routing über eine Reihe von Operatoren wie Maps, Filter und Verzweigungen. Sie verpacken Ihre benutzerdefinierte Verarbeitungslogik als WebAssembly(WASM)-Module und verbinden sie in einer Diagrammdefinition zusammen, sodass Sie Daten transformieren, filtern und anreichern können, ohne vollständige Dienste zu schreiben.

In diesem Artikel werden die Operatortypen, das zeitaufwendigen Datenflussmodell, die Modulkonfiguration, die Host-APIs und das WIT-Schema erläutert, das WASM-Module unterstützt. Informationen zum lokalen Erstellen, Testen und Debuggen von Modulen mit der VS Code-Erweiterung oder aio-dataflow der CLI finden Sie unter Build WASM-Module für Datenflüsse.

Operatoren und Module

Operatoren sind die Verarbeitungseinheiten in einem Datenflussdiagramm. Jeder Typ dient einem bestimmten Zweck:

Bediener Purpose Rückgabetyp
Map Transformieren jedes Datenelements (z. B. Konvertieren von Temperatureinheiten) DataModel
Filter Übergeben oder Ablegen von Elementen basierend auf einer Bedingung bool
Filiale Weiterleiten von Elementen an zwei verschiedene Pfade bool
Ansammeln Aggregieren von Elementen innerhalb von Zeitfenstern DataModel
Concatenate Zusammenführen mehrerer Datenströme beim Beibehalten der Reihenfolge N/A
Verzögerung Verschieben Sie Zeitstempel im Voraus, um die Zeitsteuerung zu regeln. N/A

Ein Modul ist die WASM-Binärdatei, die einen oder mehrere Operatoren implementiert. Beispielsweise kann ein einzelnes temperature.wasm Modul sowohl einen map Operator (für die Konvertierung) als auch einen filter Operator (für die Schwellenwertüberprüfung) bereitstellen.

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Mit dieser Trennung können Sie dasselbe Modul mit unterschiedlichen Diagrammkonfigurationen, Versionsmodulen unabhängig voneinander wiederverwenden und das Verhalten durch Konfigurationsparameter ändern, ohne dass es neu erstellt wird.

Zeitnahes Datenflussmodell

Datenflussdiagramme basieren auf dem Timely Dataflow Rechenmodell aus Microsoft Research's Naiad-Projekt. Jedes Datenelement enthält einen hybriden logischen Zeitstempel:

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Dadurch erhalten Sie eine deterministische Verarbeitung (dieselbe Eingabe erzeugt immer dieselbe Ausgabe), Genau-einmal-Semantik und verteilte Koordination zwischen den Knoten. Das vollständige WIT-Schema finden Sie im Repository samples.

Informationen zum Entwickeln von WASM-Modulen mit der VS Code-Erweiterung finden Sie unter Build WASM-Module mit VS Code-Erweiterung.

Operatoren schreiben

Kartenoperator

Ein Kartenoperator transformiert jedes Datenelement und gibt eine geänderte Kopie zurück. Das Schnellstartbeispiel zeigt eine einfache Karte. Nachfolgend sehen Sie ein komplexeres Beispiel, in dem Konfigurationsparameter verwendet werden:

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Filteroperator

Ein Filter gibt true zurück, um Daten durchzulassen oder false zu verwerfen.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Branch-Operator

Eine Verzweigung leitet Daten an zwei Pfade weiter. Geben Sie false für den ersten Arm zurück und true für den zweiten.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Modulkonfigurationsparameter

Ihre Operatoren können Laufzeitkonfigurationsparameter über die init Funktion empfangen. Auf diese Weise können Sie das Verhalten anpassen, ohne das Modul neu zu erstellen.

Die init Funktion empfängt eine ModuleConfiguration Struktur:

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

Die init Funktion wird einmal aufgerufen, wenn das Modul geladen wird. Kehren Sie true zurück, um mit der Verarbeitung zu beginnen, oder false um einen Konfigurationsfehler zu signalisieren. Wenn initfalse zurückgibt, verarbeitet der Operator keine Daten, und der Datenfluss zeichnet einen Fehler auf.

Von Bedeutung

Wenn Ihr Operator von Konfigurationsparametern abhängt (z. B. Filtergrenzen oder Schwellenwerte), behandeln Sie immer den Fall, in dem sie nicht bereitgestellt werden. Verwenden Sie vernünftige Standardwerte oder geben Sie false von init zurück. Bei fehlenden Parametern sollten Sie unwrap() nicht aufrufen oder in Panik geraten, da dies den Operator zur Laufzeit ohne eindeutige Fehlermeldung abstürzen lässt.

Sie definieren die Parameter im Abschnitt der Diagrammdefinition moduleConfigurations :

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

Das name Feld muss mit dem Operatornamen im Abschnitt des Diagramms operations übereinstimmen. Weitere Informationen zur Diagrammdefinitionsstruktur finden Sie unter Konfigurieren von WebAssembly-Diagrammdefinitionen.

Modulgröße und Leistung

WASM-Module werden in einer Sandkastenumgebung mit begrenzten Ressourcen ausgeführt. Beachten Sie die folgenden Richtlinien:

  • Minimieren Sie Abhängigkeiten. Verwenden Sie default-features = false auf serde und serde_json in Rust, um die Binärgröße zu reduzieren. Vermeiden Sie das Ziehen großer Kisten.
  • Die Modulgröße ist wichtig. Kleinere Module laden schneller und verwenden weniger Arbeitsspeicher. Ein typischer Temperaturkonverter ist ~2 MB (Rust Release) oder ~5 MB (Python). Verwenden Sie Releasebuilds für die Produktion.
  • Vermeiden Sie Blockierungsvorgänge. Die process Funktion sollte schnell abgeschlossen werden. Rechenintensive Berechnungen verzögern die gesamte Datenflusspipeline.
  • Verwenden Sie wasm-tools, um zu prüfen. Führen Sie wasm-tools component wit your-module.wasm aus, um zu überprüfen, ob Ihr Modul die erwarteten Schnittstellen exportiert, bevor es an ein Register übertragen wird.

Versionsverwaltung und CI/CD

Verwenden Sie die semantische Versionsverwaltung für Ihre Module und Diagrammdefinitionen. Das Dataflowdiagramm verweist auf Artefakte nach Name und Tag (z. B. ), sodass Sie Module aktualisieren können, ohne Diagrammdefinitionen zu ändern, temperature:1.0.0indem Sie eine neue Version mit demselben Tag pushen.

Bei automatisierten Builds sieht eine typische Pipeline wie folgt aus:

  1. Erstellen Sie das WASM-Modul (verwenden Sie den Docker-Generator für Konsistenz).
  2. Führen Sie wasm-tools component wit aus, um die exportierten Schnittstellen zu überprüfen.
  3. Führen Sie Komponententests mit Ihrer Kernlogik aus. Weitere Informationen finden Sie unter Testen von WASM-Modulen.
  4. Übertragen Sie mit ORAS in Ihr Register und markieren Sie es mit der Build-Version.
  5. (Optional) Aktualisieren Sie den Artefaktverweis der Diagrammdefinition und führen Sie ein Push durch.

Der Datenflussgraph erfasst automatisch neue Modulversionen, die an denselben Tag übertragen werden, ohne dass eine erneute Bereitstellung erforderlich ist. Siehe Aktualisieren eines Moduls in einem ausgeführten Diagramm.

Hosten von APIs

Ihre WASM-Module können Host-APIs für die Zustandsverwaltung, Protokollierung und Metriken verwenden.

Statusspeicher

Speichern von Daten über process Aufrufe hinweg mithilfe des verteilten Zustandsspeichers:

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Protokollierung

Strukturierte Protokollierung mit Schweregradstufen:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metriken

Mit OpenTelemetry kompatible Metriken:

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

ONNX-Ableitung

Informationen zum Einbetten und Ausführen kleiner ONNX-Modelle in Ihre Module zur In-Band-Inference finden Sie unter Ausführen von ONNX-Inference in WebAssembly-Datenflussdiagrammen.

WIT-Schemareferenz

Alle Operatoren implementieren standardisierte Schnittstellen, die mithilfe von WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen) definiert werden. Sie finden die vollständigen Schemas im Repository samples.

Operatorschnittstellen

Jeder Operator verfügt über eine init Funktion für die Konfiguration und eine process Funktion für die Datenverarbeitung:

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Datenmodell

Von processor.wit (wasm-graph:processor@1.1.0):

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Hinweis

Die meisten Operatoren arbeiten mit der message Variante. Überprüfen Sie diesen Typ am Anfang Ihrer process Funktion. Die Datenladung verwendet entweder ein Hostpufferhandle (buffer) für nullkopierende Lesevorgänge oder vom Modul verwaltete Bytes (bytes). Rufen Sie buffer.read() auf, um Hostbytes in den Speicher Ihres Moduls zu kopieren.