Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V tomto článku se dozvíte, jak vyvíjet vlastní moduly WebAssembly (WASM) a definice grafů pro grafy toku dat Azure IoT Operations. Vytvořte moduly v Rustu nebo Pythonu pro implementaci vlastní logiky zpracování. Definujte konfigurace grafu, které určují, jak se moduly připojují k kompletním pracovním postupům zpracování.
Důležité
Grafy toku dat aktuálně podporují pouze koncové body MQTT, Kafka a OpenTelemetry. Jiné typy koncových bodů, jako jsou Data Lake, Microsoft Fabric OneLake, Azure Data Explorer a Local Storage, se nepodporují. Další informace najdete v tématu Známé problémy.
Přehled
Grafy toku dat Azure IoT zpracovávají streamovaná data prostřednictvím konfigurovatelných operátorů implementovaných jako moduly WebAssembly. Každý operátor zpracovává časová razítka dat při zachování dočasného řazení a umožňuje analýzu v reálném čase s deterministickými výsledky.
Klíčové výhody
- Zpracování v reálném čase: Zpracování streamovaných dat s konzistentní nízkou latencí
- Sémantika času událostí: Zpracování dat na základě událostí, nikoli při jejich zpracování
- Odolnost proti chybám: Integrovaná podpora pro zpracování selhání a zajištění konzistence dat
- Škálovatelnost: Distribuce zpracování napříč několika uzly při zachování záruk objednávek
- Podpora více jazyků: Vývoj v Rustu nebo Pythonu s konzistentními rozhraními
Základ architektury
Grafy toku dat vycházejí z výpočetního modelu Včasná toku dat , který pochází z projektu Microsoft Research Naiad. Tento přístup zajišťuje:
- Deterministické zpracování: Stejný vstup vždy vytváří stejný výstup.
- Sledování průběhu: Systém ví, kdy jsou výpočty dokončené.
- Distribuovaná koordinace: Více uzlů zpracování zůstane synchronizované
Proč včasný tok dat?
Tradiční systémy zpracování datových proudů mají několik výzev. Data mimo pořadí znamenají, že události můžou dorazit později, než se čekalo. Částečné výsledky ztěžují zjištění, kdy se výpočty dokončí. K problémům koordinace dochází při synchronizaci distribuovaného zpracování.
Včasné toky dat řeší tyto problémy prostřednictvím:
Časové razítka a sledování průběhu
Každá datová položka nese časové razítko představující logický čas. Systém sleduje průběh prostřednictvím časových razítek a umožňuje několik klíčových funkcí:
- Deterministické zpracování: Stejný vstup vždy vytváří stejný výstup.
- Přesně jednou sémantika: Žádné duplicitní nebo zmeškané zpracování
- Vodoznaky: Zjistěte, kdy se na daný čas nedorazí žádná další data.
Hybridní logické hodiny
Mechanismus časového razítka používá hybridní přístup:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
Přístup k hybridním logickým hodinám zajišťuje několik možností:
- Kauzální řazení: Účinky následují příčiny
- Záruky průběhu: Systém ví, kdy je zpracování dokončené.
- Distribuovaná koordinace: Synchronizace více uzlů
Principy operátorů a modulů
Pochopení rozdílu mezi operátory a moduly je nezbytné pro vývoj WASM:
Operátoři
Operátory jsou základní jednotky zpracování založené na operátorech včasného toku dat. Každý typ operátoru slouží ke konkrétnímu účelu:
- Mapa: Transformace jednotlivých datových položek (například převod jednotek teploty)
- Filtr: Povolit průchod pouze určitým datovým položkám na základě podmínek (například odebrání neplatných čtení)
- Větev: Směrování dat do různých cest na základě podmínek (například oddělení dat o teplotě a vlhkosti)
- Shromažďování: Shromažďování a agregace dat v časových oknech (například výpočetní statistické souhrny)
- Zřetězení: Sloučení více datových proudů při zachování dočasného pořadí
- Zpoždění: Řízení časování pomocí časového razítka
Moduly
Moduly představují implementaci logiky operátoru jako kód WASM. Jeden modul může implementovat více typů operátorů. Například modul teploty může poskytovat:
- Operátor mapy pro převod jednotek
- Operátor filtru pro kontrolu prahových hodnot
- Operátor větve pro rozhodování o směrování
- Kumulační operátor pro statistickou agregaci
Vztah
Vztah mezi definicemi grafu, moduly a operátory se řídí konkrétním vzorem:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Oddělení umožňuje:
- Opakované použití modulu: Nasazení stejného modulu WASM v různých konfiguracích grafů
- Nezávislá správa verzí: Aktualizace definic grafu bez opětovného sestavení modulů
- Dynamická konfigurace: Předání různých parametrů do stejného modulu pro různá chování
Požadavky
Zvolte svůj vývojový jazyk a nastavte požadované nástroje:
-
Sada nástrojů Rust: Instalace s:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -
Cíl WASM: Přidat pomocí:
rustup target add wasm32-wasip2 -
Nástroje sestavení: Instalace pomocí:
cargo install wasm-tools --version '=1.201.0' --locked
Konfigurace vývojového prostředí
Sada WASM Rust SDK je dostupná prostřednictvím vlastního registru Azure DevOps. Nakonfigurujte přístup nastavením těchto proměnných prostředí:
export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true
Přidejte do profilu prostředí následující proměnné prostředí pro trvalý přístup:
echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc
Vytvoření projektu
Začněte vytvořením nového adresáře projektu pro modul operátora. Struktura projektu závisí na zvoleném jazyce.
cargo new --lib temperature-converter
cd temperature-converter
Konfigurace Cargo.toml
Cargo.toml Upravte soubor tak, aby zahrnoval závislosti pro sadu WASM SDK a další knihovny:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Vysvětlení klíčových závislostí:
-
wit-bindgen: Generuje vazby Rust z definic typů rozhraní WebAssembly (WIT), což umožňuje vašemu kódu rozhraní s modulem RUNTIME WASM. -
tinykube_wasm_sdk: Sada Azure IoT Operations SDK poskytující makra operátorů (#[map_operator]#[filter_operator]atd.) a rozhraní API hostitele pro protokolování, metriky a správu stavu -
serde+serde_json: Knihovny zpracování JSON pro analýzu a generování datových částí;default-features = falseoptimalizuje omezení velikosti WASM. -
crate-type = ["cdylib"]: Kompiluje knihovnu Rust jako dynamickou knihovnu kompatibilní s jazykem C, která se vyžaduje pro generování modulu WASM.
Vytvoření jednoduchého modulu
Vytvořte jednoduchý modul, který převádí teplotu ze Celsia na Fahrenheita. Tento příklad ukazuje základní strukturu a logiku zpracování pro implementace Rustu i Pythonu.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked");
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read();
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Modul sestavení
Vyberte si mezi místními buildy vývoje nebo kontejnerizovanými sestaveními na základě vašich požadavků na vývoj a prostředí.
Místní sestavení
Zabudujte přímo na vývojovém počítači pro nejrychlejší iteraci během vývoje a v případě, že potřebujete úplnou kontrolu nad prostředím sestavení.
# Build WASM module
cargo build --release --target wasm32-wasip2
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Sestavení Dockeru
Sestavte pomocí kontejnerizovaných prostředí s předkonfigurovanými všemi závislostmi a schématy. Tyto image Dockeru poskytují konzistentní buildy v různých prostředích a jsou ideální pro kanály CI/CD.
Tvůrce Rust Dockeru se udržuje v úložišti ukázek operací Azure IoT a zahrnuje všechny nezbytné závislosti. Podrobnou dokumentaci najdete v tématu Použití tvůrce Dockeru z Rustu.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Možnosti sestavení Dockeru:
-
--app-name: Musí se shodovat s názvem vaší bedny Rust zCargo.toml -
--build-mode: Zvolterelease(výchozí) pro optimalizovaná sestavení nebodebugvývojová sestavení se symboly.
Další příklady
Komplexní příklady najdete v příkladech Rustu v úložišti ukázek. Mezi kompletní implementace patří:
- Operátory mapování: Transformace dat a logika převodu
- Operátory filtru: Podmíněné zpracování a ověření dat
- Operátory větví: Směrování s více cestami na základě datového obsahu
- Operátory kumulování: Agregace s časovým intervalem a statistické zpracování
- Operátory zpoždění: Ovládací prvek zpracování na základě času
Příklady ukazují funkční implementace, které zobrazují úplnou strukturu pro každý typ operátoru, včetně správného zpracování chyb a vzorů protokolování.
Referenční informace k sadě SDK a rozhraní API
Sada WASM Rust SDK poskytuje komplexní vývojové nástroje:
Makra operátoru
use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
// Return true for "True" arm, false for "False" arm
}
Parametry konfigurace modulu
Operátory WASM mohou přijímat parametry konfigurace modulu runtime prostřednictvím ModuleConfiguration struktury předané funkci init . Tyto parametry jsou definovány v definici grafu a umožňují přizpůsobení modulu runtime bez opětovného sestavení modulů.
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Podrobné informace o definování parametrů konfigurace v definicích grafu naleznete v tématu Parametry konfigurace modulu.
Rozhraní API hostitele
Použití sady SDK pro práci s distribuovanými službami:
Úložiště stavu pro trvalá data:
use tinykube_wasm_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Strukturované protokolování:
use tinykube_wasm_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metriky kompatibilní s OpenTelemetry:
use tinykube_wasm_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
Odvození ONNX s WASM
Pokud chcete do modulů vkládat a spouštět malé modely ONNX pro odvozování v rámci pásma, přečtěte si téma Spouštění odvozování ONNX v grafech toku dat WebAssembly. Tento článek popisuje balení modelů s moduly, které umožňují funkci wasi-nn v definicích grafu a omezení.
Typy rozhraní WebAssembly (WIT)
Všechny operátory implementují standardizovaná rozhraní definovaná pomocí typů rozhraní WebAssembly (WIT). Technologie WIT poskytuje definice rozhraní nezávislé na jazyce, které zajišťují kompatibilitu mezi moduly WASM a modulem runtime hostitele.
Kompletní schémata WIT pro operace Azure IoT jsou k dispozici v úložišti ukázek. Tato schémata definují všechna rozhraní, typy a datové struktury, se kterými budete pracovat při vývoji modulů WASM.
Datový model a rozhraní
Všichni operátoři WASM pracují se standardizovanými datovými modely definovanými pomocí typů rozhraní WebAssembly (WIT):
Základní datový model
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Definice rozhraní WIT
Každý typ operátoru implementuje konkrétní rozhraní WIT:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> data-model;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> bool;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> data-model;
}
Definice grafů a integrace WASM
Definice grafů definují, jak se moduly WASM připojují ke zpracování pracovních postupů. Určují operace, připojení a parametry, které vytvářejí kompletní kanály zpracování dat.
Podrobné informace o vytváření a konfiguraci definic grafu, včetně podrobných příkladů jednoduchých a složitých pracovních postupů, najdete v tématu Konfigurace definic grafu WebAssembly pro grafy toku dat.
Klíčová témata probíraná v průvodci definicemi grafu:
- Struktura definice grafu: Principy schématu YAML a požadovaných komponent
- Příklad jednoduchého grafu: Základní třífázový kanál převodu teploty
- Příklad komplexního grafu: Zpracování více senzorů s větvením a agregací
- Parametry konfigurace modulu: Přizpůsobení modulu operátorů WASM za běhu
- Nasazení registru: Balení a ukládání definic grafu jako artefaktů OCI
Další kroky
- Podívejte se na kompletní příklady a pokročilé vzory v úložišti ukázek WASM operací Azure IoT .
- Naučte se nasazovat moduly pomocí WebAssembly s grafy toku dat.
- Nakonfigurujte koncové body toku dat v konfiguraci koncových bodů toku dat.