2016 年 4 月

第 31 卷,第 4 期

此文章由机器翻译。

大数据 - Spark 中的数据处理和机器学习

通过 Eugene Chuvyrov

下面是您一个问题 ︰ 从 Microsoft Dryad 项目借用很大程度的框架的名称是什么变得 2015年的最流行的开放源代码项目,并且还设置对 100 TB 的数据进行排序以只是 23 分钟为单位的数据处理记录? 回答是: Apache Spark。

在本文中,我将讨论的速度和 Spark 以及它为何有清除当前入选方在处理大数据和分析空间中的受欢迎程度。使用 Microsoft Azure 订阅,我将介绍解决使用 Spark,机器学习 (ML) 问题的示例并将它们放数据科学世界上从软件工程一小步。但我深入数据分析和 ML 之前,务必要谈一谈有关 Spark 框架的各种组件以及与 Azure Spark 的关系。

Spark 组件

Spark framework 值,它允许在商用计算机群集上的大数据工作负荷的处理。Spark Core 是,处理可能的、 打包数据查询和无缝地分布在各群集使其成为的引擎。除了 Spark Core 到 Spark 框架的一些其他组件和每个这些组件都是适用于特定的问题域。很可能,将永远不需要使用任何这些组件,如果您感兴趣,只是在操作和报告大型数据的工作负荷。但是,在本文中,我将使用 Spark MLLib 来构建会相当准确地告诉您的 ML 模型"猜测"已编写了手工 (大量对此进行详细更高版本) 的数字。Spark framework 的其他组件可以处理的流式处理数据 (Spark Streaming),操作的图形和著名 PageRank 算法 (GraphX) 的计算和运行分布式数据 (Spark SQL) 之上的 SQL 查询。

在 Azure 上运行 Spark

有相当多的选项中使用从托管的服务实验使用 Spark, databricks.com (该公司创建并继续增强了 Spark),对设置 Docker 容器和抓取于从 GitHub 获取整个源代码存储库已预安装的 Spark 图像从 Docker Hub (github.com/apache/spark) 和自行构建产品。但由于这篇文章是关于 Azure 中,我想向您展示如何在 Azure 上创建 Spark 群集。此选项非常有用的原因是因为 Azure 提供了部署到 Azure 上的 Spark 的企业级保证计算群集。Azure 为 Microsoft 支持的 99.9 %sla 提供所有 Spark 群集,并且还提供全天候企业支持和监视群集。群集部署的易用性以及大量的周围绘制添姿加 Azure 公告结合在 2016年期间这些保证生成会议使 Microsoft 云的极好环境可用于您大数据的作业。

Pixie 灰尘

使 Spark 今天之间数据科学家如此受欢迎的机密是两个方面 ︰ 速度快,最好到程序在此框架的乐趣。首先,让我们看一下是什么使 Spark 比它前面的框架快得多。

Spark 的前身,Hadoop MapReduce 是大数据分析空间的作业主力,自从 Doug 剪切和 Mike Cafarella 在 2005 年成立共同 Apache Hadoop 项目。MapReduce 工具以前仅在 Google 数据中心内可用和已完全关闭发出。Hadoop 很好地满足运行批处理分析处理在群集上,但遭受极端刚性。映射和化简操作合在一起; 将第一次完成映射任务中,然后完成化简任务。复杂的任务必须组合多个映射和减少步骤。此外,每个任务必须分解成了一个映射,以便减少操作。花费了很长时间才能运行这些顺序操作,但程序时非常枯燥。换而言之,这不是实时分析。

与此相反,Spark 框架将智能应用于手头的数据分析任务。构造定向非循环图形 (DAG) 的计划任务,非常类似于 SQL Server 如何构造查询执行计划之前执行数据检索或处理操作之前执行它。Dag 自身提供了有关将对数据执行的转换的信息和 Spark 是能够以智能方式组合多个这些转换为一个单独的步骤,然后一次性执行转换 — 了解最初率先开发的项目 Dryad 地区的 Microsoft Research。

