Sdílet prostřednictvím


Vývoj modulů WebAssembly (WASM) a definic grafů pro grafy toku dat

V tomto článku se dozvíte, jak vyvíjet vlastní moduly WebAssembly (WASM) a definice grafů pro grafy toku dat Azure IoT Operations. Vytvořte moduly v Rustu nebo Pythonu pro implementaci vlastní logiky zpracování. Definujte konfigurace grafu, které určují, jak se moduly připojují k kompletním pracovním postupům zpracování.

Důležité

Grafy toku dat aktuálně podporují pouze koncové body MQTT, Kafka a OpenTelemetry. Jiné typy koncových bodů, jako jsou Data Lake, Microsoft Fabric OneLake, Azure Data Explorer a Local Storage, se nepodporují. Další informace najdete v tématu Známé problémy.

Přehled

Grafy toku dat Azure IoT zpracovávají streamovaná data prostřednictvím konfigurovatelných operátorů implementovaných jako moduly WebAssembly. Každý operátor zpracovává časová razítka dat při zachování dočasného řazení a umožňuje analýzu v reálném čase s deterministickými výsledky.

Klíčové výhody

  • Zpracování v reálném čase: Zpracování streamovaných dat s konzistentní nízkou latencí
  • Sémantika času událostí: Zpracování dat na základě událostí, nikoli při jejich zpracování
  • Odolnost proti chybám: Integrovaná podpora pro zpracování selhání a zajištění konzistence dat
  • Škálovatelnost: Distribuce zpracování napříč několika uzly při zachování záruk objednávek
  • Podpora více jazyků: Vývoj v Rustu nebo Pythonu s konzistentními rozhraními

Základ architektury

Grafy toku dat vycházejí z výpočetního modelu Včasná toku dat , který pochází z projektu Microsoft Research Naiad. Tento přístup zajišťuje:

  • Deterministické zpracování: Stejný vstup vždy vytváří stejný výstup.
  • Sledování průběhu: Systém ví, kdy jsou výpočty dokončené.
  • Distribuovaná koordinace: Více uzlů zpracování zůstane synchronizované

Proč včasný tok dat?

Tradiční systémy zpracování datových proudů mají několik výzev. Data mimo pořadí znamenají, že události můžou dorazit později, než se čekalo. Částečné výsledky ztěžují zjištění, kdy se výpočty dokončí. K problémům koordinace dochází při synchronizaci distribuovaného zpracování.

Včasné toky dat řeší tyto problémy prostřednictvím:

Časové razítka a sledování průběhu

Každá datová položka nese časové razítko představující logický čas. Systém sleduje průběh prostřednictvím časových razítek a umožňuje několik klíčových funkcí:

  • Deterministické zpracování: Stejný vstup vždy vytváří stejný výstup.
  • Přesně jednou sémantika: Žádné duplicitní nebo zmeškané zpracování
  • Vodoznaky: Zjistěte, kdy se na daný čas nedorazí žádná další data.

Hybridní logické hodiny

Mechanismus časového razítka používá hybridní přístup:

pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}

Přístup k hybridním logickým hodinám zajišťuje několik možností:

  • Kauzální řazení: Účinky následují příčiny
  • Záruky průběhu: Systém ví, kdy je zpracování dokončené.
  • Distribuovaná koordinace: Synchronizace více uzlů

Principy operátorů a modulů

Pochopení rozdílu mezi operátory a moduly je nezbytné pro vývoj WASM:

Operátoři

Operátory jsou základní jednotky zpracování založené na operátorech včasného toku dat. Každý typ operátoru slouží ke konkrétnímu účelu:

  • Mapa: Transformace jednotlivých datových položek (například převod jednotek teploty)
  • Filtr: Povolit průchod pouze určitým datovým položkám na základě podmínek (například odebrání neplatných čtení)
  • Větev: Směrování dat do různých cest na základě podmínek (například oddělení dat o teplotě a vlhkosti)
  • Shromažďování: Shromažďování a agregace dat v časových oknech (například výpočetní statistické souhrny)
  • Zřetězení: Sloučení více datových proudů při zachování dočasného pořadí
  • Zpoždění: Řízení časování pomocí časového razítka

Moduly

