Veri akışı grafları için WebAssembly (WASM) modüllerini ve graf tanımlarını anlama

Azure IoT İşlemleri veri akışı grafikleri, telemetri verilerini haritalar, filtreler ve dallar gibi bir dizi işleç aracılığıyla yönlendirerek uçta işler. Özel işleme mantığınızı WebAssembly (WASM) modülleri olarak paketleyip bunları bir grafik tanımında birbirine bağlayarak tam hizmet yazmadan verileri dönüştürebilir, filtreleyebilir ve zenginleştirebilirsiniz.

Bu makalede, WASM modüllerini destekleyen işleç türleri, zamanında veri akışı modeli, modül yapılandırması, konak API'leri ve WIT şeması açıklanmaktadır. VS Code uzantısı veya dataflow-dev CLI ile modülleri yerel olarak derlemek, test etmek ve hatalarını ayıklamak için bkz. Veri akışları için WASM modülleri oluşturma.

İşleçler ve modüller

İşleçler, veri akışı grafiğindeki işleme birimleridir. Her tür belirli bir amaca hizmet eder:

Operatör Amaç Dönüş türü
Harita Her veri öğesini dönüştürme (örneğin, sıcaklık birimlerini dönüştürme) DataModel
Filtre Bir koşula göre öğeleri geçirme veya bırakma bool
Şube Öğeleri iki farklı yola yönlendirme bool
Birikmek Zaman pencereleri içinde öğeleri toplama DataModel
Birleştir Sırayı korurken birden çok akışı birleştirme Mevcut Değil
Gecikme Zaman damgalarını zamanlamayı denetlemek için ilerlet Mevcut Değil

Modül, bir veya daha fazla işleç uygulayan WASM ikili dosyasıdır. Örneğin, tek temperature.wasm bir modül hem işleç map (dönüştürme için) hem de işleç filter (eşik denetimi için) sağlayabilir.

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Bu ayrım, aynı modülü farklı grafik yapılandırmalarıyla, sürüm modülleriyle bağımsız olarak yeniden kullanmanıza ve yeniden derlemeden yapılandırma parametreleri aracılığıyla davranışı değiştirmenize olanak tanır.

Zamanında veri akışı modeli

Veri akışı grafikleri, Microsoft Research'ün Naiad projesinden Timely veri akışı hesaplama modeli üzerinde oluşturulur. Her veri öğesi, karma mantıksal saat zaman damgası ile birlikte gelir.

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Bu, belirleyici işleme (aynı giriş her zaman aynı çıkışı üretir), tam-tek sefer semantiği ve düğümler arasında dağıtılmış koordinasyon sağlar. WIT şemasının tamamı için bkz. samples deposu.

VS Code uzantısıyla WASM modülleri geliştirmeyi öğrenmek için bkz. VS Code uzantısıyla WASM modülleri oluşturma.

Yazma işleçleri

Eşleme işleci

Eşleme işleci her veri öğesini dönüştürür ve değiştirilmiş bir kopya döndürür. Hızlı başlangıç örneğinde temel bir harita gösterilmektedir. Yapılandırma parametrelerini kullanan daha karmaşık bir örnek aşağıda verilmiştir:

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Filtre operatörü

Filtre, veriyi iletmek için true veya bırakmak için false olarak geri döner.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Dal işleci

Dal, verileri iki yola yönlendirir. İlk kol için false, ikinci kol için true olarak geri dönün.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Modül yapılandırma parametreleri

İşleçleriniz işlev aracılığıyla init çalışma zamanı yapılandırma parametrelerini alabilir. Bu, modülü yeniden derlemeden davranışı özelleştirmenizi sağlar.

init İşlev bir ModuleConfiguration yapı alır:

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

init modül yüklendiğinde işlev bir kez çağrılır. İşlemeye başlamak için true geri dönün, veya yapılandırma hatası sinyali vermek için false geri dönün. Eğer initfalse döndürürse, işleç herhangi bir veri işlemez ve veri akışı bir hatayı günlüğe kaydeder.

Önemli

İşleciniz yapılandırma parametrelerine (örneğin, filtre sınırları veya eşik değerleri) bağımlıysa, bunlar sağlanmadığında ortaya çıkan durumları her zaman ele alın. Varsayılanları makul bir şekilde kullanın veya falseinit değerini döndürün. Eksik parametrelerle unwrap() öğesini çağırmayın veya paniğe kapılmayın, çünkü bu, operatörü çalışma zamanında net bir hata mesajı olmadan çökertebilir.

