January 2016

Volume 31 Number 1

[Big Data]

Real-Time Data Analytics for .NET Developers Using HDInsight

By Omid Afnan

Enterprises of all sizes have begun to recognize the value of their huge collections of data—and the need to take advantage of them. As organizations start on their Big Data journey, they usually begin by batch processing their Big Data assets. This could mean collecting and aggregating Web log data, user clicks from an application, telemetry from Internet of Things (IoT) devices or a host of other human- or machine-generated data. I covered the case of doing basic Web log analysis using Hive on HDInsight in an article a year ago (msdn.com/magazine/dn890370). However, as the benefits of batch processing to mine insights on historical data are realized, many organizations encounter the problem of dealing with real-time data and the question of how to collect, analyze and act on continuous streams in real time.

As you might guess, there are technologies in the Big Data space for dealing with such needs. The Microsoft Azure platform provides powerful Big Data solutions, including Azure Data Lake and HDInsight. There’s an open source technology that allows highly distributed real-time analytics called Apache Storm. It’s natively supported in HDInsight, which is the Azure managed offering of Apache Big Data services. In this article, I’ll walk you through a simple but powerful scenario that deals with a stream of tweets using Storm as the key tool to enable real-time, continuous analytics.

As you’ll see, Microsoft makes this kind of development signif­icantly easier than other current market offerings via powerful authoring and debugging tools in Visual Studio. The HDInsight Tools for Visual Studio (available as part of the Azure SDK) provides a coding and debugging environment that’s familiar to .NET developers. These tools offer a significantly easier way to work with Big Data technologies than the simple editors and command-line tools currently available in the open source world. While Storm for HDInsight fully supports the use of Java programming, Microsoft also enables .NET programmers to use C# for writing (and reusing) business logic. The examples in this article demonstrate these .NET capabilities.

A Sentiment-Tracking Scenario

The scenario of tracking and analyzing emerging trends is not new. News reporting, weather tracking and disaster detection are examples that pre-date cloud computing. However, both the range of areas where trend detection is desirable, and the scale at which data is available for analysis, have grown unimaginably as the cloud era progresses. Social networking has been fertile ground for sentiment analysis. Services like Twitter that make their social data available through APIs, together with pay-as-you-go Big Data platforms like HDInsight, bring sentiment analysis within the reach of organizations large and small.

The simplest kind of sentiment analysis using Twitter is to count how often people are tweeting about a certain topic, or hashtag, in a given period of time. Of course, doing this for just one period, say one minute, is not as interesting as doing this across every minute of the day and looking for a rise or fall in the rate. Identifying spikes in a certain term being used could be useful for detecting a trend. For example, the detection of terms related to a storm or earthquake might provide a very rapid indication of areas affected by a natural disaster and its severity.

To demonstrate the basics of how this is done, I’ll walk through how to set up a streaming topology that collects data from Twitter, selects some of the tweets, calculates metrics, saves everything into storage and publishes some of the results. You can see this topology pictured in Figure 1. For this article, I selected tweets using simple keyword matching. The metrics calculated are counts of tweets that matched the selection criteria. The selected tweets are put into a SQL database and are also published to a Web site. Everything is done in the Azure cloud using Storm, SQL Server and Web site services available today. After walking through the example, I’ll discuss some of the other technologies available for solving this type of streaming data analytics problem.

Sentiment Analysis Topology
Figure 1 Sentiment Analysis Topology

The Basics of Storm

Storm is an open source Apache project (storm.apache.org) that allows real-time distributed computations to be performed over streams of data. It’s part of the Hadoop ecosystem of Big Data processing tools and is directly supported in HDInsight. Storm jobs are defined as a graph of processing nodes connected by streams of data in the form of tuples. Such a graph is referred to as a “topology” in Storm. Topologies don’t finish like other queries—they continue to execute until they’re suspended or killed.

In the Azure management portal, you can create a new HDInsight cluster and choose Storm as the type. This will cause Azure to set up a cluster of machines preloaded with all the necessary OS, Hadoop and Storm components within minutes. I can choose the number of nodes I want, choose different core and memory sizes, and scale the number of nodes up or down at any time. In terms of simplifying the Hadoop experience, this has already saved me considerable time and headache associated with acquiring and configuring multiple machines.

The components of a topology are called spouts and bolts. Spouts produce streams of tuples, which are basically sets of type and value pairs. In other words, a spout is a piece of code that knows how to collect or generate data and then emit it in chunks. Bolts are units of code that can consume a stream of data. They may process the data to clean it, or calculate statistics. In such cases, they likely emit another stream of tuples to downstream bolts. Other bolts write data to storage or to another system.

