Aracılığıyla paylaş


Apache Spark MLlib ile makine öğrenmesi modeli oluşturma

Bu makalede Apache Spark MLlib kullanarak Azure açık veri kümesinde basit tahmine dayalı analizi işleyen bir makine öğrenmesi uygulaması oluşturmayı öğreneceksiniz. Spark, yerleşik makine öğrenmesi kitaplıkları sağlar. Bu örnek lojistik regresyon aracılığıyla sınıflandırmayı kullanır.

Temel SparkML ve MLlib Spark kitaplıkları, makine öğrenmesi görevleri için yararlı olan birçok yardımcı program sağlar. Bu yardımcı programlar şunlar için uygundur:

  • Sınıflandırma
  • Kümeleme
  • Hipotez testi ve örnek istatistikleri hesaplama
  • Regresyon
  • Tekil değer ayrıştırma (SVD) ve asıl bileşen analizi (PCA)
  • Konu modelleme

Sınıflandırmayı ve lojistik regresyonu anlama

Popüler bir makine öğrenmesi görevi olan sınıflandırma, giriş verilerini kategorilere ayırmayı içerir. Sınıflandırma algoritması, sağlanan giriş verilerine etiketlerin nasıl atandığını bulmalıdır. Örneğin, bir makine öğrenmesi algoritması hisse senedi bilgilerini giriş olarak kabul edebilir ve hisseyi iki kategoriye bölebilir: satmanız gereken hisse senetleri ve tutmanız gereken hisse senetleri.

Lojistik regresyon algoritması sınıflandırma için kullanışlıdır. Spark lojistik regresyon API'si, giriş verilerinin iki gruptan birinde ikili sınıflandırması için kullanışlıdır. Lojistik regresyon hakkında daha fazla bilgi için bkz . Vikipedi.

Lojistik regresyon, giriş vektörlerinin bir gruba veya diğerine ait olma olasılığını tahmin eden bir lojistik işlev oluşturur.

NYC taksi verilerinin tahmine dayalı analiz örneği

İlk olarak yükleyin azureml-opendatasets. Veriler Azure Açık Veri Kümeleri kaynağı üzerinden kullanılabilir. Bu veri kümesi, başlangıç saatleri, bitiş saatleri, başlangıç konumları, bitiş konumları, seyahat maliyetleri ve diğer öznitelikler de dahil olmak üzere sarı taksi yolculukları hakkındaki bilgileri barındırır.

%pip install azureml-opendatasets

Bu makalenin geri kalanı, önce NYC taksi yolculuğu ipucu verileri üzerinde bazı analizler yapmak ve ardından belirli bir seyahatin ipucu içerip içermediğini tahmin etmek için bir model geliştirmek için Apache Spark'a dayanır.

Apache Spark makine öğrenmesi modeli oluşturma

  1. PySpark not defteri oluşturun. Daha fazla bilgi için Not defteri oluşturma'yı ziyaret edin.

  2. Bu not defteri için gereken türleri içeri aktarın.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Makine öğrenmesi denemelerimizi ve buna karşılık gelen çalıştırmaları izlemek için MLflow kullanacağız. Microsoft Fabric Otomatik Kaydetme etkinleştirildiyse, ilgili ölçümler ve parametreler otomatik olarak yakalanır.

    import mlflow
    

Giriş DataFrame'ini oluşturma

Bu örnek, verileri bir Pandas veri çerçevesine yükler ve ardından apache Spark veri çerçevesine dönüştürür. Bu biçimde, veri kümesini temizlemek ve filtrelemek için diğer Apache Spark işlemlerini uygulayabiliriz.

  1. Bu satırları yeni bir hücreye yapıştırın ve Spark DataFrame oluşturmak için bunları çalıştırın. Bu adım, Açık Veri Kümeleri API'sini kullanarak verileri alır. Belirli bir veri penceresini incelemek için bu verileri filtreleyebiliriz. Kod örneği, tek bir aylık veri döndüren bir filtre uygulamak için ve end_date kullanırstart_date.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Bu kod, veri kümesini yaklaşık 10.000 satıra indirir. Geliştirme ve eğitimi hızlandırmak için kod şimdilik veri kümemizi örneklemektedir.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Yerleşik display() komutunu kullanarak verilerimize bakmak istiyoruz. Bu komutla bir veri örneğini kolayca görüntüleyebilir veya verilerdeki eğilimleri grafiksel olarak keşfedebiliriz.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Verileri hazırlama

Veri hazırlama, makine öğrenmesi sürecinde önemli bir adımdır. Analiz ve modelleme için uygun hale getirmek için ham verilerin temizlenmesini, dönüştürülmesi ve düzenlenmesini içerir. Bu kod örneğinde birkaç veri hazırlama adımı gerçekleştirirsiniz:

  • Aykırı değerleri ve yanlış değerleri kaldırmak için veri kümesini filtreleme
  • Model eğitimi için gerekli olmayan sütunları kaldırma
  • Ham verilerden yeni sütunlar oluşturma
  • Belirli bir Taksi yolculuğunda ipucu bulunup bulunmadığına karar vermek için etiket oluşturma
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Ardından, son özellikleri eklemek için verilerin üzerinden ikinci bir geçiş yapın.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Lojistik regresyon modeli oluşturma

Son görev etiketlenmiş verileri lojistik regresyonun işleyebileceği bir biçime dönüştürür. Lojistik regresyon algoritmasına giriş, özellik vektörünün giriş noktasını temsil eden sayılardan oluşan bir vektör olduğu bir etiket/özellik vektör çiftleri yapısına sahip olmalıdır.

Son görev gereksinimlerine bağlı olarak, kategorik sütunları sayılara dönüştürmemiz gerekir. Özellikle ve weekdayString sütunlarını trafficTimeBins tamsayı gösterimlerine dönüştürmemiz gerekir. Bu gereksinimi işlemek için kullanabileceğimiz birçok seçenek vardır. Bu örnek şu yaklaşımı içerir OneHotEncoder :

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Bu eylem, modeli eğitmek için tüm sütunların uygun biçimde olduğu yeni bir DataFrame'e neden olur.

Lojistik regresyon modelini eğitme

İlk görev, veri kümesini bir eğitim kümesine ve bir test veya doğrulama kümesine böler.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

İki DataFrame'imiz olduktan sonra model formülünü oluşturup eğitim DataFrame'inde çalıştırmamız gerekir. Ardından test dataFrame'iyle doğrulayabiliriz. Farklı birleşimlerin etkilerini görmek için model formülünün farklı sürümleriyle denemeler yapın.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Hücre çıkışları:

Area under ROC = 0.9749430523917996

Tahminin görsel gösterimini oluşturma

Artık model sonuçlarını yorumlamak için son bir görselleştirme oluşturabiliriz. ROC eğrisi sonucu kesinlikle sunabilir.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

İpucu modelinde lojistik regresyon için ROC eğrisini gösteren grafik.