另外,Spark 是能够智能地将数据保持在内存通过调用弹性分布式数据集 (RDDs) 的构造,稍后我将介绍其中 — 和在 Dag 之间进行共享。此共享 Dag 允许作业之间的数据比不进行该优化更快地完成。图 1 显示"hello world"的数据科学空间 DAG — 在给定的文本文件中单词的计数。请注意了几种操作,即中读取文本文件、 flatMap 和映射中,组合成一个单独的步骤,以便更快的执行。下面的代码演示实际 Scala 代码 (因为 Spark 写在 Scala) 执行的字数 (即使您从未见过的一行 Scala 代码之前,我敢打赌,您可立即了解如何在 Spark 中实现的字数) ︰

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1))  
  .reduceByKey((a, b) => a + b)

定向非循环图形 (DAG) 的字数统计
图 1 定向非循环图形 (DAG) 的字数统计

第二个 Spark 很受欢迎的原因是因为其编程模型。在 Spark (Scala 代码) 中实现字数统计是而不是实现在 Hadoop MapReduce 单词计数的简单得多。除了 Scala,您可以创建 Spark 应用程序在 Java 和 Python,这是我在本文中使用的语言。在之前 Spark,与 Hadoop MapReduce 数据科学家/程序员必须使用非自然范例的复杂任务拆分成一组映射和化简操作。与 Spark,功能性编程的方法使用 LINQ 和 lambda 函数的所有.NET 开发人员熟悉用于转换和分析数据。

稍后,您将看到是多么简单,但功能强大、 编程模型是的 Spark。但之前您可以编写出色功能代码同样的工作也在大型和小型数据集,你需要创建分布式的计算机将具有所有必需的组件的安装并准备好接受提交给它的编程任务的 Spark 群集。Spark 群集的创建是绝对令人望而生畏如果不得不创建和配置群集工具栏。幸运的是,Microsoft 云能够完成以下几个单击操作中进行设置。在下一部分中,我将演示您只是如何做到这一点。

部署 Spark 群集

现在,让我们在 Azure 上创建 HDInsight 群集。"HDInsight"看作是一个包含 Hadoop 和 Spark 的技术; 的涵盖性术语Hadoop HDInsight 和 Spark HDInsight 是 Azure 上托管的大数据服务的两个示例。

若要设置 Spark 群集时,登录到 Azure 门户 (portal.azure.com),然后单击浏览新建 |数据 + 分析 |HDInsight |创建。填写 HDInsight 群集属性,指定名称、 群集类型 = Spark,将群集操作系统设为 Linux,(因为在 Linux 上开发 Spark) 并将保持不变,版本字段中所示 图 2。完成所需的信息,包括指定的凭据登录到群集和存储帐户/容器名称的其余部分。然后按创建按钮。创建一个群集的过程需要 15 到 30 分钟。

在 Azure 中创建 Spark 群集
图 2 在 Azure 中创建 Spark 群集

在创建过程完成后,您必须磁贴在 Azure 门户中表示新创建的 HDInsight 群集。最后,您可以深入了解代码 ! 在开始之前的代码,但是,让我们检查的编程环境和语言可供你使用 Spark。

有若干方法来在 Spark 环境中的程序。首先,您可以访问通过 Spark shell,直观地足够 spark shell 命令中,所述在 bit.ly/1ON5Vy4, ,其中,此后建立到 Spark 群集头节点的 SSH 会话时,可以 REPL 类似的方式编写 Scala 程序,并提交编程构造一次一个地 (不要担心这句话听起来像在外语言编写,只需直接进入选项 3)。其次,您可以在 Spark 运行完整的 Scala 应用程序 (将它们通过提交 spark-submit 命令在 bit.ly/1fqgZHY)。最后,还有选项,可使用 Jupyter 笔记本 (jupyter.org) 在 Spark 基础之上。如果您不熟悉 Jupyter 项目,Jupyter 笔记本提供可视化、 基于 Web 的交互式环境,在其中运行的数据分析脚本。这些笔记本是我的数据分析的首选的方法,我相信,一旦您尝试它们,它们就会成为您首选的 Spark 上, 进行过编程方式。Azure HDInsight 为您轻松地开始使用它安装在群集之上的 Jupyter 笔记本环境。