Each of these components can execute many parallel tasks. This is the key to the scalability and reliability of Storm. I can specify the degree of parallelism for each component and Storm will allocate that many tasks to execute the logic in my spout or bolt. Storm provides fault tolerance by managing tasks and restarting failed ones automatically. Finally, a given topology executes on a set of worker processes that are essentially execution containers. Workers can be added to increase the processing capacity of a topology. These features provide the essential characteristics that enable scale and fault tolerance for Storm.

A topology can be as complex as necessary to carry out the processing required by the overall real-time analytics scenario. The architecture lends itself to the reuse of components, but it also creates a challenging management and deployment problem as the number of spouts and bolts grows. The Visual Studio project concept is a useful way to manage the code and configuration components needed to instantiate a topology. Because the very idea of a topology is essentially graphical in nature, it also makes sense that being able to visualize the topology is very useful during both development and operation of the system. This can be seen in the execution view of the HDInsight tools for Visual Studio shown in Figure 2.

Monitoring View of an Active Storm Topology
Figure 2 Monitoring View of an Active Storm Topology

Storm architecture is based on Apache Thrift, a framework that allows the development of services implemented in multiple languages. While many developers use Java to write spouts and bolts, it’s not a requirement. With the introduction of the SCP.Net package of libraries, I can use C# to develop my spouts and bolts. This package is included in the HDInsight Tools for Visual Studio download, but can also be downloaded through NuGet.

Filtering Tweets in Near-Real Time

Let’s take a look at building the Tweet stream filtering topology to see how these parts work in practice. My sample topology is made up of one spout and three bolts. You can see the graphical view of this topology in Figure 2, as shown by the HDInsight Tools for Visual Studio. When I submit a Storm project for execution in Azure, Visual Studio shows me this graphical view and updates it over time with the number of events flowing through the system, as well as any error conditions that occur in any of the nodes.

Here TwitterSpout is responsible for pulling the stream of Tweets I want to process. It does so by interacting with the Twitter APIs to collect Tweets, and turns them into tuples of data that can stream through the rest of the topology. TwitterBolt picks up the stream and can do aggregations, like counting Tweets or combining them with other data pulled from other data sources. This bolt emits a new stream, with a possibly new format, based on the business logic it has executed. The AzureSQLBolt and SignalRBroadcastBolt components consume this stream and write portions of the data into an Azure-hosted SQLServer database and a SignalR Web site, respectively.

Because I’m using C# to build my Storm solution, I can use many existing libraries to help simplify and speed up my development. Two key packages for this example are the Tweetinvi libraries on CodePlex (bit.ly/1kI9sqV) and the SCP.Net libraries on NuGet (bit.ly/1QwICPj).

The SCP.Net framework reduces much of the complexity of dealing with the Storm programming model and provides base classes to encapsulate much of the work I’d otherwise need to do by hand. I start by inheriting from the Microsoft.SCP.ISCPSpout base class. This gives me three core methods needed for a spout: NextTuple, Ack and Fail. NextTuple emits the next piece of available data for the stream, or nothing at all. This method is called in a tight loop by Storm, and it’s the right place to introduce some sleep time if I don’t have any tuples to emit. This is one way to make sure I don’t end up consuming 100 percent of my CPU cycles as the topology runs continuously.

If I want to implement guaranteed message processing, such as “at least once” semantics for my tuples, I’d use the Ack and Fail methods to implement the needed handshakes between bolts. In this example I don’t use any retry mechanism, so only the NextTuple method is implemented, using code that takes Tweets from a private queue member in the TwitterSpout class and ships it out to the topology.

Streams within the topology are captured as schemas that are published by a spout or bolt. These are used as the contract between components in the topology, and also as the serialization and de-serialization rules that SCP.Net uses when transferring the data. The Context class is used to store configuration information per instance of a spout or bolt. The schema of the tuples emitted by the spout is stored in the Context and used by SCP.Net to build component connections.

Let’s look at the code for initialization of the TwitterSpout class, shown in part in Figure 3.

Figure 3 Initializing the TwitterSpout Class

