2016 年 1 月

第 31 卷,第 1 期

大数据 - .NET 开发者可使用 HDInsight 进行实时数据分析

作者:Omid Afnan

各种规模的企业已经开始认识到大量收集数据的价值,以及充分利用这些数据的需要。随着组织踏上其大数据旅程,他们通常首先对其大数据资产进行批处理。这就意味着 Web 日志数据的收集和汇总、应用程序的用户点击率、对物联网 (IoT) 设备或者一系列人为或机器生成的数据的遥测。我一年前在一篇文章中介绍了使用 Hive on HDInsight 进行基本 Web 日志分析的情况 (msdn.com/magazine/dn890370)。但是,随着实现批处理了解历史数据的优势,很多组织也开始遇到处理实时数据的问题以及如何实时收集、分析和处理持续的数据流这一难题。

您可能会猜到,大数据领域应该会有处理这些需求的技术。Microsoft Azure 平台提供了强大的大数据解决方案,包括 Azure Data Lake 和 HDInsight。有一种开放源技术叫作 Apache Storm,可以进行高度分布式实时分析。HDInsight 中会对该技术提供本机支持,HDInsight 是 Apache 大数据服务的 Azure 托管产品。在本文中,我将逐步介绍一款简单但功能强大的方案,即使用 Storm 作为关键工具来启用实时持续分析,从而处理推文流。

正如您将看到的,Microsoft 通过 Visual Studio 中的强大编写和调试工具使得这种开发要比目前市场上其他产品容易许多。Visual Studio 的 HDInsight 工具(作为 Azure SDK 的一部分提供)提供了 .NET 开发者熟悉的编码和调试环境。相较于开放源领域当前可用的简单编辑器和命令行工具,这些工具提供了明显容易很多的方式来使用大数据技术。尽管 Storm for HDInsight 完全支持使用 Java 编程,但是 Microsoft 也支持 .NET 编程人员使用 C# 编写(和重用)业务逻辑。本文中的示例演示了这些 .NET 功能。

情绪跟踪方案

跟踪和分析新兴趋势已不是新方案。新闻报道、天气跟踪和灾难检测都是云计算之前的例子。但是,随着云时代的进步,不论是需要进行趋势检测的区域范围,还是可用于进行分析的数据规模,增长都超乎想象。社交网络一直是进行情绪分析的沃土。通过 API 提供其社交数据的服务(如 Twitter)以及即付即用大数据平台(如 HDInsight),使得大大小小的组织均能进行情绪分析。

使用 Twitter 进行情绪分析最简单的一种就是计算给定时段内人们就特定主题或哈希标记发布推文的频率。当然,仅针对一个时段(比如一分钟)执行此操作不如跨当天的每一分钟执行此操作并查看比率的的起伏有趣。识别所用特定词条的剧增对于检测趋势非常有用。例如,检测与风暴或地震相关的词条可能会快速指出受自然灾害影响的区域及其严重程度。

为了演示完成此操作的基本知识,我将逐步介绍如何设置从 Twitter 收集数据的流式拓扑,选择某些推文、计算指标、将所有内容保存到存储器以及发布某些结果。您可以在图 1 中看到此拓扑。在本文中,我利用简单关键字匹配选择了推文。计算的指标是与选择条件匹配的推文数量。所选的推文将放入 SQL 数据库,也会发布到网站上。使用当前可用的 Storm、SQL Server 和网站服务在 Azure 云中完成了所有操作。逐步了解完本示例之后,接下来我将讨论一些可用于解决此类流数据分析问题的其他技术。

情绪分析拓扑
图 1 情绪分析拓扑

Storm 基本信息

Storm 是开放源 Apache 项目 (storm.apache.org),允许通过数据流执行实时分布式计算。它属于大数据处理工具的 Hadoop 生态系统,HDInsight 中会直接提供支持。Storm 作业定义为通过元组形式的数据流连接的处理节点图表。此类图表在 Storm 中称为“拓扑”。拓扑不像其他查询那样完成,它们将继续执行直到其挂起或终止。