Moduly představují implementaci logiky operátoru jako kód WASM. Jeden modul může implementovat více typů operátorů. Například modul teploty může poskytovat:

  • Operátor mapy pro převod jednotek
  • Operátor filtru pro kontrolu prahových hodnot
  • Operátor větve pro rozhodování o směrování
  • Kumulační operátor pro statistickou agregaci

Vztah

Vztah mezi definicemi grafu, moduly a operátory se řídí konkrétním vzorem:

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Oddělení umožňuje:

  • Opakované použití modulu: Nasazení stejného modulu WASM v různých konfiguracích grafů
  • Nezávislá správa verzí: Aktualizace definic grafu bez opětovného sestavení modulů
  • Dynamická konfigurace: Předání různých parametrů do stejného modulu pro různá chování

Požadavky

Zvolte svůj vývojový jazyk a nastavte požadované nástroje:

  • Sada nástrojů Rust: Instalace s:
    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
    
  • Cíl WASM: Přidat pomocí:
    rustup target add wasm32-wasip2
    
  • Nástroje sestavení: Instalace pomocí:
    cargo install wasm-tools --version '=1.201.0' --locked
    

Konfigurace vývojového prostředí

Sada WASM Rust SDK je dostupná prostřednictvím vlastního registru Azure DevOps. Nakonfigurujte přístup nastavením těchto proměnných prostředí:

export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true

Přidejte do profilu prostředí následující proměnné prostředí pro trvalý přístup:

echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc

Vytvoření projektu

Začněte vytvořením nového adresáře projektu pro modul operátora. Struktura projektu závisí na zvoleném jazyce.

cargo new --lib temperature-converter
cd temperature-converter

Konfigurace Cargo.toml

Cargo.toml Upravte soubor tak, aby zahrnoval závislosti pro sadu WASM SDK a další knihovny:

