Azure Insider

使用 Microsoft Azure 服务进行遥测引入和分析

**Bruno Terkaly
Ricardo Villalobos
Thomas Conte

Bruno Terkaly and Ricardo Villalobos[定期专栏作家 Bruno Terkaly 和里卡多 · 比利亚洛沃斯目前为本月的专栏特约专栏作家。他们将返回与下一次分期付款。 — — Ed.]

基于传感器的每个设备生成的遥测数据。解释该数据是其价值主张的核心。在消费者的世界,一个驱动程序看他连接的汽车仪表板,看到他的驾驶风格对燃料消耗和交通的影响。在工业世界中,进行比较的一台机器在工厂车间的其他人的平均温度可以帮助操作员识别与失败的风险并执行预防性维护。

这些情况下,需要从数十或数百数千个连接的设备的遥测数据。更重要的是,您需要分析此数据,以提供有意义的可视化效果和见解。当处理如此大量的数据,大数据框架,例如 Hadoop 牢固的基础数据处理,可以使用设备的安装基础扩大规模。

在本文中,您将学习如何创建一个简单遥测摄入的体系结构使用微软 Azure 服务总线。然后你消耗分析此数据在可伸缩的方式使用名为 HDInsight 的微软 Azure Hadoop 服务。

解决方案体系结构

在以前的专栏,Bruno Terkaly 和里卡多 · 比利亚洛沃斯显示如何使用服务总线来建立一种命令通道与连接对象进行通信。在本文中,我会用服务总线作为一个中间件层缓冲遥测设备所发送的消息。

这些设备将直接沟通服务总线将遥测消息发送到一个专门的主题 (见图 1)。然后一个或几个订阅将重复数据队列中工作者角色这些消息并将它们存储为 Blob 存储中的平面文件。Hadoop 群集然后可以使用这些输入的文件来执行分析和计算。

Basic Flow of Big Data Telemetry Solution
图 1 基本流的大数据遥测技术解决方案

此体系结构具有解耦各种不同的块从彼此的利益。服务总线作为中间件,可以缓冲数据如果工人不能读取它们还不够快。您可以监视队列长度和使用,作为基础进行自动缩放的辅助层。

此外可用于执行简单筛选传入的数据和路由到不同的后端处理层订阅。例如,您可以将消息发送到实时警报系统,进行紧急订阅和使用一切订阅来捕获所有数据供以后分析。

因为工人们只是将数据移动到存储 — — 是否 Hadoop 分布式文件系统 (HDFS) 或 Blob 存储 — — 它脱钩 Hadoop 加工件。这可以运行独立的传入数据节奏。你可以选择有一个永久运行的 Hadoop 群集。这允许您处理小批量所有的时间和减少计算延迟。您也可以选择存钱,只是每天一次,在一个批处理中执行的所有计算 HDInsight 群集启动。您还可以二者的混合。

接收遥测数据使用服务总线

Azure 服务总线提供两个协议选择将邮件发送到一个主题:HTTP 或 AMQP。在连接的设备,经常与有限的带宽,AMQP 具有一些优势。它是一个高效率、 二进制、 可靠和便携式的协议。它也有许多种语言、 运行时环境和操作系统库。当您的设备直接连接到服务总线发送遥测消息,这使你灵活性。

若要测试这种方法,我用覆盆子 Pi 板饲料温度和其他传感器的数据,使用 Apache Qpid 质子 AMQP 库。质子是光秃的骨头,便携式库您可以在各种环境中发送 AMQP 消息编译。它将与 Azure 服务总线完全互操作。了解更多有关在质子 AMQP 图书馆 bit.ly/1icc6Ag

对于此示例,我已经编译直接上覆盆子 Pi 板的质子库。我用的 Python 绑定来编写一个简单的脚本来捕获传感器读数从 USB 串行端口并将它们发送到 Azure 服务总线,你可以看到在图 2

图 2 Python 代码中的覆盆子 Pi 读来捕获传感器的读数

#!/usr/bin/python
import sys
import commands
import re
import uuid
import serial
from proton import *
# Device ID
id = uuid.getnode()
# Topic address
address = "amqps://owner:key@address.servicebus.windows.
net/telemetry"
# Open serial port
ser = serial.Serial('/dev/ttyACM0', 9600)
# Create Proton objects
messenger = Messenger()
while True:
  # Read values from Arduino in the form K1:V1_K2:V2_...
