Apache Spark MLlib ile makine öğrenmesi modeli oluşturma

Bu makalede Apache Spark MLlib kullanarak Azure açık veri kümesinde basit tahmine dayalı analizler yapabilen bir makine öğrenmesi uygulaması oluşturmayı öğreneceksiniz. Spark, yerleşik makine öğrenmesi kitaplıkları sağlar. Bu örnek lojistik regresyon aracılığıyla sınıflandırmayı kullanır.

SparkML ve MLlib, aşağıdakiler için uygun yardımcı programlar da dahil olmak üzere makine öğrenmesi görevleri için yararlı olan birçok yardımcı program sağlayan temel Spark kitaplıklarıdır:

  • Sınıflandırma
  • Regresyon
  • Kümeleme
  • Konu modelleme
  • Tekil değer ayrıştırma (SVD) ve asıl bileşen analizi (PCA)
  • Hipotez testi ve örnek istatistikleri hesaplama

Sınıflandırmayı ve lojistik regresyonu anlama

Popüler bir makine öğrenmesi görevi olan sınıflandırma, giriş verilerini kategorilere ayırma işlemidir. Sağladığınız giriş verilerine etiket atamayı öğrenmek bir sınıflandırma algoritmasının işidir. Örneğin, hisse senedi bilgilerini girdi olarak kabul eden ve hisseyi iki kategoriye ayıran bir makine öğrenmesi algoritması düşünebilirsiniz: satmanız gereken hisse senetleri ve tutmanız gereken hisse senetleri.

Lojistik regresyon , sınıflandırma için kullanabileceğiniz bir algoritmadır. Spark'ın lojistik regresyon API'si ikili sınıflandırma veya giriş verilerini iki gruptan birinde sınıflandırmak için kullanışlıdır. Lojistik regresyon hakkında daha fazla bilgi için bkz . Vikipedi.

Özetle, lojistik regresyon işlemi, giriş vektörünün bir gruba veya diğerine ait olma olasılığını tahmin etmek için kullanabileceğiniz bir lojistik işlev oluşturur.

NYC taksi verileriyle ilgili tahmine dayalı analiz örneği

Başlamak için yükleyin azureml-opendatasets. Veriler Azure Açık Veri Kümeleri aracılığıyla kullanılabilir. Veri kümesinin bu alt kümesi, başlangıç ve bitiş saati ile konumlar, maliyet ve diğer öznitelikler de dahil olmak üzere sarı taksi yolculukları hakkında bilgi içerir.

%pip install azureml-opendatasets

Bu makalenin geri kalanında Apache Spark'ı kullanarak NYC taksi yolculuğu ipucu verileri üzerinde bazı analizler yapacak ve ardından belirli bir seyahatin ipucu içerip içermediğini tahmin etmek için bir model geliştireceğiz.

Apache Spark makine öğrenmesi modeli oluşturma

  1. PySpark not defteri oluşturun. Yönergeler için bkz . Not defteri oluşturma.

  2. Bu not defteri için gereken türleri içeri aktarın.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Makine öğrenmesi denemelerimizi ve buna karşılık gelen çalıştırmaları izlemek için MLflow kullanacağız. Microsoft Fabric Otomatik Kaydetme etkinleştirildiyse, ilgili ölçümler ve parametreler otomatik olarak yakalanır.

    import mlflow
    

Giriş DataFrame'ini oluşturma

Bu örnekte, verileri pandas veri çerçevesine yükleyip apache Spark veri çerçevesine dönüştüreceğiz. Bu biçimi kullanarak, veri kümesini temizlemek ve filtrelemek için diğer Apache Spark işlemlerini uygulayabiliriz.

  1. Kodu yeni bir hücreye yapıştırarak Spark DataFrame oluşturmak için aşağıdaki satırları çalıştırın. Bu adım, Açık Veri Kümeleri API'sini kullanarak verileri alır. Belirli bir veri penceresine bakmak için bu verileri filtreleyebiliriz. Aşağıdaki kod örneği, tek bir aylık veri döndüren bir filtre uygulamak için ve end_date kullanırstart_date.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Aşağıdaki kod veri kümesini yaklaşık 10.000 satıra düşürür. Geliştirme ve eğitimi hızlandırmak için şimdilik veri kümemizi örnekleyeceğiz.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Ardından, yerleşik display() komutunu kullanarak verilerimize göz atmak istiyoruz. Bu sayede verilerin bir örneğini kolayca görüntüleyebilir veya verilerdeki eğilimleri grafiksel olarak keşfedebilirsiniz.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Verileri hazırlama

Veri hazırlama, makine öğrenmesi sürecinde önemli bir adımdır. Analiz ve modelleme için uygun hale getirmek için ham verilerin temizlenmesini, dönüştürülmesini ve düzenlenmesini içerir. Aşağıdaki kodda çeşitli veri hazırlama adımları gerçekleştirirsiniz:

  • Veri kümesini filtreleyerek aykırı değerleri ve yanlış değerleri kaldırma
  • Model eğitimi için gerekli olmayan sütunları kaldırma
  • Ham verilerden yeni sütunlar oluşturma
  • Verilen Taksi yolculuğu için bir ipucu olup olmadığını belirlemek için bir etiket oluşturun
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Ardından son özellikleri eklemek için veriler üzerinde ikinci bir geçiş yapacağız.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Lojistik regresyon modeli oluşturma

Son görev etiketlenmiş verileri lojistik regresyon aracılığıyla çözümlenebilen bir biçime dönüştürmektir. Lojistik regresyon algoritmasına giriş, özellik vektörünün giriş noktasını temsil eden sayı vektörleri olduğu bir dizi etiket/özellik vektör çifti olmalıdır.

Bu nedenle, kategorik sütunları sayılara dönüştürmeniz gerekir. Özellikle, ve weekdayString sütunlarını trafficTimeBins tamsayı gösterimlerine dönüştürmeniz gerekir. Dönüştürmeyi gerçekleştirmek için birden çok yaklaşım vardır. Aşağıdaki örnek yaklaşımını benimser OneHotEncoder .

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Bu eylem, modeli eğitmek için tüm sütunların doğru biçimde olduğu yeni bir DataFrame'e neden olur.

Lojistik regresyon modelini eğitme

İlk görev, veri kümesini bir eğitim kümesine ve bir test veya doğrulama kümesine bölmektir.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

artık iki DataFrame olduğuna göre, sonraki görev model formülünü oluşturmak ve eğitim DataFrame'inde çalıştırmaktır. Ardından test dataFrame'iyle doğrulayabilirsiniz. Farklı birleşimlerin etkisini görmek için model formülünün farklı sürümleriyle denemeler yapın.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Bu hücrenin çıkışı:

Area under ROC = 0.9749430523917996

Tahminin görsel gösterimini oluşturma

Artık model sonuçlarını yorumlamak için son bir görselleştirme oluşturabilirsiniz. ROC eğrisi, sonucu gözden geçirmenin bir yoludur.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.