次の方法で共有


Apache Spark MLlib で機械学習モデルを構築する

この記事では、Apache Spark MLlib を使用し、Azure のオープン データセットに対してシンプルな予測分析を実行する機械学習アプリケーションの作成方法について説明します。 Spark には、組み込みの機械学習ライブラリが用意されています。 この例では、ロジスティック回帰による分類を使用しています。

SparkML と MLlib は、機械学習タスクに役立つ多数のユーティリティを提供するコア Spark ライブラリです。これには、次のことに適したユーティリティが含まれます。

  • 分類
  • 回帰
  • クラスタリング
  • トピックのモデリング
  • 特異値分解 (SVD) と主成分分析 (PCA)
  • 仮説テストとサンプル統計の計算

分類およびロジスティック回帰について

一般的な Machine Learning タスクである分類は、入力データをカテゴリに分類するプロセスです。 ユーザーが指定した入力データに "ラベル" を割り当てる方法を決定するのは、分類アルゴリズムの仕事です。 たとえば、株式情報を入力として受け取り、株式を、売却する必要のある株式と保持する必要のある株式の 2 つのカテゴリに分類する機械学習アルゴリズムを考えてみます。

ロジスティック回帰は、分類に使用できるアルゴリズムです。 Spark のロジスティック回帰 API は、 二項分類(入力データを 2 つのグループのいずれかに分類する) に適しています。 ロジスティック回帰の詳細については、Wikipedia を参照してください。

要約すると、ロジスティック回帰のプロセスにより、入力ベクトルがどちらか 1 つのグループに属している確率を予測するために使用できる "ロジスティック関数" が生成されます。

NYC タクシー データの予測分析の例

まず、azureml-opendatasets をインストールします。 データは Azure Open Datasets から入手できます。 このデータセットのサブセットには、イエロー タクシー乗車に関する情報が格納され、出発と到着の時刻および場所、料金、その他の属性などが含まれています。

%pip install azureml-opendatasets

この記事の残りの部分では、Apache Spark を使用して NYC タクシー乗車のチップ データに対して分析を実行し、特定の乗車にチップが含まれるかどうかを予測するモデルを開発します。

Apache Spark 機械学習モデルを作成する

  1. PySpark ノートブックを作成します。 手順については、「ノートブックを作成する」を参照してください。

  2. このノートブックに必要な型をインポートします。

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. ここでは、MLflow を使用して、機械学習の実験と対応する実行を追跡します。 Microsoft Fabric の自動ログ記録が有効になっている場合、対応するメトリックとパラメーターが自動的にキャプチャされます。

    import mlflow
    

入力 DataFrame を作成する

この例では、Pandas データフレームにデータを読み込み、それを Apache Spark データフレームに変換します。 この形式を使用すると、他の Apache Spark 操作を適用してデータセットをクリーンアップおよびフィルター処理できます。

  1. 次の行を新しいセルに貼り付け、コードを実行して、Spark DataFrame を作成します。 このステップにより、Open Datasets API を介してデータが取得されます。 このデータをフィルター処理して、特定のデータ ウィンドウを確認できます。 次のコード例では、start_dateend_date を使用して、1 か月分のデータを返すフィルターを適用します。

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. 次のコードは、データセットを約 10,000 行に減らします。 開発とトレーニングをスピードアップするために、ここではデータセットをサンプリングダウンします。

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. 次に、組み込みの display() コマンドを使用してデータを確認します。 これにより、データのサンプルを簡単に表示したり、データの傾向をグラフィカルに調べることができます。

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

データを準備する

データの準備は、機械学習プロセスの重要なステップです。 生データをクリーニング、変換、整理して、分析とモデリングに適した形にします。 次のコードでは、いくつかのデータ準備手順を実行します。

  • データセットをフィルター処理して外れ値と正しくない値を削除します
  • モデルのトレーニングに必要のない列を削除します
  • 生データから新しい列を作成します
  • 特定のタクシー乗車にチップがあるかどうかを判断するラベルを生成します
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

その後、データに対して 2 番目のパスを繰り返して、最終的な特徴を追加します。

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

ロジスティック回帰モデルを作成する

最後のタスクは、ラベル付けされたデータをロジスティック回帰で分析できる形式に変換することです。 ロジスティック回帰アルゴリズムへの入力は、"ラベルと特徴ベクトルのペア" のセットである必要があります。ここで "特徴ベクトル" とは、入力ポイントを表す数のベクトルです。

そのため、カテゴリ列を数値に変換する必要があります。 具体的には、trafficTimeBinsweekdayString 列を整数表現に変換する必要があります。 変換を実行する方法は複数あります。 次の例では、OneHotEncoder のアプローチを使用しています。

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

このアクションにより、すべての列がモデルのトレーニングに適した形式になっている新しい DataFrame が得られます。

ロジスティック回帰モデルのトレーニング

最初のタスクは、データセットをトレーニング セットとテスト セットまたは検証セットに分割することです。

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

2 つの DataFrame が得られたところで、次のタスクは、モデル式を作成して、トレーニング DataFrame に対してそれを実行することです。 その後、テスト用 DataFrame に対して検証できます。 さまざまなバージョンのモデル式を試して、さまざまな組み合わせの影響を確認してください。

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

このセルからの出力:

Area under ROC = 0.9749430523917996

予測を視覚化する

これで、最終的な視覚化を構築して、モデルの結果を解釈できるようになりました。 ROC 曲線は、結果を確認する 1 つの方法です。

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.