教學課程:在 Azure HDInsight 中建置 Apache Spark 機器學習應用程式

在本教學課程中,您將瞭解如何使用 Jupyter Notebook 來建置 適用於 Azure HDInsight 的 Apache Spark 機器學習應用程式。

MLlib 是 Spark 的可調整機器學習連結庫,其中包含常見的學習演算法和公用程式。 (分類、回歸、叢集、共同作業篩選和維度縮減。此外,基礎優化基本類型。)

在本教學課程中,您會了解如何:

  • 開發 Apache Spark 機器學習應用程式

必要條件

了解數據集

應用程式預設會使用所有叢集上可用的範例 HVAC.csv 數據。 檔案位於 \HdiSamples\HdiSamples\SensorSampleData\hvac。 數據顯示已安裝 HVAC 系統之部分建築物的目標溫度和實際溫度。 [系統] 數據行代表系統標識符,而 SystemAge 數據行代表 HVAC 系統在建築物中已就緒的年份。 您可以根據目標溫度、給定的系統標識碼和系統年齡,預測建築物會變熱或更冷。

Snapshot of data used for Spark machine learning example.

使用 Spark MLlib 開發 Spark 機器學習應用程式

此應用程式會使用 Spark ML 管線 來執行檔案分類。 ML 管線提供一組建置在 DataFrame 之上的高階 API。 DataFrame 可協助使用者建立和調整實用的機器學習管線。 在管線中,您會將檔分割成單字、將文字轉換成數值特徵向量,最後使用特徵向量和標籤來建置預測模型。 請執行下列步驟來建立應用程式。

  1. 使用 PySpark 核心建立 Jupyter Notebook。 如需指示,請參閱建立 Jupyter Notebook 檔案

  2. 匯入此案例所需的類型。 將下列代碼段貼到空白儲存格中,然後按 SHIFT + ENTER

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import Row
    
    import os
    import sys
    from pyspark.sql.types import *
    
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS
    from pyspark.mllib.regression import LabeledPoint
    from numpy import array
    
  3. 載入資料 (hvac.csv),加以剖析,並使用它來定型模型。

    # Define a type called LabelDocument
    LabeledDocument = Row("BuildingID", "SystemInfo", "label")
    
    # Define a function that parses the raw CSV file and returns an object of type LabeledDocument
    def parseDocument(line):
        values = [str(x) for x in line.split(',')]
        if (values[3] > values[2]):
            hot = 1.0
        else:
            hot = 0.0
    
        textValue = str(values[4]) + " " + str(values[5])
    
        return LabeledDocument((values[6]), textValue, hot)
    
    # Load the raw HVAC.csv file, parse it using the function
    data = sc.textFile("/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
    documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
    training = documents.toDF()
    

    在代碼段中,您會定義一個函式,以比較實際溫度與目標溫度。 如果實際溫度更大,則建築物為熱度,以1.0表示。 否則,建築物是冷的,以0.0表示。

  4. 設定由三個階段組成的 Spark 機器學習管線: tokenizerhashingTFlr

    tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    

    如需管線及其運作方式的詳細資訊,請參閱 Apache Spark 機器學習管線

  5. 將管線放入定型檔。

    model = pipeline.fit(training)
    
  6. 確認定型檔以檢查您的應用程式進度。

    training.show()
    

    輸出如下:

    +----------+----------+-----+
    |BuildingID|SystemInfo|label|
    +----------+----------+-----+
    |         4|     13 20|  0.0|
    |        17|      3 20|  0.0|
    |        18|     17 20|  1.0|
    |        15|      2 23|  0.0|
    |         3|      16 9|  1.0|
    |         4|     13 28|  0.0|
    |         2|     12 24|  0.0|
    |        16|     20 26|  1.0|
    |         9|      16 9|  1.0|
    |        12|       6 5|  0.0|
    |        15|     10 17|  1.0|
    |         7|      2 11|  0.0|
    |        15|      14 2|  1.0|
    |         6|       3 2|  0.0|
    |        20|     19 22|  0.0|
    |         8|     19 11|  0.0|
    |         6|      15 7|  0.0|
    |        13|      12 5|  0.0|
    |         4|      8 22|  0.0|
    |         7|      17 5|  0.0|
    +----------+----------+-----+
    

    比較輸出與原始 CSV 檔案。 例如,CSV 檔案的第一個數據列具有此數據:

    Output data snapshot for Spark machine learning example.

    請注意實際溫度小於建議建築物冷的目標溫度。 第一個數據列中卷標的0.0,這表示建置不是經常性。

  7. 準備數據集以針對執行定型的模型。 若要這樣做,您會傳遞系統標識碼和系統年齡(在定型輸出中表示為 SystemInfo )。 模型會預測具有該系統標識碼和系統年齡的建築物是否會變熱(以 1.0 表示)或冷卻器(以 0.0 表示)。

    # SystemInfo here is a combination of system ID followed by system age
    Document = Row("id", "SystemInfo")
    test = sc.parallelize([("1L", "20 25"),
                    ("2L", "4 15"),
                    ("3L", "16 9"),
                    ("4L", "9 22"),
                    ("5L", "17 10"),
                    ("6L", "7 22")]) \
        .map(lambda x: Document(*x)).toDF()
    
  8. 最後,對測試數據進行預測。

    # Make predictions on test documents and print columns of interest
    prediction = model.transform(test)
    selected = prediction.select("SystemInfo", "prediction", "probability")
    for row in selected.collect():
        print (row)
    

    輸出如下:

    Row(SystemInfo=u'20 25', prediction=1.0, probability=DenseVector([0.4999, 0.5001]))
    Row(SystemInfo=u'4 15', prediction=0.0, probability=DenseVector([0.5016, 0.4984]))
    Row(SystemInfo=u'16 9', prediction=1.0, probability=DenseVector([0.4785, 0.5215]))
    Row(SystemInfo=u'9 22', prediction=1.0, probability=DenseVector([0.4549, 0.5451]))
    Row(SystemInfo=u'17 10', prediction=1.0, probability=DenseVector([0.4925, 0.5075]))
    Row(SystemInfo=u'7 22', prediction=0.0, probability=DenseVector([0.5015, 0.4985]))
    

    觀察預測中的第一個數據列。 對於標識符為 20 且系統年齡為 25 年的 HVAC 系統,建築物為經常性(prediction=1.0)。 DenseVector (0.49999) 的第一個值對應至預測 0.0,而第二個值 (0.5001) 對應至預測 1.0。 在輸出中,即使第二個值略高,模型仍會顯示 prediction=1.0

  9. 關閉筆記本以釋放資源。 若要這麼做,請從 Notebook 的 [檔案] 功能表中,選取 [關閉並終止]。 此動作會關機並且關閉 Notebook。

使用 Anaconda scikit-learn 連結庫進行 Spark 機器學習

HDInsight 中的 Apache Spark 叢集包含 Anaconda 連結庫。 它也包含適用於機器學習的 scikit-learn 連結庫。 連結庫也包含各種數據集,可用來直接從 Jupyter Notebook 建置範例應用程式。 如需使用 scikit-learn 連結庫的範例,請參閱 https://scikit-learn.org/stable/auto_examples/index.html

清除資源

如果您不打算繼續使用此應用程式,請使用下列步驟刪除您所建立的叢集:

  1. 登入 Azure 入口網站

  2. 在頂端的 [搜尋] 方塊中,輸入 HDInsight

  3. 在 [服務] 底下,選取 [HDInsight 叢集]

  4. 在顯示的 HDInsight 叢集清單中,選取 您為此教學課程建立的叢集旁的 ...

  5. 選取 [刪除]。 選取 [是]

Azure portal deletes an HDInsight cluster.

下一步

在本教學課程中,您已瞭解如何使用 Jupyter Notebook 來建置適用於 Azure HDInsight 的 Apache Spark 機器學習應用程式。 前進到下一個教學課程,瞭解如何使用 IntelliJ IDEA for Spark 作業。