[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"

[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"

# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }

# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }

[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]

Vysvětlení klíčových závislostí:

  • wit-bindgen: Generuje vazby Rust z definic typů rozhraní WebAssembly (WIT), což umožňuje vašemu kódu rozhraní s modulem RUNTIME WASM.
  • tinykube_wasm_sdk: Sada Azure IoT Operations SDK poskytující makra operátorů (#[map_operator]#[filter_operator]atd.) a rozhraní API hostitele pro protokolování, metriky a správu stavu
  • serde + serde_json: Knihovny zpracování JSON pro analýzu a generování datových částí; default-features = false optimalizuje omezení velikosti WASM.
  • crate-type = ["cdylib"]: Kompiluje knihovnu Rust jako dynamickou knihovnu kompatibilní s jazykem C, která se vyžaduje pro generování modulu WASM.

Vytvoření jednoduchého modulu

Vytvořte jednoduchý modul, který převádí teplotu ze Celsia na Fahrenheita. Tento příklad ukazuje základní strukturu a logiku zpracování pro implementace Rustu i Pythonu.

use serde_json::{json, Value};

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked");
    true
}

#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut result) = input else {
        return Err(Error {
            message: "Unexpected input type".to_string(),
        });
    };

    let payload = &result.payload.read();
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["temperature"]["value"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
                data["temperature"] = json!({
                    "value_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });

                if let Ok(output_str) = serde_json::to_string(&data) {
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }

    Ok(DataModel::Message(result))
}

Modul sestavení

Vyberte si mezi místními buildy vývoje nebo kontejnerizovanými sestaveními na základě vašich požadavků na vývoj a prostředí.

Místní sestavení

Zabudujte přímo na vývojovém počítači pro nejrychlejší iteraci během vývoje a v případě, že potřebujete úplnou kontrolu nad prostředím sestavení.

# Build WASM module
cargo build --release --target wasm32-wasip2

# Find your module  
ls target/wasm32-wasip2/release/*.wasm

Sestavení Dockeru

Sestavte pomocí kontejnerizovaných prostředí s předkonfigurovanými všemi závislostmi a schématy. Tyto image Dockeru poskytují konzistentní buildy v různých prostředích a jsou ideální pro kanály CI/CD.

Tvůrce Rust Dockeru se udržuje v úložišti ukázek operací Azure IoT a zahrnuje všechny nezbytné závislosti. Podrobnou dokumentaci najdete v tématu Použití tvůrce Dockeru z Rustu.

# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter

# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug

Možnosti sestavení Dockeru:

  • --app-name: Musí se shodovat s názvem vaší bedny Rust z Cargo.toml
  • --build-mode: Zvolte release (výchozí) pro optimalizovaná sestavení nebo debug vývojová sestavení se symboly.

Další příklady

Komplexní příklady najdete v příkladech Rustu v úložišti ukázek. Mezi kompletní implementace patří:

  • Operátory mapování: Transformace dat a logika převodu
  • Operátory filtru: Podmíněné zpracování a ověření dat
  • Operátory větví: Směrování s více cestami na základě datového obsahu
  • Operátory kumulování: Agregace s časovým intervalem a statistické zpracování
  • Operátory zpoždění: Ovládací prvek zpracování na základě času

Příklady ukazují funkční implementace, které zobrazují úplnou strukturu pro každý typ operátoru, včetně správného zpracování chyb a vzorů protokolování.

Referenční informace k sadě SDK a rozhraní API

Sada WASM Rust SDK poskytuje komplexní vývojové nástroje:

Makra operátoru

use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};

// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
    // Transform logic here
}

// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
    // Return true to pass data through, false to filter out
}

// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
    // Return true for "True" arm, false for "False" arm
}

Parametry konfigurace modulu

Operátory WASM mohou přijímat parametry konfigurace modulu runtime prostřednictvím ModuleConfiguration struktury předané funkci init . Tyto parametry jsou definovány v definici grafu a umožňují přizpůsobení modulu runtime bez opětovného sestavení modulů.

use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;

fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}

Podrobné informace o definování parametrů konfigurace v definicích grafu naleznete v tématu Parametry konfigurace modulu.

Rozhraní API hostitele

Použití sady SDK pro práci s distribuovanými službami:

Úložiště stavu pro trvalá data:

use tinykube_wasm_sdk::state_store;

// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;

// Get value  
let response = state_store::get(key.as_bytes(), None)?;

// Delete key
state_store::del(key.as_bytes(), None, None)?;

Strukturované protokolování:

use tinykube_wasm_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metriky kompatibilní s OpenTelemetry:

use tinykube_wasm_sdk::metrics;

// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;

// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;

Odvození ONNX s WASM

Pokud chcete do modulů vkládat a spouštět malé modely ONNX pro odvozování v rámci pásma, přečtěte si téma Spouštění odvozování ONNX v grafech toku dat WebAssembly. Tento článek popisuje balení modelů s moduly, které umožňují funkci wasi-nn v definicích grafu a omezení.

Typy rozhraní WebAssembly (WIT)

Všechny operátory implementují standardizovaná rozhraní definovaná pomocí typů rozhraní WebAssembly (WIT). Technologie WIT poskytuje definice rozhraní nezávislé na jazyce, které zajišťují kompatibilitu mezi moduly WASM a modulem runtime hostitele.

Kompletní schémata WIT pro operace Azure IoT jsou k dispozici v úložišti ukázek. Tato schémata definují všechna rozhraní, typy a datové struktury, se kterými budete pracovat při vývoji modulů WASM.

Datový model a rozhraní

Všichni operátoři WASM pracují se standardizovanými datovými modely definovanými pomocí typů rozhraní WebAssembly (WIT):

Základní datový model

// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}

// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}

// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}

Definice rozhraní WIT

Každý typ operátoru implementuje konkrétní rozhraní WIT:

// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> data-model;
}

interface filter {
    use types.{data-model};
    process: func(message: data-model) -> bool;
}

interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}

interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> data-model;
}

Definice grafů a integrace WASM

Definice grafů definují, jak se moduly WASM připojují ke zpracování pracovních postupů. Určují operace, připojení a parametry, které vytvářejí kompletní kanály zpracování dat.

Podrobné informace o vytváření a konfiguraci definic grafu, včetně podrobných příkladů jednoduchých a složitých pracovních postupů, najdete v tématu Konfigurace definic grafu WebAssembly pro grafy toku dat.

Klíčová témata probíraná v průvodci definicemi grafu:

  • Struktura definice grafu: Principy schématu YAML a požadovaných komponent
  • Příklad jednoduchého grafu: Základní třífázový kanál převodu teploty
  • Příklad komplexního grafu: Zpracování více senzorů s větvením a agregací
  • Parametry konfigurace modulu: Přizpůsobení modulu operátorů WASM za běhu
  • Nasazení registru: Balení a ukládání definic grafu jako artefaktů OCI

Další kroky