Consuming Azure Stream Analytics output in Azure Logic Apps
Stream Analytics
Azure Stream Analytics lets users perform real-time analytics for their Internet of Things (IoT) solutions. In this article, we will use Event Hubs as a source and Service Bus Queue as a destination for Stream Analytics. Azure Event Hubs is a hyper-scale telemetry ingestion service that can store millions of events. And, Azure Service Bus is a highly-reliable cloud messaging service with Queues offering a First In, First Out (FIFO) message delivery to competing consumers.
Event Hubs (source)
Create an Event Hubs namespace in Azure Portal.
Then add a new Event Hub into the namespace.
Service Bus Queue (destination)
Create a Service Bus namespace in Azure Portal.
Then add a new queue into the namespace.
Stream Analytics job
Create a new Azure Stream Analytics job that processes messages coming through Event Hub and stores the output into Service Bus queue. For this post, we are using a pass-through Stream Analytics.
Service Bus Queue Reader Logic App
The Service Bus Queue Reader Logic App reads a message from a Service Bus queue, uses Azure Functions to remove special characters that get added by Azure Stream Analytics and then sends it to a child Logic App for processing the message. Click here to download complete Logic App definition.
Service Bus Queue Trigger
Add a new Service Bus queue trigger to read messages from a Service Bus queue and trigger the Logic App.
Special characters handling using Azure Functions
Azure Stream Analytics adds certain special (Unicode) characters in output messages which cause failures in Logic Apps when trying to decode them (e.g. to JSON format). As a result, we need to pre-process the message using Azure Functions and remove those special characters before going ahead with regular message processing in Logic Apps.
Create a new C# Webhook Function App (let’s call it ParseStreamAnalyticsJSONContent) in Azure Portal and insert below code snippet into it. Create the Function App in same region as Logic Apps for being callable from Logic Apps. It is recommended to create it in same Resource Group as Logic Apps. Click here to download the source code.
Then add a Function App action in Logic Apps and choose ParseStreamAnalyticsJSONContent function app that was created in previous step.
Call Message Processor child Logic App
Call the Message Processor child Logic App that has logic for processing these messages, details of which are covered in next section.
Message Processor Logic App
The Message Processor Logic App takes an event data in JSON format and send an e-mail using Office 365 Connector. Click here to download complete Logic App definition.
Request Trigger
Create a request trigger and provide it JSON schema for incoming message. JSON schema helps tokenize message properties which come handy in subsequent steps.
Office365 Send Email
Send an email using Office 365 Connector. You can use message properties coming from the request trigger to craft the email template (subject, body etc.).
Response action
Add a response action to complete this Logic App. Having a Response action is a MUST to be able to call this Logic App from another Logic App.