Erstellen eines Machine Learning-Modells mit Apache Spark MLlib

In diesem Artikel erfahren Sie, wie Sie mithilfe von Apache Spark MLlib eine Machine-Learning-Anwendung erstellt, die eine einfache Vorhersageanalyse für ein Azure Open Dataset ausführt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird eine Klassifizierung mittels logistischer Regression verwendet.

SparkML und MLlib sind Spark-Kernbibliotheken, die viele praktische Hilfsprogramme für Machine Learning-Aufgaben wie die folgenden enthalten:

  • Klassifizierung
  • Regression
  • Clustering
  • Themenmodellierung
  • Singulärwertzerlegung (Singular Value Decomposition, SVD) und Hauptkomponentenanalyse (Principal Component Analysis, PCA)
  • Testen von Hypothesen und Berechnen von Beispielstatistiken

Grundlegendes zu Klassifizierung und logistischer Regression

Klassifizierung, eine Aufgabe im Bereich des Machine Learning, ist der Prozess, bei dem Eingabedaten in Kategorien sortiert werden. Der Klassifizierungsalgorithmus hat die Aufgabe, herauszufinden, wie Bezeichnungen den von Ihnen bereitgestellten Eingabedaten zugewiesen werden sollen. So kann ein Machine Learning-Algorithmus beispielsweise Börsendaten als Eingabe akzeptieren und die Daten in zwei Kategorien einteilen: Aktien, die Sie verkaufen sollten, und solche, die Sie behalten sollten.

Logistische Regression ist ein Algorithmus, den Sie für die Klassifizierung verwenden können. Die API für die logistische Regression von Spark ist nützlich für eine binäre Klassifizierungoder für die Klassifizierung der Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.

Kurz gesagt: Der Prozess der logistischen Regression erzeugt eine logistische Funktion, die verwendet werden kann, um die Wahrscheinlichkeit vorherzusagen, dass ein Eingabevektor zu einer Gruppe gehört.

Beispiel für eine Vorhersageanalyse mit NYC-Taxidaten

Als Erstes installieren Sie azureml-opendatasets. Die Daten sind über Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu Taxifahrten von Yellow Cabs, einschließlich Informationen zu den Start- und Endzeiten, den Start- und Zielorten, den Kosten und anderer Attribute.

%pip install azureml-opendatasets

Im weiteren Verlauf dieses Artikels verwenden Sie Apache Spark, um einige Analysen der NYC-Taxi-Trinkgelddaten durchzuführen und dann ein Modell zu entwickeln, um vorherzusagen, ob eine bestimmte Reise ein Trinkgeld enthält oder nicht.

Erstellen eines Apache Spark-Machine Learning-Modells

  1. Erstellen Sie ein PySpark-Notebook. Eine entsprechende Anleitung finden Sie unter Erstellen eines Notebooks.

  2. Importieren Sie die für dieses Notebook erforderlichen Typen.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Sie verwenden MLflow, um die Machine Learning-Experimente und die entsprechenden Ausführungen nachzuverfolgen. Wenn die automatische Protokollierung von Microsoft Fabric aktiviert ist, werden die entsprechenden Metriken und Parameter automatisch erfasst.

    import mlflow
    

Erstellen des Eingabedatenrahmens

In diesem Beispiel laden Sie die Daten in einen Pandas-Datenrahmen und konvertieren sie dann in einen Apache Spark-Datenrahmen. Mit diesem Format können Sie andere Apache Spark-Vorgänge anwenden, um das Dataset zu bereinigen und zu filtern.

  1. Führen Sie die folgenden Zeilen zum Erstellen eines Spark-Datenrahmens aus, indem Sie den Code in eine neue Zelle einfügen. In diesem Schritt werden die Daten über die Open Datasets-API abgerufen. Sie können diese Daten durch Filtern reduzieren, um ein bestimmtes Datenfenster anzuzeigen. Im folgenden Codebeispiel werden start_date und end_date verwendet, um einen Filter anzuwenden, der Daten für einen einzelnen Monat zurückgibt.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Der folgende Code reduziert das Dataset auf etwa 10.000 Zeilen. Um die Entwicklung und das Training zu beschleunigen, werden Sie vorerst das Dataset abtasten.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Als Nächstes werden Sie mit dem integrierten display()-Befehl einen Blick auf Ihre Daten werfen. Dies ermöglicht es Ihnen, einfach eine Stichprobe der Daten anzuzeigen oder die Trends in den Daten grafisch zu untersuchen.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Vorbereiten der Daten

Die Datenaufbereitung ist ein wichtiger Schritt im Machine Learning-Prozess. Sie umfasst das Bereinigen, Transformieren und Organisieren von Rohdaten, um sie für die Analyse und Modellierung geeignet zu machen. Im folgenden Code führen Sie mehrere Datenaufbereitungsschritte aus:

  • Entfernen von Ausreißern und falschen Werten durch Filtern des Datasets
  • Entfernen von Spalten, die für das Modelltraining nicht benötigt werden
  • Erstellen neuer Spalten aus den Rohdaten
  • Generieren einer Bezeichnung, um zu bestimmen, ob für die angegebene Taxifahrt ein Trinkgeld vorhanden ist oder nicht
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

In einem zweiten Durchlauf für die Daten werden dann die endgültigen Features hinzugefügt.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Erstellen eines logistischen Regressionsmodells

Die letzte Aufgabe besteht darin, die bezeichneten Daten in ein Format zu konvertieren, das mit der logistischen Regression analysiert werden kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Gruppe von Paaren aus Bezeichnung und Featurevektor sein, wobei der Featurevektor aus Zahlen besteht, die den Eingabepunkt darstellen.

Die Kategoriespalten müssen also in Zahlen konvertiert werden. Genauer gesagt, müssen die Spalten trafficTimeBins und weekdayString in ganzzahlige Darstellungen konvertiert werden. Die Konvertierung kann auf verschiedene Arten durchgeführt werden. Im folgenden Beispiel wird der OneHotEncoder-Ansatz verwendet.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Diese Aktion führt zu einem neuen Datenrahmen mit allen Spalten im richtigen Format, um ein Modell zu trainieren.

Trainieren eines logistischen Regressionsmodells

Die erste Aufgabe besteht darin, das Dataset in ein Trainingsdataset und ein Test- oder Validierungsdataset aufzuteilen.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nachdem nun zwei Datenrahmen vorhanden sind, besteht die nächste Aufgabe darin, die Modellformel zu erstellen und für den Trainingsdatenrahmen auszuführen. Anschließend können Sie eine Überprüfung anhand des Testdatenrahmens ausführen. Experimentieren Sie mit verschiedenen Versionen der Modellformel, um die Auswirkungen verschiedener Kombinationen zu ermitteln.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Die Ausgabe dieser Zelle ist:

Area under ROC = 0.9749430523917996

Erstellen einer visuellen Darstellung der Vorhersage

Sie können jetzt eine endgültige Visualisierung erstellen, um die Modellergebnisse zu interpretieren. Eine ROC-Kurve ist eine Möglichkeit, um das Ergebnis zu überprüfen.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.