temp = ser.readline().rstrip('\r\n')
  print temp
  # Create AMQP message
  message = Message()
  # Initialize properties
  message.properties = dict()
  message.properties[symbol("did")] = symbol(id)
  # Map string to list, symbolize, create dict and merge
  pairs=map(lambda x:x.split(':'), temp.split('_'))
  symbols = map(lambda x:(symbol(x[0]),int(x[1])), pairs)
  message.properties.update(dict(symbols))
  message.address = address
  messenger.put(message)
  messenger.send()

Python 脚本直接地址 Azure 服务巴士主题命名为"遥测技术"。它使用一个连接字符串,包括标准的服务总线的身份验证令牌,并指定使用 AMQP 协议。 在现实世界环境中,您将需要使用一个更复杂的身份验证机制,以确保您的连接参数不会遭到损坏。

承担大量的这些覆盆子设备开始收集数据。 每一个会发送第 ID (DID 设备) 你以后要用再来计算的平均温度。 在此示例中,将 DID 生成带有 UUID 模块检索系统的 MAC 地址。

连接到通过 USB 树莓 Pi Arduino Esplora 板收集读数。 Esplora 是与内置传感器-一板。 这使得它容易读取温度或其他环境参数并将它们发送到串行总线。 然后将 USB 电缆的另一端的 Python 脚本读取输出值。 Arduino 架构的打印传感器值到串行端口的示例所示图 3

图 3 Arduino 代码收集覆盆子 Pi 读数

void loop()
{
  int celsius = Esplora.readTemperature(DEGREES_C);
  int loudness = Esplora.readMicrophone();
  int light = Esplora.readLightSensor();
  Serial.print("T:");
  Serial.print(celsius);
  Serial.print("_");
  Serial.print("M:");
  Serial.print(loudness);
  Serial.print("_");
  Serial.print("L:");
  Serial.print(light);
  Serial.println();
  // Wait a second
  delay(1000);
}

请选择您的大数据部署

你有几个选择为哪种类型的 Hadoop 的解决方案,您将使用的数据分析。 部署类型的选择将决定如何以及在何处你需要聚合数据进行分析。

Azure 提供令人信服的解决方案与 HDInsight。 这暴露了 Hadoop 框架作为一种服务。 这种分布的 Hadoop,基于 Hortonworks 的数据平台 (HDP) 上的 Windows,都带有一种连接器,可以直接从 Azure Blob 存储访问输入的数据的工作。

这意味着你不需要有了 Hadoop 集群和运行,以接收的输入的文件。 您可以将文件上载到 HDInsight 将使用后的 Blob 存储容器。 当你分析一个批处理文件时,可以在几分钟内开始 HDInsight 群集、 对于几个小时,执行一系列的工作岗位,然后关闭它。 这转化为低法案在计算小时数。

另一方面,如果您选择部署 HDP,如标准 Hadoop 分布或分配对天青虚拟机 (Vm) 的 Cloudera,你会负责保持最新的群集。 也可以有它正确地配置为最佳操作。 此方法很有意义,如果您打算使用不包含在 HDInsight,如 HBase 作为存储机制中的自定义 Hadoop 组件。

遥测数据保存到 Blob 存储

从 Azure 服务总线中提取数据的过程很简单。 为订阅使用作为"读者"或"侦听器"工作者角色。 然后,积聚成 HDInsight 可以使用的输入文件的消息。

首先,设置您的 Azure 服务巴士主题上的一个或几个订阅。 这为您提供一些纬度时拆分或路由根据要求的流数据。 至少,在它是个好主意,以创建一个"包罗万象"订阅来存储所有传入的邮件。 您还可以使用筛选器 Azure 服务总线的订阅。 这将创建附加特定的消息流。 创建的主题和订阅使用 C# 和 Azure 服务总线 SDK 库的示例所示图 4

图 4 Azure 服务总线订阅

var namespaceManager = 
  NamespaceManager.CreateFromConnectionString(connectionString);
// Create the Topic
if (!
namespaceManager.TopicExists("telemetry"))
{
  namespaceManager.CreateTopic("telemetry");
}
// Create a "catch-all" Subscription
if (!
namespaceManager.SubscriptionExists("telemetry", "all"))
{
  namespaceManager.CreateSubscription("telemetry", "all");
}
// Create an "alerts" subscription
if (!
namespaceManager.SubscriptionExists("telemetry", "alert"))
{
  SqlFilter alertFilter = new SqlFilter("type = 99");
  namespaceManager.CreateSubscription("telemetry", 
  "alert", alertFilter);
}