public TwitterSpout(Context context)
{
  this.context = context;
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  this.context.DeclareComponentSchema(new ComponentStreamSchema(
    null, outputSchema));
  // Specify your Twitter credentials
  TwitterCredentials.SetCredentials(
    ConfigurationManager.AppSettings["TwitterAccessToken"],
    ConfigurationManager.AppSettings["TwitterAccessTokenSecret"],
    ConfigurationManager.AppSettings["TwitterConsumerKey"],
    ConfigurationManager.AppSettings["TwitterConsumerSecret"]);
  // Setup a Twitter Stream
  var stream = Tweetinvi.Stream.CreateFilteredStream();
  stream.MatchingTweetReceived += (sender, args) => { NextTweet(args.Tweet); };
  // Setup your filter criteria
  stream.AddTrack("China");
  stream.StartStreamMatchingAnyConditionAsync();
}

Figure 3 shows the initialization of the context for this spout using a passed-in context during topology startup. This context is then updated with the addition of a schema definition. I create a Dictionary object in which I add an identifier for the type of stream (DEFAULT_STREAM) and a list of the types for all the fields in my tuple—in this case simply a SerializableTweet. The context now contains the schema definition I have to follow when I emit tuples in this class, as well as when I consume them in TwitterBolt.

The rest of this snippet shows the setup of the Twitter stream. The Tweetinvi package provides abstractions for both REST and streaming APIs from Twitter. After encoding the appropriate credentials, I simply instantiate the kind of source I want to use. In the case of streaming sources, I can choose from one of several types, including filtered, sampled or user streams. These provide simplified interfaces for doing keyword filtering across all Tweets, sampling random public Tweets, and tracking events associated with a specific user. Here, I use the filtered stream, which allows the selection of tweets from across all public tweets by checking for the existence of any one of a multiple set of keywords.

Here I perform the desired filtering of Tweets at the spout, because the Tweetinvi APIs make this easy to do. I could also do filtering in the TwitterBolt component, along with any other calcu­lations or aggregations I want to do to massage the Tweets. Filtering at the spout allows me to reduce the volume of data streaming through the topology at an early stage. However, the power of Storm is that it allows me to handle large volumes at any component in the topology by scaling out. Storm provides almost linear scaling with added resources, which allows me to use more workers to add scale wherever a bottleneck occurs. HDInsight supports this approach by letting me select the size of my cluster and the types of nodes when I set it up, and to add nodes to it later. Using this scale-out approach, I can build Storm clusters that process millions of events per second. I’m charged by the number of running nodes in my cluster, so I have to keep in mind the trade-off between cost and scale.

The only other part to call out in Figure 3 is the registration of a callback for the Tweetinvi stream object to call when it finds a tweet that matches my criteria. The NextTweet method is that callback, which simply adds the provided tweet to the previously mentioned private queue in the TwitterSpout class:

public void NextTweet(ITweet tweet)
{
  queue.Enqueue(new SerializableTweet(tweet));
}

The bolts for my topology are coded in a similar fashion. They derive from the Microsoft.SCP.ISCPBolt class and must implement the Execute method. Here the tuple is passed in as a generic type of SCPTuple and must be converted to the correct type first. Then I can write C# code to execute whatever detailed processing I need. In this case I simply use a global variable to accumulate a count of the number of tuples seen by the bolt, and log the count and tweet text. Finally, I emit a new type of tuple for downstream bolts to consume. Here’s the code:

public void Execute(SCPTuple tuple)
{
  var tweet = tuple.GetValue(0) as SerializableTweet;
  count++;
  Context.Logger.Info("ExecuteTweet: Count = {0}, Tweet = {1}", count, tweet.Text);
  this.context.Emit(new Values(count, tweet.Text));
}

In the case of a bolt, I have to specify both input and output schemas when I set it up. The format is exactly the same as the previous schema definition for a spout. I simply define another Dictionary variable called outputSchema and list the integer and string types of the output fields, as shown in Figure 4.

Figure 4 Specifying Input and Output Schemas for TwitterBolt

public TwitterBolt(Context context, Dictionary<string, Object> parms)
{
  this.context = context;
  Dictionary<string, List<Type>> inputSchema =
    new Dictionary<string, List<Type>>();
  inputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(long), typeof(string) });
  this.context.DeclareComponentSchema(
    new ComponentStreamSchema(inputSchema,
    outputSchema));
}

The other bolts follow the same pattern, but call specific APIs for SQL Azure and SignalR. The final key element is to define the topology by enumerating the components and their connections. To accomplish this, there’s another method that must be implemented in all spouts and bolts—the Get method, which simply instantiates an object of this class with a Context variable that gets called by the SCPContext during the launch of the Storm task. SCP.Net will instantiate a child C# process, which will start your C# spout or bolt task using the following delegate method:

