Enhancing Azure Functions with Custom Triggers and AI Integration for Real-Time IoT Data Processing in Event-Driven Architectures

Luma Warren (Axon Enterprise) 165 Reputation points
2025-12-10T10:19:28.19+00:00

I’m building an Azure-based IoT solution using Azure Functions (runtime v4) with Node.js (v20.10.0) as the primary language, integrated with Azure IoT Hub for device telemetry ingestion and Azure Cosmos DB for state management. The current setup includes HTTP-triggered functions that process incoming sensor data via Event Grid events.

However, I want to enhance this by developing a custom trigger for Azure Functions that supports ML-based anomaly detection using Azure Machine Learning endpoints, and by dynamically generating output bindings for alerting via Azure Notification Hubs. This would involve embedding AI models (via ONNX Runtime) within the function for edge-like predictions.

The main challenges are cold-start delays when loading the model, custom trigger registration and schema validation with Event Grid, and occasional 429 (Too Many Requests) responses from Cosmos DB during bursts.

Current simple Event Grid trigger:

// Current simple EventGrid trigger (works, but no ML or custom trigger)
const { app } = require('@azure/functions');
const tf = require('@tensorflow/tfjs-node'); // TensorFlow.js for Node, used below as `tf`

app.eventGrid('iotTelemetry', {
  handler: async (event, context) => {
    const data = event.data;
    // Want to replace this whole trigger with a custom ML-aware one
    // Reloads the model on every invocation (cold-start killer). Note that loadLayersModel
    // expects a TF.js model.json; an .onnx file needs an ONNX runtime instead.
    const model = await tf.loadLayersModel('file://./model/anomaly.onnx');
    const tensor = tf.tensor2d([[data.temp, data.humidity]]);
    const score = model.predict(tensor).dataSync()[0]; // predict() is synchronous in TF.js
    if (score > 0.85) {
      // Need dynamic Notification Hubs output binding + custom trigger fire
      context.log(`Anomaly on device ${data.deviceId}: ${score}`);
    }
  }
});


Any working samples for a custom trigger + in-process ML inference + Durable Functions orchestration for stateful fallback would be gold.

Product/Version: Azure Functions | 4.0 | Node.js 20


Moved from: Microsoft Teams | Development

  1. Kudos-Ng 10,400 Reputation points Microsoft External Staff Moderator
    2025-12-10T14:11:50.4066667+00:00

    Hi Luma Warren (Axon Enterprise),

    Thank you for posting your question in the Microsoft Q&A forum. 

    After reviewing your description, your IoT solution clearly integrates several Azure services, and adding AI-based anomaly detection on top of it requires careful design. With my current understanding and limited research time, I can't offer a complete solution, but I'd like to share some insights and samples that may help you shape your approach and tackle these enhancements.

    Note: The code snippets below are for reference only and aren’t guaranteed to work; please verify and implement them properly for your environment.

    Some key insights:

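    • Azure Functions doesn't support authoring fully custom triggers in Node.js the way .NET does (via WebJobs SDK extensions). Use the Event Grid trigger as a base and embed your ML logic, anomaly scoring, and dynamic alerting in the handler. Register the function as an Event Grid subscription endpoint. The Event Grid trigger completes the subscription validation handshake automatically, so you only need to validate your own payload schema in code (manual handshake handling is needed only if you use an HTTP trigger as the webhook).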
    • ML Integration with ONNX: Use the onnxruntime-node package (npm install onnxruntime-node). It’s lightweight and supports in‑process inference. For anomaly detection, assume a pre‑trained ONNX model (e.g., exported from Azure ML or scikit‑learn).
    • Cold Start Mitigation:
      • Consider the Premium plan for pre-warmed instances (always-ready workers).
      • Load the ONNX model in a global singleton pattern outside the handler to avoid reloading it on every invocation.
      • Add a timer trigger that runs every 5–10 minutes to keep the app warm. This only helps keep an existing instance loaded; new instances added during scale-out bursts still cold-start.
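    • Dynamic Output Bindings: In the Node.js v4 programming model, use context.extraOutputs.set() or the handler's return value to write to output bindings. Extra outputs must be declared when the function is registered, so they aren't truly dynamic. The Notification Hubs output binding isn't available on Functions v2+, so use the @azure/notification-hubs SDK directly for alerting.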
    • Durable Functions for Stateful Fallback: Use Durable Functions to orchestrate long‑running or retryable workflows: e.g., fallback to batch processing if real‑time inference fails due to cold start or throttling.
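    The singleton bullet above can be sketched in plain Node.js. This minimal sketch caches the load promise rather than the loaded object, so concurrent invocations that arrive during a cold start share a single model load, and a failed load is cleared so a later invocation can retry. createLazySingleton and the fake loader are hypothetical names for illustration; in a real function, loadFn would be something like () => ort.InferenceSession.create('./model/anomaly.onnx').

```javascript
// Minimal sketch: a lazily initialized, promise-cached singleton (hypothetical helper).
function createLazySingleton(loadFn) {
  let pending = null; // one shared promise per worker process
  return function get() {
    if (!pending) {
      pending = Promise.resolve()
        .then(loadFn)
        .catch((err) => {
          pending = null; // clear the cache so a later call can retry a failed load
          throw err;
        });
    }
    return pending;
  };
}

// Usage example: a fake loader standing in for ort.InferenceSession.create(...)
let loadCount = 0;
const getModel = createLazySingleton(async () => {
  loadCount += 1;
  await new Promise((resolve) => setTimeout(resolve, 50)); // simulate a slow model load
  return { name: 'anomaly-model' };
});

async function main() {
  // Three "concurrent invocations" during a cold start share one load
  const [a, b, c] = await Promise.all([getModel(), getModel(), getModel()]);
  console.log(loadCount, a === b && b === c); // → 1 true
}

main();
```

    Caching the promise matters because a plain "if (!model) model = await load()" check lets every invocation that arrives before the first load finishes start its own load.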

    Sample 1: Enhanced Event Grid Trigger with ONNX ML Inference and Dynamic Alerting

    This builds on your Event Grid trigger. It includes:

    • ONNX model loading (singleton to reduce cold starts).
    • Anomaly detection.
    • Dynamic alerting via Notification Hubs SDK.
    • Payload schema validation (the Event Grid trigger already handles the subscription validation handshake).

    index.js (replace your simple trigger):

    const { app } = require('@azure/functions');
    const ort = require('onnxruntime-node');
    const { NotificationHubsClient, createAppleNotification } = require('@azure/notification-hubs');
    const { CosmosClient } = require('@azure/cosmos');
    
    // Singleton for the model to mitigate cold starts (loaded once per worker).
    // Caching the promise means concurrent invocations share a single load.
    let modelPromise;
    function loadModel() {
      if (!modelPromise) {
        modelPromise = ort.InferenceSession.create('./model/anomaly.onnx').catch((err) => {
          modelPromise = undefined; // let a later invocation retry a failed load
          throw err;
        });
      }
      return modelPromise;
    }
    
    // Notification Hubs client (initialized once, outside the handler)
    const hubClient = new NotificationHubsClient(
      process.env.NOTIFICATION_HUB_CONNECTION_STRING,
      process.env.NOTIFICATION_HUB_NAME
    );
    
    // Cosmos client with SDK-level retries for 429 (throttled) responses
    const cosmosClient = new CosmosClient({
      endpoint: process.env.COSMOS_ENDPOINT,
      key: process.env.COSMOS_KEY,
      connectionPolicy: {
        retryOptions: {
          maxRetryAttemptCount: 10,
          maxWaitTimeInSeconds: 30
        }
      }
    });
    const container = cosmosClient.database('IoTDatabase').container('Telemetry');
    
    app.eventGrid('iotTelemetryEnhanced', {
      handler: async (event, context) => {
        // The Event Grid trigger completes the subscription validation handshake for you,
        // so the handler only receives real events. Validate the payload schema here.
        const data = event.data;
        if (!data || !data.deviceId || typeof data.temp !== 'number' || typeof data.humidity !== 'number') {
          // Throwing would make Event Grid redeliver a payload that can never succeed,
          // so log and drop it (or rely on a dead-letter destination).
          context.error(`Invalid schema in event ${event.id}: missing required fields`);
          return;
        }
    
        // Run ONNX inference; input/output names are read from the model instead of hard-coded
        const model = await loadModel();
        const inputTensor = new ort.Tensor('float32', Float32Array.from([data.temp, data.humidity]), [1, 2]); // Adjust shape to your model
        const results = await model.run({ [model.inputNames[0]]: inputTensor });
        const score = results[model.outputNames[0]].data[0]; // Assumes the first output is the anomaly score
    
        // Alert if anomalous, using the SDK (no Notification Hubs output binding on Functions v2+).
        // APNs is only an example platform; use the create*Notification helper for your platform.
        if (score > 0.85) {
          const notification = createAppleNotification({
            body: JSON.stringify({
              aps: { alert: { title: `Anomaly on device ${data.deviceId}`, body: `Score: ${score}` } }
            })
          });
          // Broadcast; pass { tagExpression: '...' } as a second argument to target tags
          await hubClient.sendNotification(notification);
          context.log(`Anomaly alerted: ${data.deviceId}, score: ${score}`);
        }
    
        // Upsert the latest device state (assumes the partition key is /id or /deviceId).
        // The SDK retries 429s internally per retryOptions; a 429 that still reaches the catch
        // means those retries were exhausted, so rethrow and let Event Grid redeliver the event.
        try {
          await container.items.upsert({ ...data, id: data.deviceId, anomalyScore: score, updatedAt: new Date().toISOString() });
        } catch (err) {
          if (err.code === 429) {
            context.warn(`Cosmos DB still throttled after SDK retries for ${data.deviceId}`);
          }
          throw err;
        }
      }
    });
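    Keeping the payload check in a plain function makes it easy to unit-test outside the Functions host and to report every problem at once instead of only the first. This is a hypothetical validateTelemetry helper for the deviceId/temp/humidity shape used in Sample 1; the 0-100 humidity range is an assumption, so adjust the rules to your real device schema. The handler would log and drop (or dead-letter) the event whenever the returned array is non-empty.

```javascript
// Hypothetical validator for the telemetry shape used in Sample 1.
// Field names come from the sample; the humidity range (0-100 %) is an assumption.
function validateTelemetry(data) {
  if (data === null || typeof data !== 'object') {
    return ['payload must be an object'];
  }
  const errors = [];
  if (typeof data.deviceId !== 'string' || data.deviceId.length === 0) {
    errors.push('deviceId must be a non-empty string');
  }
  for (const field of ['temp', 'humidity']) {
    // Rejects strings, NaN and Infinity, which a plain typeof check would let through
    if (typeof data[field] !== 'number' || !Number.isFinite(data[field])) {
      errors.push(`${field} must be a finite number`);
    }
  }
  if (Number.isFinite(data.humidity) && (data.humidity < 0 || data.humidity > 100)) {
    errors.push('humidity must be between 0 and 100');
  }
  return errors;
}

console.log(validateTelemetry({ deviceId: 'dev-1', temp: 21.5, humidity: 40 })); // → []
console.log(validateTelemetry({ deviceId: '', temp: 'hot' })); // lists three problems
```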
    

    Sample 2: Durable Functions Orchestration for Stateful Fallback

    Use Durable Functions to orchestrate a fallback workflow. For example, if real-time processing fails (cold start, 429), queue for batch retry.
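    Before handing work to the fallback path, a throttled call can be retried a few times with backoff. The Cosmos DB SDK already does this internally according to retryOptions, so a wrapper like this minimal sketch only helps around operations that still surface a 429 after the SDK gives up. retryOnThrottle is a hypothetical helper; it assumes the thrown error exposes code (or statusCode) equal to 429 and optionally retryAfterInMs, property names you should verify against your SDK version. When it finally rethrows, that is the point at which to start the Durable fallback.

```javascript
// Minimal sketch: retry an async operation on HTTP 429, then let the caller fall back.
// Assumes errors carry `code`/`statusCode` and optionally `retryAfterInMs` (verify for your SDK).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function retryOnThrottle(operation, { maxAttempts = 4, baseDelayMs = 100 } = {}) {
  for (let attempt = 1; ; attempt += 1) {
    try {
      return await operation();
    } catch (err) {
      const throttled = Boolean(err) && (err.code === 429 || err.statusCode === 429);
      if (!throttled || attempt >= maxAttempts) throw err; // caller decides on the fallback
      // Prefer the server's retry hint; otherwise use exponential backoff with full jitter
      const hinted = Number(err.retryAfterInMs);
      const delay = Number.isFinite(hinted) && hinted > 0
        ? hinted
        : Math.random() * baseDelayMs * 2 ** (attempt - 1);
      await sleep(delay);
    }
  }
}

// Usage example: a fake write that is throttled twice before succeeding
let calls = 0;
async function flakyWrite() {
  calls += 1;
  if (calls < 3) {
    const err = new Error('Request rate is large');
    err.code = 429;
    throw err;
  }
  return 'stored';
}

retryOnThrottle(flakyWrite, { baseDelayMs: 10 })
  .then((result) => console.log(result, calls)) // → stored 3
  .catch((err) => {
    // Retries exhausted (or a non-429 error): this is where the fallback would start
    console.error('fallback needed:', err.message);
  });
```

    Full jitter spreads retries from many concurrent invocations over time, which avoids every throttled instance hitting Cosmos DB again at the same moment during a burst.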

    orchestrator.js (orchestrates ML and storage with retry):

    const df = require('durable-functions');
    
    // durable-functions v3 (Node.js v4 programming model) registers functions in code
    df.app.orchestration('orchestrator', function* (context) {
      const telemetry = context.df.getInput();
      const outputs = [];
      // Up to 3 attempts for the storage activity, first retry after 5 seconds
      const retryOptions = new df.RetryOptions(5000, 3);
      try {
        // Call activity for ML inference
        const anomalyResult = yield context.df.callActivity('AnomalyDetectionActivity', telemetry);
        outputs.push(anomalyResult);
        // Call activity for Cosmos store with retry
        yield context.df.callActivityWithRetry('StoreToCosmosActivity', retryOptions, { telemetry, anomalyResult });
      } catch (err) {
        // Fallback: queue for batch processing (log only once, not on every replay)
        if (!context.df.isReplaying) {
          context.log('Fallback triggered due to error:', err);
        }
        yield context.df.callActivity('BatchFallbackActivity', telemetry);
      }
      return outputs;
    });
    

    AnomalyDetectionActivity.js (ML inference activity):

    const df = require('durable-functions');
    const ort = require('onnxruntime-node');
    
    // Cache the model promise per worker (same singleton idea as Sample 1)
    let modelPromise;
    function loadModel() {
      if (!modelPromise) {
        modelPromise = ort.InferenceSession.create('./model/anomaly.onnx');
      }
      return modelPromise;
    }
    
    df.app.activity('AnomalyDetectionActivity', {
      handler: async (telemetry, context) => {
        const model = await loadModel();
        const inputTensor = new ort.Tensor('float32', Float32Array.from([telemetry.temp, telemetry.humidity]), [1, 2]);
        const results = await model.run({ [model.inputNames[0]]: inputTensor });
        const score = results[model.outputNames[0]].data[0];
    
        if (score > 0.85) {
          // Alert logic here (using the Notification Hubs SDK as in Sample 1)
          return { anomaly: true, score };
        }
        return { anomaly: false, score };
      }
    });
    
    

    StoreToCosmosActivity.js (storage with retry):

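    const df = require('durable-functions');
    const { CosmosClient } = require('@azure/cosmos');
    
    // Create the client once per worker, not per invocation
    const cosmosClient = new CosmosClient({ endpoint: process.env.COSMOS_ENDPOINT, key: process.env.COSMOS_KEY });
    const container = cosmosClient.database('IoTDatabase').container('Telemetry');
    
    df.app.activity('StoreToCosmosActivity', {
      handler: async ({ telemetry, anomalyResult }) => {
        // Upsert avoids 409 Conflict on repeat telemetry; a thrown 429 surfaces to the orchestrator
        await container.items.upsert({ id: telemetry.deviceId, data: telemetry, anomaly: anomalyResult });
      }
    });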
    
    

    BatchFallbackActivity.js (placeholder for batch):

    const df = require('durable-functions');
    
    df.app.activity('BatchFallbackActivity', {
      handler: async (telemetry, context) => {
        // e.g., send to a Service Bus queue or batch insert later
        context.log('Batch fallback for:', telemetry.deviceId);
      }
    });
    

    starter.js (HTTP-triggered client function that starts the orchestration):

    const df = require('durable-functions');
    const { app } = require('@azure/functions');
    
    app.http('startOrchestration', {
      methods: ['POST'],
      extraInputs: [df.input.durableClient()], // required for df.getClient(context)
      handler: async (request, context) => {
        const client = df.getClient(context);
        const telemetry = await request.json();
        const instanceId = await client.startNew('orchestrator', { input: telemetry });
        return client.createCheckStatusResponse(request, instanceId);
      }
    });
    
    


    I hope the information provided above is helpful.