在 Azure 管理门户中,您可以创建新的 HDInsight 群集并选择 Storm 作为类型。这将导致 Azure 在数分钟内设置预装有所有必要操作系统、Hadoop 和 Storm 组件的计算机群集。我可以选择我想要的节点数、选择不同的核心和内存大小,以及随时增加或减少节点数。就简化 Hadoop 体验而言,这已经为我节省了大量时间,并避免了采购和配置多台计算机的难题。

拓扑的组件称为 spout 和 bolt。spout 生成元组流,基本上是几组类型和值对。换言之,spout 是知道如何收集或生成数据,然后将其成块发射的代码段。bolt 是可以使用数据流的代码单位。它们可能会处理数据进行清理或者计算统计信息。在此类情况下,它们可能会将另一元组流发送至下游 bolt。其他 bolt 将数据写入存储器或者其他系统。

每个组件可以执行很多并行任务。这是 Storm 可扩展性及可靠性的关键。我可以为每个组件指定并行度,Storm 将会分配很多任务在我的 spout 或 bolt 中执行逻辑。Storm 可通过管理任务并自动重新启动失败的任务提供容错能力。最后,给定拓扑在实际上是执行容器的一组工作线程进程上执行。可以添加工作线程以提高拓扑的处理能力。这些功能提供了支持 Storm 扩展和容错的基本特性。

一个拓扑可以尽可能复杂以执行整体实时分析方案所需的处理。体系结构将租借自身以便重用组件,但是随着 spout和 bolt 数量的增加,它还带来充满挑战的管理和部署问题。Visual Studio 项目概念是管理对拓扑进行实例化所需的代码和配置组件非常有用的方式。由于拓扑这一概念本质上就是图形,因此在开发和运行系统期间能够可视化拓扑无疑是非常有用的。这可以在 Visual Studio 的 HDInsight 工具的执行视图中看到,如图 2 所示。

活动 Storm 拓扑的监视视图
图 2 活动 Storm 拓扑的监视视图

Storm 体系结构基于 Apache Thrift,后者是一个允许以多种语言实现服务开发的框架。虽然很多开发者使用 Java 编写 spout 和 bolt,但并不要求这么做。随着 SCP.Net 包库的推出,我可以使用 C# 开发我的 spout 和 bolt。Visual Studio 的 HDInsight 工具下载中包含此包,但是也可以通过 NuGet 下载。

近实时筛选推文

让我们看一下构建推文流筛选拓扑,以了解这些部分实际如何运行。我的样例拓扑由一个 spout 和三个 bolt 组成。您可以在图 2 中看到此拓扑的图形视图,如 Visual Studio 的 HDInsight 工具所示。当我提交一个 Storm 项目在 Azure 中执行时,Visual Studio 会向我显示此图形视图,并且随着时间的推移,通过流经系统的事件数以及任意节点中发生的任意错误条件对该视图进行更新。

这里 TwitterSpout 负责提取我想要处理的推文流。它通过与 Twitter API 交互来收集推文,并将其转变为可以流经其余拓扑的数据元组来完成这一操作。TwitterBolt 挑选流并可以进行汇总,比如计算推文数量或者将其与从其他数据源中提取的其他数据相结合。该 bolt 基于它已执行的业务逻辑用可能的新格式发射一个新流。AzureSQLBolt 和 SignalRBroadcastBolt 组件使用此流并将部分数据分别写入 Azure 托管的 SQLServer 数据库和 SignalR 网站。

因为我使用 C# 构建我的 Storm 解决方案,所以我可以使用很多现有库来帮助简化和加快我们的开发过程。本示例的两个关键包是 CodePlex 上的 Tweetinvi 库 (bit.ly/1kI9sqV) 和 NuGet 上的 SCP.Net 库 (bit.ly/1QwICPj)。