Parametreleri graf tanımının moduleConfigurations bölümünde tanımlarsınız:

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

name alanı, grafiğin operations bölümündeki işleç adıyla eşleşmelidir. Graf tanımı yapısı hakkında daha fazla bilgi için bkz. WebAssembly graf tanımlarını yapılandırma.

Modül boyutu ve performansı

WASM modülleri sınırlı kaynaklarla izole bir ortamda çalışır. Şu yönergeleri göz önünde bulundurun:

  • Bağımlılıkları en aza indirin. Rust için, ikili boyutunu azaltmak amacıyla default-features = false kullanarak serde ve serde_json üzerinde çalışın. Büyük kasaları çekmekten kaçının.
  • Modül boyutu önemlidir. Daha küçük modüller daha hızlı yüklenir ve daha az bellek kullanır. Tipik bir sıcaklık dönüştürücüsü yaklaşık 2 MB (Rust sürüm) veya 5 MB (Python). Üretim için yayın derlemelerini kullanın.
  • Engelleme işlemlerinden kaçının. İşlev hızlı process bir şekilde tamamlanmalıdır. Yoğun hesaplama, veri akışı işlem hattının tamamını geciktirir.
  • wasm-tools ile inceleyin. Kayıt defterine göndermeden önce modülünüzün beklenen arabirimleri dışarı aktardığını doğrulamak için komutunu çalıştırın wasm-tools component wit your-module.wasm .

Sürüm Oluşturma ve CI/CD

Modülleriniz ve graf tanımlarınız için anlamsal sürüm oluşturma özelliğini kullanın. Veri akış grafiği yapıtlara isim ve etikete (örneğin, temperature:1.0.0) göre referans verir, böylece aynı etikete sahip yeni bir sürüm göndererek grafik tanımlarını değiştirmeden modülleri güncelleştirebilirsiniz.

Otomatik derlemeler için tipik bir işlem hattı şöyle görünür:

  1. WASM modülünü oluşturun (tutarlılık için Docker oluşturucusunu kullanın).
  2. Dışarı aktarılan arabirimleri doğrulamak için komutunu çalıştırın wasm-tools component wit .
  3. Temel mantığınıza göre birim testleri çalıştırın. Daha fazla bilgi edinmek için bkz. WASM modüllerini test etme.
  4. DERLEME sürümüyle etiketleyerek ORAS ile kayıt defterinize gönderin.
  5. (İsteğe bağlı) Graf tanımının artifakt referansını güncelleyin ve gönderin.

Veri akışı grafiği, yeniden dağıtım gerektirmeden aynı etikete gönderilen yeni modül sürümlerini otomatik olarak alır. Bkz . Çalışan bir grafikte modülü güncelleştirme.

Konak API'leri

WASM modülleriniz durum yönetimi, günlük kaydı ve ölçümler için konak API'lerini kullanabilir.

Devlet mağazası

Dağıtılmış durum depoyu kullanarak verileri çağrılar arasında process kalıcı hale döndürme:

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Ağaç kesimi

Önem düzeyleriyle yapılandırılmış günlük kaydı:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metrics

OpenTelemetry uyumlu ölçümler:

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

ONNX çıkarımı

Bant içi çıkarım için modüllerinize küçük ONNX modelleri eklemek ve çalıştırmak için bkz. WebAssembly veri akışı grafiklerinde ONNX çıkarımını çalıştırma.

WIT şema referansı

Tüm işleçler WebAssembly Arabirim Türleri (WIT) kullanılarak tanımlanan standartlaştırılmış arabirimler uygular. Şemaların tamamını samples deposunda bulabilirsiniz.

İşleç arabirimleri

Her işlecin yapılandırmaya yönelik bir init işlevi ve veri işlemeye yönelik bir process işlevi vardır:

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Veri modeli

Kaynak processor.wit (wasm-graph:processor@1.1.0):

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Uyarı

Çoğu operatör message varyantı ile çalışır. İşlevinizin process başında bu türü denetleyin. Yük, sıfır kopya okumaları için bir konak arabellek tutamacı (buffer) veya modüle ait baytlar (bytes) kullanır. buffer.read() öğesini çağırarak ana makine baytlarını modülünüzün belleğine kopyalayın.