若要访问 Jupyter 笔记本,请单击该群集仪表板磁贴中所示 图 3, ,然后单击滑出窗口上的 Jupyter 笔记本磁贴。在使用在群集创建期间指定的凭据进行登录,您应该看到 Jupyter 环境已准备好接受新建或编辑旧笔记本。现在,单击右上角中的新按钮并选择 Python 2。为什么 Python 2? 因为尽管 Spark 本身编写 Scala 中并在 Scala 中完成了大量编程的 Spark,此外还有 Python 桥通过 Pyspark 可用。顺便说一下,还有 raging 辩论是否应在 Scala 或 Python 代码。每种语言有明显的好处,与 Scala 正在可能更快,而 Python 它或许更具表现力和最常用的语言数据科学 (请参阅 bit.ly/1WTSemP)。这样就可以在 Spark 群集的基础上进行编程时将表达,但简洁的 Python 配合使用。Python 也是我首选的语言进行数据分析 (连同 R),我可以利用所有的功能强大 Python 库我习惯。

访问通过群集仪表板的 Azure HDInsight 中的 Jupyter 笔记本
图 3 访问通过群集仪表板的 Azure HDInsight 中的 Jupyter 笔记本

最后,即可深入和执行内部 Jupyter 笔记本的 ML 和数据分析任务。

机器学习使用 Spark

为了说明 ML Spark 中的,我将 ML 中使用的典型问题的窗体中的"小"数据示例 — 识别手写的数字,如出现在信封上的邮政编码的那些。虽然此数据集并不通过任何方式大,此解决方案的优点在于,数据应增加一个千位折叠,您可以向群集添加更多计算机,并且仍在合理的时间内完成的数据分析。此处所示的代码对任何更改都将有必要 — Spark framework 将负责分发工作负荷对群集中的各台计算机。将使用该数据文件也是一个传统 — 它通常称为 MNIST 数据集,并且它包含 50000 手写的数字,以便您进行分析。尽管有许多可联机获得 MNIST 数据集,Kaggle 网站为您提供方便地访问这些数据 (请参阅 bit.ly/1QJN20c)。

同时也请注意,如果你不熟悉 kaggle.com, ,它承载 ML 竞赛联机,其中从世界各地的几乎 500000 数据科学家争夺货币奖品或机会访谈,网址顶级 ML 公司之一。我已完成在五个 Kaggle 竞赛中,并且如果您是竞争的人,它会极能上瘾体验。和在 Azure 上运行 Kaggle 站点本身 !

让我们花点时间了解 train.csv 的内容。该文件的每一行表示的按像素表示形式包含手写的数字,如中所示的 28 x 28 映像 图 4 (图所示的放大表示)。第一列包含数字实际上是什么;其余列包含像素强度,从 0 到 255 的所有 784 像素 (28 x 28)。

放大的数字表示 MNIST 数据集中的"7"示例
图 4 放大示例"7"MNIST 数据集中所表示的数字

新的 Jupyter 笔记本打开后,将以下代码粘贴到第一个单元格 ︰

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
import time
sc = SparkContext(appName="MNISTDigitsDT")
#TODO: provide your own path to the train.csv in the line(s) below, 
# you can use Azure Storage 
#Explorer to upload files into the cloud and to read their full path
fileNameTrain = 'wasb://datasets@chuvyrov.blob.core.windows.net/trainingsample.csv'
fileNameTest = 'wasb://datasets@chuvyrov.blob.core.windows.net/validationsample.csv'
mnist_train = sc.textFile(fileNameTrain)
mnist_test = sc.textFile(fileNameTest)

此代码将导入必需的库,用于执行 ML 在 Spark,然后指定将用于定型集和测试模型的数据文件的位置 (请注意这些文件应位于您的存储帐户,可从 Microsoft 云中的 Spark wasb 通过访问: / / 引用)。最后,最后两行是 rdd 可以创建从文本文件的位置。RDDs 是 Spark 神秘 — — 它们分布在数据结构,但其实现的复杂程度通常隐藏的程序员/用户。此外,这些 RDDs 惰式计算,并将持久化,因此,如果您需要再次使用该 RDD,所以可立即用于而无需重新 computation/检索。当操作 RDDs 时,就会引发 Dag 的生成任务和执行的分阶段在 Spark 群集中,如前面部分接触时。