SCP.Net 框架大大降低了处理 Storm 编程模型的复杂性并提供了基类来囊括我需要手动进行的大量工作。我首先从 Microsoft.SCP.ISCPSpout 基类继承。这为我提供了 spout 需要的三种核心方法: NextTuple、Ack 和 Fail。NextTuple 为流发射下一个可用数据段,或者不发射任何内容。该方法在紧密循环中由 Storm 调用,如果我没有要发射的元组,正好可以在该循环中引入一些睡眠时间。这是可以确保在拓扑持续运行时我最终不会耗用全部 CPU 周期的一种方法。

如果我想要实施有保证的消息处理,例如针对我的元组的“至少一次”语义,我将会使用 Ack 和 Fail 方法在 bolt 之间实施所需的握手。在本例中,我不使用任何重试机制,因此只实施 NextTuple 方法,使用从 TwitterSpout 类中的专用队列成员提取推文的代码并将其发出至拓扑。

拓扑内的流被捕获为由 spout 或 bolt 发布的架构。这些被用作拓扑中组件之间的协定,也用作 SCP.Net 在传输数据时所用的序列化和反序列化规则。Context 类用于按 spout 或 bolt 的实例存储配置信息。spout 发射的元组架构存储在 Context 中并由 SCP.Net 用来生成组件连接。

让我们来看一下用于初始化 TwitterSpout 类的代码,如图 3 中部分所示。

图 3 初始化 TwitterSpout 类

public TwitterSpout(Context context)
{
  this.context = context;
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  this.context.DeclareComponentSchema(new ComponentStreamSchema(
    null, outputSchema));
  // Specify your Twitter credentials
  TwitterCredentials.SetCredentials(
    ConfigurationManager.AppSettings["TwitterAccessToken"],
    ConfigurationManager.AppSettings["TwitterAccessTokenSecret"],
    ConfigurationManager.AppSettings["TwitterConsumerKey"],
    ConfigurationManager.AppSettings["TwitterConsumerSecret"]);
  // Setup a Twitter Stream
  var stream = Tweetinvi.Stream.CreateFilteredStream();
  stream.MatchingTweetReceived += (sender, args) => { NextTweet(args.Tweet); };
  // Setup your filter criteria
  stream.AddTrack("China");
  stream.StartStreamMatchingAnyConditionAsync();
}

图 3 显示了拓扑启动期间使用传入上下文初始化此 spout 的上下文。然后,通过增加架构定义来更新此上下文。我创建了一个 Dictionary 对象,其中我为流类型 (DEFAULT_STREAM) 添加了一个标识符并为我元组中的所有字段添加了一个类型列表(本例中就是 SerializableTweet)。现在,上下文中包含我在该类中发射元组以及在 TwitterBolt 中使用这些元组时需要遵循的架构定义。

此代码段的其余部分则显示了 Twitter 流的设置。Tweetinvi 包为 Twitter 中的 REST API 和流式 API 均提供了抽象。对相应凭据进行编码后,我只需实例化我要使用的源种类。在对源进行流式处理时,我可以从中选择一个类型,包括已筛选流、已采样流或用户流。这些流提供了简化的接口,可用于在所有推文中进行关键字筛选、采样随机公共推文以及跟踪与特定用户相关联的事件。此处我使用已筛选流,它允许通过检查是否存在多组关键字中的任意一组从所有公共推文中选择推文。

此处我对 spout 的推文执行所需的筛选,因为 Tweetinvi API 使得可以轻松完成此操作。我还可以在 TwitterBolt 组件中执行筛选,并根据自己的需要执行任何其他计算或汇总以发送推文。在 spout 执行筛选使我可以减少早期流经拓扑的数据量。但是,Storm 的威力在于它允许我通过横向扩展在拓扑中的任意组件处理大型卷。Storm 通过添加的资源提供近乎线性的扩展,从而允许我在发生瓶颈时使用更多工作线程增加规模。HDInsight 支持此方法,具体原理是允许我在创建群集时选择群集大小以及节点类型,并在稍后向群集中添加节点。使用此横向扩展方法,我可以构建每秒处理数百万事件的 Storm 群集。我是按照我群集中运行的节点数量付费的,所以我必须谨记在成本与规模之间进行权衡。