一旦创建了 Azure 服务总线订阅,您可以接收并保存的消息。 此示例使用 CSV 格式,这是很容易阅读和理解都由人类和机器。 阅读作为传入的邮件,尽可能快地工人创建一定数量的任务 (在此示例中有 10)。 它还使用异步方法来读的邮件,而不是读他们一次一个批次。 "所有人"的订阅和"遥测"主题将接收消息 (见图 5)。

图 5 从订阅接收消息并将其存储在 Blob 存储

SubscriptionClient client = 
  SubscriptionClient.CreateFromConnectionString(connectionString, 
  "telemetry", "all", ReceiveMode.ReceiveAndDelete);
List<Task> tasks = new List<Task>();
for (int i = 0; i < NBTASKS; i++)
{
  var id = i; // Closure alert
  Task t = Task.Run(async () =>
  {
    BlobStorageWriter writer = new BlobStorageWriter(id);
    while (true)
    {
      var messages = await client.ReceiveBatchAsync(BATCH_SIZE);
      foreach (var message in messages)
      {
        try
        {
          await writer.WriteOneLine(TelemetryMessage.Stringify(message));
        }
        catch (Exception ex)
        {
          Trace.TraceError(ex.Message);
        }
      }
    }
  });
  tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());

TelemetryMessage.Stringify 方法只是返回一行的文本中包含的遥测数据的 CSV 格式。 它还可以从的 Azure 服务总线标头,例如消息的消息 ID 或入队时间提取一些有用的字段。

BlobStorageWriter.WriteOneLine 的工作是把线直接写入 Blob。 因为 10 个任务可用的同时,将一次影响同一数量的 Blob。 WriteOneLine 还旋转时,不时为 HDInsight 去接他们的文件。 我使用两个参数来决定何时切换到一个新的文件:由于将 Blob 创建的 (例如,创建一个新的文件每小时或当它到达 1,000,000 线) 写入该文件和时间的行数。 此方法使用异步调用来避免阻塞消息写入 Blob 流时 (请参见图 6)。

图 6 将消息的数据写入到 Azure Blob

public async Task WriteOneLine(string line)
{
  var bytes = Encoding.UTF8.GetBytes(string.Format("{0}\n", line));
  await destinationStream.WriteAsync(bytes, 0, bytes.Length);
  TimeSpan ts = DateTime.Now - startBlobTime;
  if (++linesWritten > MAX_LINES || ts.TotalSeconds > MAX_SECONDS)
  {
    Trace.TraceInformation(
      "Wrote " + linesWritten + " lines to " + currentBlob.Name);
    GetNextBlob();
    linesWritten = 0;
  }
}

生成的文件包含数据提取遥测邮件,如图所示:

145268284e8e498282e20b01170634df,test,24,980,21,2014-03-14 13:43:32
dbb52a3cf690467d8401518fc5e266fd,test,24,980,21,2014-03-14 13:43:32
e9b5f508ef8c4d1e8d246162c02e7732,test,24,980,21,2014-03-14 13:43:32

他们包括消息 ID、 设备 ID,三个读数和消息队列的日期。 这种格式很容易下, 一步分析。

使用分析数据 HDInsight

最令人印象深刻的 HDInsight 好处是你可以启动完整的 Hadoop 集群、 运行作业和取消设置直接从命令行群集。 你不用再登录到虚拟机上或执行任何自定义的配置。 你可以调配和管理与 Windows PowerShell 基于 Windows,或 Mac 或 Linux 上使用跨平台的命令行工具 HDInsight。

您可以下载从综合的 Azure PowerShell commandlets bit.ly/1tGirZk。 这些 commandlets 包括一切您需要管理你蔚蓝的基础设施,包括 HDInsight 集群。 一旦你已经导入您的发布设置和所选的默认订阅,您只需要一个命令行,以创建一个新的 HDInsight 群集:

New-AzureHDInsightCluster -Name "hditelemetry" -Location "North Europe" -DefaultStorageAccountName "telemetry.blob.core.windows.
net" -DefaultStorageAccountKey "storage-account-key" -DefaultStorageContainerName "data" -ClusterSizeInNodes 4

此命令指示 HDInsight 群集使用的现有存储帐户和容器作为其文件系统的根目录。 这是它将如何访问所有遥测接收过程生成的数据。 您还可以选择多少个工人节点群集应根据数据量的使用,你需要多少并行度。