若要执行粘贴的代码的 Jupyter 单元格内按 Shift + Enter。没有消息应该是好消息 (如果没有获得一条错误消息,您是很好),现在,您应具有 RDDs 可用于查询和操作。这些 RDDs 包含以逗号分隔的文本行的时刻,因为这是 MNIST 数据的是如何。

马上要做的下一件事是定义将帮助你将这些文本行转换为自定义的 LabeledPoint 对象的简单函数。此对象是必需的将用于定型和进行预测的 ML 算法。简单地说,此对象包含的"功能"数组 (有时,很方便地将数据库表中的列视为功能) 或正在尝试学习预测单个数据点,并为其"标签"或值的特征。如果这听起来有点不清楚稍后再试,或许看一看 MNIST train.csv 文件可以帮助。您会注意到 train.csv 中的每一行中的所有其他列具有第一列和一组数字,从 0 到 255 之间的数字。第一列称为"label",因为我们想要了解如何进行预测数。所有其他列"功能",并且在采取所有内容的功能称为"特征向量"。 这些功能是该数字图片中的每个数字化像素的强度,0 表示黑色和 255 white、 与多个值之间。图片是所有 28 像素高和 28 个像素宽,组成 784 包含像素强度 train.csv 文件 (28 x 28 = 784) 中的列。

复制并粘贴到新的 Jupyter 笔记本的单元格的以下函数 ︰

def parsePoint(line):
  #Parse a line of text into an MLlib LabeledPoint object
  values = line.split(',')
  values = [0 if e == '' else int(e) for e in values]
  return LabeledPoint(int(values[0]), values[1:])

按 Shift + Enter 执行代码。现在,您已定义了 parsePoint 函数,它已进行了评估通过 Spark,可为您要用于你中读取的数据集。此函数接受以逗号分隔的文本的单行、 将其拆分为单独的值并将这些值转换为 LabeledPoint 对象。

接下来,执行一些基本数据清理以准备学习算法中;遗憾的是,学习算法尚未智能程度足以知道哪些部分的数据具有预测值。因此,跳过使用借用成为黑客对 train.csv 文件的标头 stackoverflow.com; 然后,将打印结果 rdd 可以以确保它处于预期它处于状态的第一行 ︰

#skip header
header = mnist_train.first() #extract header
mnist_train = mnist_train.filter(lambda x:x !=header) 
#filter out header using a lambda
print mnist_train.first()

现在,您已准备好将应用在下一节中.map(parsePoint) 运算符,可将 rdd 可以转换为格式准备就绪中的 Spark ML 算法功能性编程的方法。此转换将实质上是分析 mnist_train RDD 内的每一行,并将该 rdd 可以转换为一组 LabeledPoint 对象。

RDDs 和交互的能力 ︰ 主要基础之一 Spark 电源

有几个重要的问题。首先,您正在使用分布在群集的计算机 (RDD),一种数据结构,但从您几乎完全隐藏的分布式计算的复杂性。要将功能转换应用到 RDD,和 Spark 优化所有处理和繁重的任务跨在后台的可用计算机群集为您 ︰

labeledPoints = mnist_train.map(parsePoint)
#Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = labeledPoints.randomSplit([0.7, 0.3])
print mnist_train.first()

以交互方式查询大型数据集的能力 (使用 print 语句) 的最后一行可能看起来微不足道,尽管极其强大,几乎没有来自之前 Spark 的大型数据集的世界。在您的数据科学和较大的数据操作项目,它将是非常有用的技术,以验证正在应用您的想法的转换实际上正在将应用。此功能强大的交互式处理尚 Spark 相比其他大数据处理框架,另一个优势。

此外请注意数据拆分到训练和测试数据集使用 randomSplit 函数。没有创建 ML 模型使用 trainingData RDD 中的数据,也要使用 testData 中的数据对模型进行测试的想法 RDD,当您稍后将看到在代码中。