图 3 中唯一另一个要注意的部分是回叫的注册,以便 Tweetinvi 流对象在发现与我的条件匹配的推文时进行呼叫。NextTweet 方法就是该回叫,它仅将提供的推文添加到 TwitterSpout 类中先前提及的专用队列中:

public void NextTweet(ITweet tweet)
{
  queue.Enqueue(new SerializableTweet(tweet));
}

我的拓扑的 bolt 采用类似的方式进行编码。它们派生自 Microsoft.SCP.ISCPBolt 类且必须采用 Execute 方法。此处元组作为通用 SCPTuple 类型传入并且必须先转换为正确的类型。然后,我可以写入 C# 代码以便根据需要执行任何详细的处理。在这种情况下,我只需使用全局变量累积 bolt 看到的元组数量,并记录该计数和推文。最后,我发射一个新的元组类型供下游 bolt 使用。代码如下:

public void Execute(SCPTuple tuple)
{
  var tweet = tuple.GetValue(0) as SerializableTweet;
  count++;
  Context.Logger.Info("ExecuteTweet: Count = {0}, Tweet = {1}", count, tweet.Text);
  this.context.Emit(new Values(count, tweet.Text));
}

如果是 bolt,我需要在创建 bolt 时指定输入和输出架构。格式与 spout 之前的架构定义完全相同。我只需定义另一个名为 outputSchema 的 Dictionary 变量,并列出输出字段的整数和字符串类型,如图 4 所示。

图 4 指定 TwitterBolt 的输入和输出架构

public TwitterBolt(Context context, Dictionary<string, Object> parms)
{
  this.context = context;
  Dictionary<string, List<Type>> inputSchema =
    new Dictionary<string, List<Type>>();
  inputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(long), typeof(string) });
  this.context.DeclareComponentSchema(
    new ComponentStreamSchema(inputSchema,
    outputSchema));
}

其他 bolt 遵循相同模式,但是调用 SQL Azure 和 SignalR 的特定 API。最后一个关键要素是通过枚举组件及其连接定义拓扑。为完成此操作,必须在所有 spout 和 bolt 中实施另一方法,即 Get 方法,它只是在启动 Storm 任务期间使用 SCPContext 调用的 Context 变量实例化此类的对象。SCP.Net 将实例化子 C# 进程,这将使用以下委派方法启动您的 C# spout 或 bolt 任务:

return new TwitterSpout(context);

spout和 bolt 就绪后,我现在便可以创建拓扑了。并且,SCP.Net 还提供了类和帮助程序函数来执行此操作。我创建了派生自 Microsoft.SCP.Topology.Topology­Descriptor 的类并覆盖了 GetTopologyBuilder 方法。在该方法中,我使用提供 SetSpout 和 SetBolt 方法的 TopologyBuilder 类型的对象。这些方法使我可以指定组件的名称及输入和输出架构。它们还允许我指定要用于初始化组件的 Get 委派,最重要的是,允许我指定要与当前组件连接的上游组件。图 5 显示了定义我拓扑的代码。

图 5 创建 Twitter 分析拓扑

namespace TwitterStream
{
  [Active(true)]
  class TwitterTopology : TopologyDescriptor
  {
    public ITopologyBuilder GetTopologyBuilder()
    {
      TopologyBuilder topologyBuilder = new TopologyBuilder(
        typeof(TwitterTopology).Name + DateTime.Now.ToString("-yyyyMMddHHmmss"));
      topologyBuilder.SetSpout(
        typeof(TwitterSpout).Name,
        TwitterSpout.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"tweet"}}
        },
        1);
      topologyBuilder.SetBolt(
        typeof(TwitterBolt).Name,
        TwitterBolt.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"count", "tweet"}}
        },
        1).shuffleGrouping(typeof(TwitterSpout).Name);
      topologyBuilder.SetBolt(
        typeof(SqlAzureBolt).Name,
        SqlAzureBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      topologyBuilder.SetBolt(
        typeof(SignalRBroadcastBolt).Name,
        SignalRBroadcastBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      return topologyBuilder;
    }
  }
}