一旦群集启动并运行,您可以启用远程桌面访问。 这样做可以让其他用户登录到要启动交互式会话与标准 Hadoop 命令和工具的头节点上。 然而,它是要使用远程命令,利用 Windows PowerShell 启动地图减少、 蜂巢或猪作业快得多。

我用猪作业来计算平均温度值。 猪最初是在雅虎开发的。 它允许使用 Hadoop 重点分析大型数据集上更多的人,花更少的时间编写映射器和减速机的程序。 猪的脚本通常有三个阶段:

  1. 加载您想要操作的数据。
  2. 运行一系列的数据转换 (这翻译成一组映射器和减速机的任务)。
  3. 转储的结果的屏幕,或将结果存储在一个文件中。

下面的示例演示如何你通常实现这通过在探索性数据分析 (EDA) 阶段,与猪译员以交互方式运行脚本:

data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);
data1 = group data by did;
data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);
dump data2;

如果此脚本直接键入到猪解释器,它将显示一个表包含用于每个 DID 温度数据点的数量和平均测量的值。 正如您所看到的猪的语法是相当明确的。 清楚地分隔不同的数据操作步骤:

  • 负载的第一个语句用来从 CSV 文件,描述的名称和类型的输入域中加载数据。
  • 按 DID,或每个设备,然后被分组数据。
  • 结果数据集生成与像计数和平均聚合函数

一旦完成该脚本,可以自动执行此任务与 Windows PowerShell。 使用新 AzureHDInsightPigJob­定义 commandlet 来初始化一个猪工作与创建的脚本。 然后您可以使用启动 AzureHDInsightJob 和等待-AzureHD­InsightJob 开始作业,等待它的结论 (见图 7)。 然后可以使用 Get AzureHDInsightJobOutput 检索结果。

图 7 插入、 分析,并在 HDInsight 启动作业

$PigScript = "data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);" +
"data1 = group data by did;" +
"data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);" +
"dump data2;"
# Define a Pig job
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $PigScript
# Start the job
$pigJob = Start-AzureHDInsightJob -Cluster "hditelemetry" -JobDefinition $pigJobDefinition
# Wait for the job to finish
Wait-AzureHDInsightJob -Job $pigJob -WaitTimeoutInSeconds 3600
# Get the job results
Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId –StandardOutput

在命令行控制台中显示的结果看起来像这样:

C:\> Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId
(test,29091,24.0)
(49417795060,3942,30.08371385083714)

在这种情况下,有很多测试测量和覆盆子 Pi 的约 4,000 读数。 读数平均 30 度。

总结

Azure 服务总线是可靠和快速的方式,从的各种设备中收集数据。 为了存储和分析这些数据,您需要一个可靠的存储和分析引擎。 天青 HDInsight 摘要创建和维护 Hadoop 集群的这种程度的存储的过程。 它是一个高度可扩展的解决方案您可以配置和自动化使用工具 (如 Windows PowerShell 或 Azure Mac/Linux 命令行界面。

Thomas Conte 是为微软 Azure 平台的开发人员技术福音传教士 & 平台传福音 (DPE) 司。他的作用是促进获得技术为开发人员、 架构师和软件合作伙伴通过代码示例、 出版物和公共演讲。他致力于尽可能开放源代码世界上尽可能多的非 Microsoft 技术运行微软 Azure。跟着他在 twitter.com/tomconte

Bruno Terkaly 是微软开发者福音传教士。他的知识深度来源于多年来相关领域以及使用大量平台、语言、框架、SDK、库和 API 编写代码的经验。他花时间编写代码,写博客,给现场演示上构建基于云计算的应用程序,专门使用微软 Azure 平台。您可以阅读他的博客在 blogs.msdn.com/b/brunoterkaly

Ricardo Villalobos 斯 是具有超过 15 年的经验设计和创建应用程序的公司在多个行业的经验丰富的软件设计师。他从达拉斯大学工商管理持有不同的技术认证,以及硕士学位,为微软,帮助世界各地的公司要在微软 Azure 实施解决方案作为一个云建筑师在 DPE 全球范围内参与合作伙伴团队工作。您可以阅读他的博客在 blog.ricardovillalobos.com

衷心感谢以下 Microsoft 技术专家对本文的审阅:拉斐尔 Godinho 和耶利米达尔加尔