return new TwitterSpout(context);

With the spouts and bolts in place, I can now create the topology. Again, SCP.Net provides a class and helper functions to do this. I create a class derived from Microsoft.SCP.Topology.Topology­Descriptor and override the GetTopologyBuilder method. In this method I use an object of type TopologyBuilder that provides the methods SetSpout and SetBolt. These methods let me specify the name and input and output schemas of the component. They also let me specify the Get delegate to use to initialize the component and, most important, to specify the upstream component to connect with the current component. Figure 5 shows the code defining my topology.

Figure 5 Creating the Twitter AnalysisTopology

namespace TwitterStream
{
  [Active(true)]
  class TwitterTopology : TopologyDescriptor
  {
    public ITopologyBuilder GetTopologyBuilder()
    {
      TopologyBuilder topologyBuilder = new TopologyBuilder(
        typeof(TwitterTopology).Name + DateTime.Now.ToString("-yyyyMMddHHmmss"));
      topologyBuilder.SetSpout(
        typeof(TwitterSpout).Name,
        TwitterSpout.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"tweet"}}
        },
        1);
      topologyBuilder.SetBolt(
        typeof(TwitterBolt).Name,
        TwitterBolt.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"count", "tweet"}}
        },
        1).shuffleGrouping(typeof(TwitterSpout).Name);
      topologyBuilder.SetBolt(
        typeof(SqlAzureBolt).Name,
        SqlAzureBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      topologyBuilder.SetBolt(
        typeof(SignalRBroadcastBolt).Name,
        SignalRBroadcastBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      return topologyBuilder;
    }
  }
}

The complete Twitter analysis project can be built in Visual Studio using the Storm project type. This project conveniently lays out the various components you need in a simple and familiar way that can be viewed in Solution Explorer, as shown in Figure 6. You can add components like bolts and spouts using the Add | New Item option from the context menu of a project. Choosing from the Storm item types adds a new file and includes the outline for all the required methods. Using the Visual Studio Storm project, I can add references to libraries, like Tweetinvi, directly or through NuGet. Submitting the topology to run on Azure is a single click from the Solution Explorer context menu. All necessary components are uploaded to the HDInsight Storm cluster of my choice and the topology is submitted.

Submitting a Topology from Solution Explorer
Figure 6 Submitting a Topology from Solution Explorer

After submission, I see the Topology View from Figure 2, where I can monitor the state of my topology. Storm allows several states for my topology, including activated, deactivated and killed, as well as allowing a rebalance of tasks across workers based on scalability parameters. I can manage all of these state transitions from Visual Studio, as well as observe the current flow of tuples. In order to investigate components in detail and debug issues, I can drill down into individual components like the SqlAzureBolt, which is showing an error condition (the red outline and marker on the Topology View). Double-clicking this bolt shows more detailed stats about tuple flow, as well as the summary of errors from the bolt. You can even click on the Error Port link to go to the full logs of individual tasks without having to leave Visual Studio.

The code and the project for the simple topology covered in this article can be found on GitHub under the MicrosoftBigData repository. Look for the HDInsight folder and the TwitterStream sample project. You’ll find additional articles and samples at bit.ly/1MCfsqM.

Moving to More Complex Analysis

The Storm topology example I presented is a simple one. There are a number of ways I can increase the power and complexity of my real-time processing in Storm.

As already mentioned, the number of resources assigned to a Storm cluster in HDInsight can be scaled up as needed. I can observe the performance of my system from the data provided in Visual Studio’s runtime view of the topology from Figure 2. Here I can see the number of tuples being emitted, number of executors and tasks and latencies. Figure 7 shows the Azure Management Portal view, which provides further details about the number of nodes, their type and core counts that are in use now. Based on these, I can decide to scale my cluster and add more supervisor (worker) nodes to the cluster. This scale-up doesn’t require a restart and will happen within minutes when I trigger a rebalance from my Visual Studio topology view or from the Management Portal.

Azure Management Portal View of Storm Cluster
Figure 7 Azure Management Portal View of Storm Cluster