完整的 Twitter 分析项目可以使用 Storm 项目类型在 Visual Studio 中构建。该项目方便地通过一种简单、熟悉的方式布设了您所需的各种组件,可以在解决方案资源管理器中查看,如图 6 所示。您可以使用项目上下文菜单中的“添加 | 新增项”选项添加 bolt 和 spout 等组件。从 Storm 项目类型选择将添加一个新文件并包括全部所需方法的大纲。使用 Visual Studio Storm 项目,我可以直接或通过 NuGet 添加对库(如 Tweetinvi)的引用。提交拓扑以在 Azure 上运行只需在“解决方案资源管理器”上下文菜单中单击一下。所有必要的组件都会上传至我选择的 HDInsight Storm 群集,并会提交拓扑。

从解决方案资源管理器提交拓扑
图 6 从解决方案资源管理器提交拓扑

提交之后,我从图 2 中看到拓扑视图,我可以在其中监视拓扑的状态。Storm 允许我的拓扑存在多个状态,包括已激活、已停用和已终止,并且允许基于可扩展性参数再平衡工作线程之间的任务。我可以通过 Visual Studio 管理所有这些状态转变,并且我还可以观察元组的当前流。为了详细调查组件并调试问题,我可以深入各组件(如 SqlAzureBolt),其显示错误状况(拓扑视图上的红色边框和标记)。双击此 bolt 可显示有关元组流的详细统计信息以及 bolt 中的错误摘要。您甚至可以单击“错误端口”链接转至各个任务的完整日志,而不用离开 Visual Studio。

本文中介绍的简单拓扑的代码和项目可以在 GitHub 下的 MicrosoftBigData 存储库中找到。查找 HDInsight 文件夹和 TwitterStream 示例项目。您将会在 bit.ly/1MCfsqM 找到其他文章和示例。

进行更复杂的分析

我展示的 Storm 拓扑示例是个简单的例子。我有许多种方法可以提高我在 Storm 中的实时处理能力和复杂性。

正如已经提到的,可以按需纵向扩展分配给 HDInsight 中的 Storm 群集的资源数量。我可以通过图 2 拓扑的 Visual Studio 运行时视图中提供的数据观察我系统的性能。我可以在此处看到发射的元组数量、执行器、任务以及延迟的数量。图 7 显示了“Azure 管理门户”视图,其中提供了有关节点数量、其类型以及当前使用的核心数的进一步详细信息。基于这些信息,我可以决定扩展我的群集并向群集中添加更多主管(工作线程)节点。这一纵向扩展不需要重新启动,并且会在我从我的 Visual Studio 拓扑视图或者管理门户触发再平衡时,在数分钟内发生。

Storm 群集的 Azure 管理门户视图
图 7 Storm 群集的 Azure 管理门户视图

大多分析应用程序将运行于多个非结构化大数据流。在这种情况下,拓扑将包含多个 spout 以及可以从多个 spout 中读取的 bolt。这可以通过在 SetBolt 方法调用中指定多个输入在拓扑配置中轻松表达。但是,由于各个元组以不同的流 ID 到达,所以在同一 bolt 中处理多个源的业务逻辑将会更加复杂。随着业务问题的复杂度不断增加,很可能在处理过程中也将需要关系或结构化数据源。虽然 spout 是类似队列的数据源的理想之选,但是 bolt 更有可能引入关系数据。并且,通过 bolt 的灵活实施以及 C# 或 Java 的使用,可以使用确立的 API 或查询语言轻松对数据库进行代码访问。此处的复杂性源自于这些调用将从群集中的 Storm 容器远程进行,目标位置是数据库服务器。SQL Azure 和 HDInsight 运行于同一 Azure 结构并轻松交互,但是针对基于云的服务还可以使用其他选择。