您现在已准备好将 ML 算法应用于分布式数据集,您刚才创建 (mnist_train)。作为快速查看,请记住,在 ML 问题几乎在所有情况下有两个不同组发生的步骤 ︰ 首先,为具有已知的结论; 使用已知的数据集模型的定型第二,可基于创建,或在第一步中学习的模型的预测。在下面的代码中,正在使用 RandomForest 算法中的 Spark 机器学习框架 (Spark MLLib) 可用来训练该模型。RandomForest 是 Spark MLLib 中可用的多种分布式算法之一,它是功能最强大之一。将以下内容粘贴到新的单元格 ︰

depthLevel = 4
treeLevel = 3
#start timer
start_time = time.time()
#this is building a model using the Random Forest algorithm from Spark MLLib
model = RandomForest.trainClassifier(trainingData, numClasses=10, 
  categoricalFeaturesInfo={},
  numTrees=treeLevel, featureSubsetStrategy="auto",
  impurity='gini', maxDepth=depthLevel, maxBins=32) 
print("Training time --- %s seconds ---" % (time.time() - start_time))

请注意此代码以测量算法的执行时间的启动方式然后对于某些所需的 RandomForest 算法参数即 maxDepth 和 numTrees 设置初始值。通过按 Shift + Enter 执行该代码。您可能想知道这 RandomForest 件事什么以及它如何工作? RandomForest 是较高级别的工作原理是通过随机选择一个变量来根据拆分决策树构造的数据的许多决策树的 ML 算法 (即,一个树很简单,内容为"如果右下角的像素设置为白色,则可能是否。 第 2"),然后创建在轮询构造的所有树后最終決定。幸运的是,没有已算法的分布式的版本可以在 Spark。但是,不会阻止您从编写自己的算法决定要执行此操作;分布式的 k 最近的邻居 (kNN) 算法仍不存在于 Spark framework。

现在,回到 MNIST 数字识别任务。如果您有类似的环境来挖掘,您应获取定型算法 21 秒的间隔的执行时间。这意味着,21 秒后,你已了解有关 — 使用 RandomForest 算法 — 您可以使用它来预测的数字您要查看给定已分析的功能的模型。现在您已准备好进行 ML 任务的最重要的部分 — — 使您已创建基于模型的预测。此外,您也要准备评估这些预测的准确性中所示 图 5

图 5 评估预测的准确性

# Evaluate model on test instances and compute test error
1 #start timer
2 start_time = time.time()
3 #make predictions using the Machine Learning created prior
4 predictions = model.predict(testData.map(lambda x: x.features))
5 #validate predictions using the training set
6 labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
7 testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() /
8   float(testData.count())
9 print('Test Error = ' + str(testErr))
10 print("Prediction time --- %s seconds ---" % (time.time() - start_time))
11 #print('Learned classification tree model:')
12 #print(model.toDebugString())

请注意的第 4 行上的 model.predict 构造 图 5。这是这种线条使实际基于已在前面生成的模型的预测。行上进行预测 (5-7 行) 之后, 您使用的临时的策略与一些基本的数据操作 — 通过 zip 函数,您可供下载的一部分作为您的实际值的预测的值。然后,您只需计算给定此数据的正确预测的百分比和打印执行时间。

此初始分类与如此之高的错误的结果是略有带来不便 (即,对待您的模型工作根本错误率接近 43%?)。您可以提高使用称为"网格 hyperparameter 搜索"构建出该模型,它立即并最终集中于为您提供最佳的整体性能的 hyperparameter 值测试时尝试的一系列值的概念模型。换而言之,请尝试大量的系统化试验来确定哪些模型参数具有最佳的预测值。

超参数,您将应用到的网格搜索也将被 numTrees 和 maxDepth;将代码粘贴到所示 图 6 到新笔记本中的单元格。

图 6 迭代"网格中搜索"在 Spark 中 RandomForest 算法的最佳参数