Most analytics applications will operate on multiple unstructured Big Data streams. In this case, the topology would contain multiple spouts and bolts that can read from more than one spout. This can be expressed easily in the topology configuration by specifying several inputs in the SetBolt method invocation. However, the business logic of dealing with the multiple sources in the same bolt will be more complex as individual tuples arrive under different stream IDs. As the complexity of the business problem grows, it’s likely that relational or structured data sources will also be needed during processing. While spouts are ideal for queue-like data sources, relational data is more likely to be brought in by a bolt. Again, the flexible implementation of bolts and the use of C# or Java make it possible to easily code access to a database using established APIs or query languages. The complexity here arises from the fact that these calls will be made remotely from Storm containers in a cluster to the database server. SQL Azure and HDInsight work on the same Azure fabric and interact easily, but there are other choices for cloud-based services that can also be used.

The Storm runtime allows me to set or tweak many fine-grain behaviors of the system. Many such settings appear as configuration parameters that can be applied at the topology or task level. These can be accessed in the Microsoft.SCP.Topology.StormConfig class and used to tune the overall Storm workload. Examples include settings for the maximum number of pending tuples per spout, tick tuples and spout sleep strategy. Other changes to the topology can be made in the topology builder. In my example topology, the streaming between all components is set to “shuffle grouping.” For any given component, the Storm execution system can and will create many individual tasks. These tasks are independent worker threads that can run in parallel across cores or containers to spread the workload for the bolt across multiple resources. I can control how work is passed from one bolt to the next. By choosing shuffle grouping, I’m saying that any tuple can go to any worker process in the next bolt. I can also choose other options like “field grouping,” which would cause tuples to be sent to the same worker based on the value of a particular field in the tuple. This option can be used to control the flow of data for operations that have a state, like a running count for a particular word in the Tweet stream.

Finally, a real-time analytics system might be part of a larger pipeline of analytics within an organization. For example, a Web log analytics system is likely to have a large batch-oriented portion that processes the logs for a Web service on a daily basis. This would produce Web site traffic summaries and provide lightly aggregated data suitable for pattern discovery by a data scientist. Based on this analysis, the team might decide to create real-time triggers for certain behaviors, like detection of system failures or malicious use. This latter portion would require real-time analysis of log or telemetry streams, but is likely to depend on reference data that’s updated daily by the batch system. Such larger pipelines require a workflow management tool that allows the synchronization of tasks across a variety of computation models and technologies. The Azure Data Factory (ADF) provides a workflow management system that natively supports the Azure analytics and storage services and allows coordination across tasks based on input data availability. ADF supports HDInsight and Azure Data Lake Analytics, as well as moving data between Azure Storage, Azure Data Lake Storage, Azure SQL Database and on-premises data sources.

Other Streaming Technologies

In this article I introduced the basics of real-time streaming analytics using Storm in HDInsight. Of course, Storm can also be set up on your own cluster of machines in your own datacenter or lab. The Storm distribution can be obtained through Hortonworks, Cloudera or directly from Apache. The installation and configuration in these cases is considerably more time-consuming, but the concepts and code artifacts are the same.

Spark (spark.apache.org) is another Apache project that can be used used for real-time analytics and has gained great popu­larity. It supports general Big Data processing, but its support for in-memory processing and a library of streaming functions makes it an interesting choice for high-performance real-time processing. HDInsight offers Spark cluster types where you can experiment with this technology. The service includes Zeppelin and Jupyter notebooks, which are interfaces that let you build queries in these languages and see interactive results. These are ideal for data exploration and developing queries over Big Data sets.

The interest in real-time streaming analytics is building as organizations work their way through increasingly complex scenarios for Big Data analytics. At the same time, technologies in this space continue to grow and mature, providing new opportunities for gaining insights from Big Data. Look to these pages for future articles on the use of technologies such as Spark and Azure Data Lake Analytics.


Omid Afnan is a principal program manager in the Azure Big Data team working on implementations of distributed computation systems and related developer toolchains. He lives and works in China. Reach him at omafnan@microsoft.com.

Thanks to the following technical experts for reviewing this article: Asad Khan and Ravi Tandon
Ravi Tandon (Microsoft), Asad Khan (Microsoft)

Asad Khan is a Principal Program Manager in Microsoft Big Data group focused on Hadoop-powered experiences in the Cloud through the Azure HDInsight Service. Currently he is focused on Spark and the real time analytics through Apache Storm. He has spent the last few years working on the next generation data access technologies from Microsoft including Hadoop, OData and BI over big data. Asad holds a master's degree from Stanford University

Ravi Tandon is a Senior Software Engineer in the Microsoft Azure HDInsight team. He works on the Apache Storm and Apache Kafka offerings on Microsoft Azure HDInsight.