Storm 运行时允许我设置或调整系统的很多精细行为。很多此类设置作为配置参数显示,可以在拓扑或任务级别应用。可以在 Microsoft.SCP.Topology.StormConfig 类中访问这些设置并将其用于调整总体 Storm 工作负载。示例包括每个 spout 最大挂起元组数设置、刻度元组数设置以及 spout 睡眠策略设置。对拓扑进行的其他更改可以在拓扑生成器中进行。在我的示例拓扑中,所有组件之间的流设置为“随机分组”。 对于任何给定组件,Storm 执行系统可以并将创建很多单独的任务。这些任务是可以跨核心或容器并行运行的独立工作线程,用于跨多个资源分配 bolt 的工作负载。我可以控制如何将工作从一个 bolt 传递至下一个。通过选择随机分组,表示任何元组都可以进入下一 bolt 中的任意工作线程进程。我还可以选择“字段分组”等其他选项,该选项将导致基于元组中特定字段的值将元组发送至同一工作线程。该选项可用于控制具有状态的操作的数据流,如 Tweet 流中特定单词的运行计数。

最后,实时分析系统可能会成为组织内较大分析管道的一部分。例如,Web 日志分析系统可能具有较大的批处理导向部分,按天处理 Web 服务的日志。这将生成网站流量摘要并提供适于数据专家发现的模式的轻度汇总数据。基于此分析,团队可能决定针对特定行为(如系统故障或恶意使用检测)创建实时触发器。后面部分将需要对日志或遥测流进行实时分析,但是可能取决于批处理系统每日更新的引用数据。此类较大的管道需要工作流管理工具,以便跨各种计算模型与技术同步任务。Azure Data Factory (ADF) 提供了工作流管理系统,可本机支持 Azure 分析和存储服务,并允许基于输入数据的可用性在任务中进行协调。ADF 支持 HDInsight 和 Azure Data Lake Analytics,以及在 Azure Storage、Azure Data Lake Storage、Azure SQL Database 和内部部署数据源之间移动数据。

其他流式技术

在本文中,我介绍了使用 HDInsight 中的 Storm 进行实时流分析的基本知识。当然,Storm 也可以在您自己的数据中心或实验室中的计算机群集上进行设置。可以通过 Hortonworks、Cloudera 或直接从 Apache 获得 Storm 分发。这些案例中的安装和配置相当耗时,但是概念和代码项目是一样的。

Spark (spark.apache.org) 是另一个 Apache 项目,可用于实时分析,并且很受欢迎。它支持常规大数据处理,但是它对内存中处理以及流函数库的支持使得其成为高性能实时处理的有趣选择。HDInsight 提供了 Spark 群集类型,其中您可以通过该技术进行试验。服务包括 Zeppelin 和 Jupyter 笔记本,这是可让您以这些语言生成查询并查看交互结果的接口。这些是数据浏览以及通过大数据集生成查询的理想之选。

随着组织奋力应对日益复杂的大数据分析方案,正逐渐对实时流分析产生兴趣。同时,该领域的技术继续发展并日渐成熟,从而为深入了解大数据提供新的机会。查看这些页面获取未来关于使用 Spark 和 Azure Data Lake Analytics 等技术的文章。


Omid Afnan是 Azure 大数据团队的首席计划经理,主要从事实施分布式计算系统和相关开发人员工具链的工作。他在中国生活和工作。您可以通过 omafnan@microsoft.com 与他取得联系。

衷心感谢以下技术专家对本文的审阅: Asad Khan 和 Ravi Tandon
Ravi Tandon (Microsoft)、Asad Khan (Microsoft)

Asad Khan 是 Microsoft 大数据组的首席计划经理,专注于通过 Azure HDInsight 服务获得云中 Hadoop 体验。目前,他主要研究 Spark 以及通过 Apache Storm 进行实时分析。他过去几年一直潜心钻研 Microsoft 的下一代数据访问技术,包括 Hadoop、OData 和大数据 BI。Asad 拥有斯坦福大学硕士学位。

Ravi Tandon 是 Microsoft Azure HDInsight 团队的一名高级软件工程师。他主要研究 Microsoft Azure HDInsight 的 Apache Storm 和 Apache Kafka 产品。