1 bestModel = None
2 bestTestErr = 100
3 #Define a range of hyperparameters to try
4 maxDepths = range(4,10)
5 maxTrees = range(3,10)
6
7 #Loop over parameters for depth and tree level(s)
8 for depthLevel in maxDepths:
9 for treeLevel in maxTrees:
10       
11   #start timer
12   start_time = time.time()
13   #Train RandomForest machine learning classifier
14   model = RandomForest.trainClassifier(trainingData,
15     numClasses=10, categoricalFeaturesInfo={},
16     numTrees=treeLevel, featureSubsetStrategy="auto",
17     impurity='gini', maxDepth=depthLevel, maxBins=32)       
18              
19   #Make predictions using the model created above
20   predictions = model.predict(testData.map(lambda x: x.features))
21   #Join predictions with actual values from the data and determine the error rate
22   labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
23   testErr = labelsAndPredictions.filter(lambda (v, p): v != p)
24     .count() / float(testData.count())
25       
26   #Print information about the model as we proceed with each iteration of the loop
27   print ('\maxDepth = {0:.1f}, trees = {1:.1f}: trainErr = {2:.5f}'
28          .format(depthLevel, treeLevel, testErr))
29   print("Prediction time --- %s seconds ---" % (time.time() - start_time))
30   if (testErr < bestTestErr):
31       bestModel = model
32       bestTestErr = testErr
33           
34 print ('Best Test Error: = {0:.3f}\n'.format(bestTestErr))

请注意如何在多行 8-14 您扫描一套 numTrees 参数为 10,3 从随机林算法创建模型和评估其性能。接下来,您可以在行 30-32,如果它为您提供更好的结果比您尝试过,之前模型中的任何捕获型号或否则关闭该模型。让此循环一些时间来运行;在运行结束时,您应该看到不应超过 10%的预测错误值。

总结

时,我将编写这篇文章,我的主要目标是以通过示例显示使用进行编程 Spark,尤其是多么容易如果您热衷于函数式编程和 Azure。我第二个目的是演示如何能够在大型和小型 Spark MLLib 库的帮助下的数据集上执行 ML 任务。与此同时,我想解释为什么 Spark 加快执行速度上比其早期版本的分布式数据和共享的琐碎内容,我们在今天的分布式的数据分析空间中的到达方式的位。

Microsoft 正在全力未来的大数据,ML,分析和具体而言,Spark。这是正确的时间,若要了解这些技术来充分利用提供的 Microsoft 云的 hyperscale 计算和数据分析的机会。Azure 使获取使用 Spark 快速、 轻松并准备转到向上扩展到大型数据集均由仅期待最佳的企业云提供商的服务级别保证。

现在,您已创建 ML 模型并对已知的数据进行预测,也可以不包括其真正的标签; 对数据进行预测也就是说,对从该 test.csv 文件 Kaggle.com。然后,您可以使提交到 Kaggle.com 作为数字识别器竞争对手的产品在该平台上的一部分。所有这篇文章的代码要写入提交文件的代码位于 GitHub.com/echuvyrov/SparkOnAzure。我将非常乐于了解您得到的分数。发送电子邮件问题、 意见、 建议和 ML 成就 eugene.chuvyrov@microsoft.com


Eugene Chuvyrov是云解决方案架构师,协助 Microsoft 技术推广和开发团队,他可以帮助各地 San Francisco 海湾地区的公司中充分利用 Microsoft 云所提供的超小数位数。尽管他当前关注高缩放性数据的合作伙伴,他尚未作为一名软件工程师忘记其根,并且喜欢用 C#、 JavaScript 和 Python 编写云就绪代码。在 Twitter 上关注他 ︰ @EugeneChuvyrov

感谢以下的微软技术专家对本文的审阅: Bruno Terkaly
Bruno Terkaly 是 microsoft 的首席软件工程师,以启用跨设备开发的业界领先的应用程序和服务的目标。他负责从技术支持的角度,在美国以及全世界推动顶级云和移动的机会。他通过在 ISV 评估、开发和部署期间提供结构性指导和深入参与技术问题,帮助合作伙伴将他们的应用程序投向市场。Terkaly 还与云和移动工程组提供反馈并影响路线图紧密合作。