共用方式為


教學課程:使用 Apache Spark MLlib 和 Azure Synapse Analytics 建置機器學習應用程式

在本文中,您將瞭解如何使用 Apache Spark MLlib 來建立機器學習應用程式,以在 Azure 開放式資料集上執行簡單的預測分析。 Spark 提供內建的機器學習程式庫。 此範例使用透過邏輯迴歸進行 分類

SparkML 和 MLlib 是核心 Spark 程式庫,提供許多適用於機器學習工作的公用程式,包括適用於下列用途的公用程式:

  • Classification
  • Regression
  • Clustering
  • 主題建模
  • 奇異值分解 (SVD) 和主成分分析 (PCA)
  • 假設檢定和計算樣本統計量

瞭解分類和邏輯迴歸

分類是一種流行的機器學習任務,是將輸入資料分類的過程。 分類演算法的工作是弄清楚如何將 標籤 指派給您提供的輸入資料。 例如,您可以考慮一種接受股票資訊作為輸入的機器學習演算法,並將股票分為兩類:您應該出售的股票和您應該保留的股票。

邏輯迴歸 是一種可用於分類的演算法。 Spark 的邏輯迴歸 API 對於 二進位分類或將輸入資料分類為兩組之一非常有用。 如需邏輯迴歸的詳細資訊,請參閱 維基百科

總之,邏輯迴歸過程會產生一個 邏輯函數 ,您可以使用它來預測輸入向量屬於一個組或另一個組的機率。

紐約市計程車資料的預測分析範例

在此範例中,您會使用 Spark 對來自紐約的計程車行程小費資料執行一些預測分析。 資料可透過 Azure 開放資料集取得。 資料集的這個子集包含有關黃色計程車行程的資訊,包括有關每次行程、開始和結束時間和位置、成本和其他有趣屬性的資訊。

這很重要

從其儲存位置提取此資料可能會產生額外費用。

在下列步驟中,您會開發一個模型來預測特定行程是否包含小費。

建立 Apache Spark 機器學習模型

  1. 使用 PySpark 核心建立筆記本。 如需指示,請參閱 建立筆記本

  2. 匯入此應用程式所需的類型。 將下列程式碼複製並貼到空白儲存格中,然後按 Shift+Enter。 或使用程式碼左邊的藍色播放圖示來執行儲存格。

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    由於 PySpark kernel,您不需要明確建立任何上下文。 當您執行第一個程式碼單元時,Spark 上下文將自動建立。

建構這個輸入 DataFrame

因為原始資料是 Parquet 格式,所以您可以使用 Spark 內容,將檔案直接提取到記憶體中做為 DataFrame。 雖然下列步驟中的程式碼會使用預設選項,但視需要,可以強制對應資料類型和其他結構描述屬性。

  1. 執行下列行,將程式碼貼到新的儲存格中,以建立 Spark DataFrame。 此步驟會透過開放資料集 API 擷取資料。 提取所有這些資料會產生大約 1.5 億列。

    視無伺服器 Apache Spark 集區的大小而定,原始資料可能太大或需要太多時間來操作。 您可以將此資料篩選為較小的量。 下列程式碼範例使用 start_dateend_date 來套用傳回單一月資料的篩選器。

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. 簡單過濾的缺點是,從統計的角度來看,它可能會在數據中引入偏差。 另一種方法是使用 Spark 內建的取樣。

    下列程式碼會在上述程式碼之後套用時,將資料集減少至大約 2,000 列。 您可以使用此取樣步驟來取代簡式篩選器,或與簡式篩選器結合使用。

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. 現在可以查看數據以查看讀取的內容。 通常最好使用子集而不是完整集來檢閱資料,具體取決於資料集的大小。

    下列程式碼提供兩種檢視資料的方式。 第一種方式是基本的。 第二種方式提供更豐富的網格體驗,以及以圖形方式視覺化資料的能力。

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. 根據所產生的資料集大小,以及您實驗或執行筆記本多次的需求而定,您可能想要在工作區中的本機快取資料集。 執行明確快取的方法有三種:

    • 將 DataFrame 在本機儲存為檔案。
    • 將 DataFrame 儲存為暫存資料表或檢視。
    • 將 DataFrame 儲存為永久資料表。

下列程式碼範例包含前兩種方法。

建立暫存資料表或檢視表會提供不同的資料存取路徑,但它只會在 Spark 實例會話期間持續。

sampled_taxi_df.createOrReplaceTempView("nytaxi")

準備資料

原始形式的資料通常不適合直接傳遞至模型。 您必須對資料執行一系列動作,才能讓資料進入模型可以使用資料的狀態。

在下列程式碼中,您會執行四類作業:

  • 透過篩選移除異常值或不正確的值。
  • 對不需要的列進行移除。
  • 建立衍生自未經處理資料的新資料行,讓模型運作更有效率。 這項作業有時稱為特徵化。
  • 標籤。 因為您正在進行二元分類(給定的旅行是否有小費),所以需要將小費金額轉換為 0 或 1 值。
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

然後,您對資料進行第二次傳遞以新增最終特徵。

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

建立邏輯迴歸模型

最後一項任務是將標記的資料轉換為可以透過邏輯迴歸進行分析的格式。 邏輯迴歸演算法的輸入必須是一組 標籤/特徵向量對,其中 特徵向量 是代表輸入點的數字向量。

因此,您需要將分類列轉換為數字。 具體來說,您需要將 和 trafficTimeBins 列轉換為weekdayString整數表示。 有多種方法可以執行轉換。 下列範例採用 OneHotEncoder 常見的方法。

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

此動作會產生一個新的 DataFrame,其中包含所有資料行的正確格式,以定型模型。

訓練邏輯迴歸模型

第一項任務是將資料集拆分為訓練集和測試或驗證集。 這裡的分裂是任意的。 嘗試不同的分割設定,看看它們是否會影響模型。

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

現在有兩個資料框架,下一個工作是建立模型公式,然後針對定型資料框架執行。 然後,您可以針對測試 DataFrame 進行驗證。 嘗試不同版本的模型公式,看看不同組合的影響。

備註

若要儲存模型,請將 儲存體 Blob 資料參與者 角色指派給 Azure SQL 資料庫伺服器的資源層級。 如需詳細步驟,請參閱使用 Azure 入口網站指派 Azure 角色。 只有具有擁有者權限的成員才能執行此步驟。

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

此儲存格的輸出為:

Area under ROC = 0.9779470729751403

建立預測的視覺化表示法

您現在可以建構最終視覺化,以協助您推斷此測試的結果。 ROC 曲線是檢閱結果的一種方式。

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

顯示尖端模型中邏輯迴歸的 ROC 曲線的圖表。

關閉 Spark 執行個體

執行完應用程式之後,請關閉筆記本,以關閉索引標籤來釋放資源。或從筆記本底部的狀態面板中選取 [結束工作階段]。

另請參閱

後續步驟

備註

某些官方 Apache Spark 檔依賴使用 Spark 主控台,這在 Azure Synapse Analytics 中的 Apache Spark 上無法使用。 請改用 筆記本IntelliJ 體驗。