Membangun model pembelajaran mesin dengan Apache Spark MLlib

Dalam artikel ini, Anda akan mempelajari cara menggunakan Apache Spark MLlib untuk membuat aplikasi pembelajaran mesin yang melakukan analisis prediktif sederhana pada himpunan data terbuka Azure. Spark menyediakan pustaka pembelajaran mesin bawaan. Contoh ini menggunakan klasifikasi melalui regresi logistik.

SparkML dan MLlib adalah pustaka Spark inti yang menyediakan banyak utilitas yang berguna untuk tugas pembelajaran mesin, termasuk utilitas yang cocok untuk:

  • Klasifikasi
  • Regresi
  • Pengklusteran
  • Pemodelan topik
  • Penguraian nilai tunggal (SVD) dan analisis komponen utama (PCA)
  • Pengujian hipotesis dan penghitungan statistik sampel

Memahami klasifikasi dan regresi logistik

Klasifikasi, tugas pembelajaran mesin yang populer, adalah proses pengurutan data input ke dalam kategori. Inilah tugas algoritma klasifikasi untuk mencari tahu cara menetapkan label untuk memasukkan data yang Anda berikan. Contohnya, Anda dapat memikirkan algoritma pembelajaran mesin yang menerima informasi saham sebagai input dan membagi saham menjadi dua kategori: saham yang harus Anda jual dan saham yang harus Anda simpan.

Regresi logistik adalah algoritma yang dapat Anda gunakan untuk klasifikasi. API regresi logistik Spark berguna untuk klasifikasi biner, atau mengklasifikasikan data input ke dalam salah satu dari dua grup. Untuk informasi selengkapnya tentang regresi logistik, lihat Wikipedia.

Singkatnya, proses regresi logistik menghasilkan fungsi logistik yang dapat Anda gunakan untuk memprediksi probabilitas bahwa vektor input termasuk dalam satu kelompok atau yang lain.

Contoh analisis prediktif pada data taksi NYC

Untuk memulai, instal azureml-opendatasets. Data tersedia melalui Azure Open Datasets. Subset himpunan data ini berisi informasi tentang perjalanan taksi kuning, termasuk waktu dan lokasi awal dan akhir, biaya, dan atribut lainnya.

%pip install azureml-opendatasets

Di sisa artikel ini, kita akan menggunakan Apache Spark untuk melakukan beberapa analisis pada data tip perjalanan taksi NYC dan kemudian mengembangkan model untuk memprediksi apakah perjalanan tertentu menyertakan tip atau tidak.

Buat model pembelajaran mesin Apache Spark

  1. Buat buku catatan PySpark. Untuk instruksi, lihat Membuat buku catatan.

  2. Impor tipe yang diperlukan untuk buku catatan ini.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Kami akan menggunakan MLflow untuk melacak eksperimen pembelajaran mesin dan eksekusi yang sesuai. Jika Microsoft Fabric Autologging diaktifkan, metrik dan parameter yang sesuai akan diambil secara otomatis.

    import mlflow
    

Menyusun DataFrame input

Dalam contoh ini, kita akan memuat data ke dalam dataframe Pandas dan kemudian mengonversinya menjadi dataframe Apache Spark. Dengan menggunakan format ini, kita dapat menerapkan operasi Apache Spark lainnya untuk membersihkan dan memfilter himpunan data.

  1. Jalankan baris berikut untuk membuat Spark DataFrame dengan menempelkan kode ke dalam sel baru. Langkah ini mengambil data melalui Open Datasets API. Kita dapat memfilter data ini ke bawah untuk melihat jendela data tertentu. Contoh kode berikut menggunakan start_date dan end_date untuk menerapkan filter yang mengembalikan satu bulan data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Kode berikut mengurangi himpunan data menjadi sekitar 10.000 baris. Untuk mempercepat pengembangan dan pelatihan, kami akan mengambil sampel himpunan data kami untuk saat ini.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Selanjutnya, kita ingin melihat data kita menggunakan perintah bawaan display() . Ini memungkinkan kita untuk dengan mudah melihat sampel data atau menjelajahi tren dalam data secara grafis.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Menyiapkan data

Persiapan data adalah langkah penting dalam proses pembelajaran mesin. Ini melibatkan pembersihan, transformasi, dan pengorganisasian data mentah agar cocok untuk analisis dan pemodelan. Dalam kode berikut, Anda melakukan beberapa langkah persiapan data:

  • Menghapus outlier dan nilai yang salah dengan memfilter himpunan data
  • Menghapus kolom yang tidak diperlukan untuk pelatihan model
  • Membuat kolom baru dari data mentah
  • Buat label untuk menentukan apakah akan ada tip atau tidak untuk perjalanan Taksi yang diberikan
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Kami kemudian akan membuat pass kedua atas data untuk menambahkan fitur akhir.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Membuat model regresi logistik

Tugas akhir adalah mengonversi data berlabel menjadi format yang dapat dianalisis melalui regresi logistik. Input ke algoritma regresi logistik perlu serangkaian pasangan vektor label/fitur, di mana vektor fitur adalah vektor angka yang mewakili titik input.

Jadi, Anda perlu mengonversi kolom kategoris menjadi angka. Secara khusus, Anda perlu mengonversi kolom trafficTimeBins dan weekdayString menjadi perwakilan bilangan bulat. Ada beberapa pendekatan untuk melakukan konversi. Contoh berikut mengambil OneHotEncoder pendekatan.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Tindakan ini menghasilkan DataFrame baru dengan semua kolom dalam format yang tepat untuk melatih model.

Melatih model regresi logistik

Tugas pertama adalah membagi himpunan data menjadi kumpulan pelatihan dan kumpulan pengujian atau validasi.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Sekarang, ada dua DataFrame, tugas berikutnya adalah membuat rumus model dan menjalankannya terhadap pelatihan DataFrame. Kemudian Anda dapat memvalidasi terhadap dataFrame pengujian. Bereksperimen dengan versi rumus model yang berbeda untuk melihat dampak kombinasi yang berbeda.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Output dari sel ini adalah:

Area under ROC = 0.9749430523917996

Membuat representasi visual dari prediksi

Anda sekarang dapat membuat visualisasi akhir untuk menginterpretasikan hasil model. Kurva ROC adalah salah satu cara untuk meninjau hasilnya.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.