共用方式為


使用 Apache Spark MLlib 建置機器學習模型

在本文中,您將瞭解如何使用 Apache Spark MLlib 建立機器學習應用程式,以處理 Azure 開放數據集上的簡單預測性分析。 Spark 提供內建的機器學習連結庫。 此範例會透過羅吉斯回歸使用 分類

核心 SparkML 和 MLlib Spark 連結庫提供許多適用於機器學習工作的公用程式。 這些公用程式適用於:

  • 分類
  • 叢集
  • 假設測試和計算範例統計資料
  • 迴歸
  • 奇異值分解 (SVD) 和主體元件分析 (PCA)
  • 主題模型化

了解分類和羅吉斯迴歸

分類是熱門的機器學習工作,其牽涉到將輸入數據排序為類別。 分類演算法應該瞭解如何將標籤指派給提供的輸入數據。 例如,機器學習演算法可以接受股票資訊作為輸入,並將股票分成兩個類別:您應該出售的股票,以及您應該保留的股票。

羅吉斯回歸演算法適用於分類。 Spark 羅吉斯回歸 API 適用於 將輸入數據二元分類 成兩個群組的其中一個。 如需羅吉斯回歸的詳細資訊,請參閱 維琪百科

羅吉斯回歸會產生 羅吉斯函式 ,可預測輸入向量屬於一個群組或其他群組的機率。

NYC 計程車數據的預測性分析範例

首先,安裝 azureml-opendatasets。 數據可透過 Azure 開放資料集 資源取得。 此數據集子集裝載黃色計程車車程的相關信息,包括開始時間、結束時間、開始位置、結束位置、車程成本和其他屬性。

%pip install azureml-opendatasets

本文的其餘部分會依賴 Apache Spark 先對 NYC 計程車車程小費數據執行一些分析,然後開發模型來預測特定車程是否包含小費。

建立 Apache Spark 機器學習模型

  1. 建立 PySpark 筆記本。 如需詳細資訊,請流覽 建立筆記本

  2. 匯入此筆記本所需的類型。

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. 我們將使用 MLflow 來追蹤機器學習實驗和對應的執行。 如果已啟用Microsoft網狀架構自動記錄,系統會自動擷取對應的計量和參數。

    import mlflow
    

建構輸入DataFrame

此範例會將數據載入 Pandas 資料框架,然後將它轉換成 Apache Spark 資料框架。 在該格式中,我們可以套用其他 Apache Spark 作業來清除和篩選數據集。

  1. 將這些行貼到新的數據格中,然後執行它們以建立Spark DataFrame。 此步驟會透過開放式數據集 API 擷取數據。 我們可以篩選此數據,以檢查特定數據視窗。 程式代碼範例會使用 start_dateend_date 來套用傳回單一月份數據的篩選。

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. 此程式代碼會將數據集減少到大約 10,000 個數據列。 為了加速開發和定型,程式代碼會立即向下取樣我們的數據集。

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. 我們想要使用內 display() 建命令來查看數據。 使用此命令,我們可以輕鬆地檢視數據範例,或以圖形方式探索數據的趨勢。

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

準備資料

數據準備是機器學習程式中的重要步驟。 它牽涉到清理、轉換及組織原始數據,使其適合分析和模型化。 在此程式代碼範例中,您會執行數個數據準備步驟:

  • 篩選數據集以移除極端值和不正確的值
  • 拿掉模型定型不需要的數據行
  • 從原始數據建立新的數據行
  • 產生標籤,以判斷指定的計程車車程是否牽涉到小費
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

接下來,對數據進行第二次傳遞,以新增最終功能。

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

建立羅吉斯迴歸模型

最終工作會將標示的數據轉換成羅吉斯回歸可以處理的格式。 羅吉斯回歸演算法的輸入必須具有 標籤/特徵向量組 結構,其中 特徵向量是代表輸入點的數位向量

根據最終工作需求,我們必須將類別數據行轉換成數位。 具體而言,我們必須將 trafficTimeBinsweekdayString 數據行轉換成整數表示法。 有許多選項可用來處理這項需求。 此範例牽涉到 OneHotEncoder 方法:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

此動作會產生具有適當格式之所有數據行的新 DataFrame,以定型模型。

定型羅吉斯回歸模型

第一個工作會將數據集分割成定型集,以及測試或驗證集。

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

一旦我們有兩個 DataFrame,我們必須建立模型公式,並針對定型 DataFrame 執行它。 然後,我們可以針對測試數據框架進行驗證。 試驗不同版本的模型公式,以查看不同組合的效果。

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

儲存格會輸出:

Area under ROC = 0.9749430523917996

建立預測的視覺表示法

我們現在可以建置最終視覺效果來解譯模型結果。 ROC 曲線當然可以呈現結果。

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

顯示提示模型中羅吉斯回歸之ROC曲線的圖表。