Erstellen eines Machine Learning-Modells mit Apache Spark MLlib
Wichtig
Microsoft Fabric befindet sich derzeit in der Vorschauversion. Diese Informationen beziehen sich auf eine Vorabversion des Produkts, an der vor der Veröffentlichung noch wesentliche Änderungen vorgenommen werden können. Microsoft übernimmt keine Garantie, weder ausdrücklich noch stillschweigend, für die hier bereitgestellten Informationen.
In diesem Artikel erfahren Sie, wie Sie mithilfe von Apache Spark MLlib eine Machine-Learning-Anwendung erstellt, die eine einfache Vorhersageanalyse für ein Azure Open Dataset ausführt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird eine Klassifizierung mittels logistischer Regression verwendet.
SparkML und MLlib sind Spark-Kernbibliotheken, die viele praktische Hilfsprogramme für Machine Learning-Aufgaben wie die folgenden enthalten:
- Klassifizierung
- Regression
- Clustering
- Themenmodellierung
- Singulärwertzerlegung (Singular Value Decomposition, SVD) und Hauptkomponentenanalyse (Principal Component Analysis, PCA)
- Testen von Hypothesen und Berechnen von Beispielstatistiken
Grundlegendes zu Klassifizierung und logistischer Regression
Klassifizierung, eine Aufgabe im Bereich des Machine Learning, ist der Prozess, bei dem Eingabedaten in Kategorien sortiert werden. Der Klassifizierungsalgorithmus hat die Aufgabe, herauszufinden, wie Bezeichnungen den von Ihnen bereitgestellten Eingabedaten zugewiesen werden sollen. So kann ein Machine Learning-Algorithmus beispielsweise Börsendaten als Eingabe akzeptieren und die Daten in zwei Kategorien einteilen: Aktien, die Sie verkaufen sollten, und solche, die Sie behalten sollten.
Logistische Regression ist ein Algorithmus, den Sie für die Klassifizierung verwenden können. Die API für die logistische Regression von Spark ist nützlich für eine binäre Klassifizierungoder für die Klassifizierung der Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.
Kurz gesagt: Der Prozess der logistischen Regression erzeugt eine logistische Funktion, die verwendet werden kann, um die Wahrscheinlichkeit vorherzusagen, dass ein Eingabevektor zu einer Gruppe gehört.
Beispiel für eine Vorhersageanalyse mit NYC-Taxidaten
Installieren Sie azureml-opendatasets
zunächst . Die Daten sind über Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu gelben Taxifahrten, einschließlich der Start- und Endzeit und der Standorte, der Kosten und anderer Attribute.
%pip install azureml-opendatasets
Im Rest dieses Artikels verwenden wir Apache Spark, um eine Analyse der Nyc-Taxi-Trinkgelddaten durchzuführen und dann ein Modell zu entwickeln, um vorherzusagen, ob eine bestimmte Reise ein Trinkgeld enthält oder nicht.
Erstellen eines Apache Spark-Machine Learning-Modells
Erstellen Sie ein PySpark-Notebook. Eine entsprechende Anleitung finden Sie unter Erstellen eines Notebooks.
Importieren Sie die für dieses Notebook erforderlichen Typen.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Wir werden MLflow verwenden, um unsere Machine Learning-Experimente und die entsprechenden Ausführungen nachzuverfolgen. Wenn die automatische Protokollierung von Microsoft Fabric aktiviert ist, werden die entsprechenden Metriken und Parameter automatisch erfasst.
import mlflow
Erstellen des Eingabedatenrahmens
In diesem Beispiel laden wir die Daten in einen Pandas-Datenrahmen und konvertieren sie dann in einen Apache Spark-Datenrahmen. Mit diesem Format können wir andere Apache Spark-Vorgänge auf sauber und Filtern des Datasets anwenden.
Führen Sie die folgenden Zeilen zum Erstellen eines Spark-Datenrahmens aus, indem Sie den Code in eine neue Zelle einfügen. In diesem Schritt werden die Daten über die Open Datasets-API abgerufen. Wir können diese Daten nach unten filtern, um ein bestimmtes Datenfenster anzuzeigen. Im folgenden Codebeispiel werden
start_date
undend_date
verwendet, um einen Filter anzuwenden, der Daten für einen einzelnen Monat zurückgibt.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc__pd).repartition(20)
Der folgende Code reduziert das Dataset auf etwa 10.000 Zeilen. Um die Entwicklung und Das Training zu beschleunigen, werden wir zunächst ein Stichprobenbeispiel für unser Dataset erstellen.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Als Nächstes möchten wir mithilfe des integrierten
display()
Befehls einen Blick auf unsere Daten werfen. Dies ermöglicht es uns, einfach ein Beispiel der Daten anzuzeigen oder die Trends in den Daten grafisch zu untersuchen.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Vorbereiten der Daten
Die Datenaufbereitung ist ein wichtiger Schritt im Machine Learning-Prozess. Es umfasst das Bereinigen, Transformieren und Organisieren von Rohdaten, um sie für die Analyse und Modellierung geeignet zu machen. Im folgenden Code führen Sie mehrere Datenvorbereitungsschritte aus:
- Entfernen von Ausreißern und falschen Werten durch Filtern des Datasets
- Entfernen von Spalten, die für das Modelltraining nicht benötigt werden
- Erstellen neuer Spalten aus den Rohdaten
- Generieren sie eine Bezeichnung, um zu bestimmen, ob es für die angegebene Taxifahrt ein Trinkgeld gibt oder nicht
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Anschließend werden wir einen zweiten Durchlauf über die Daten vornehmen, um die endgültigen Features hinzuzufügen.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Erstellen eines logistischen Regressionsmodells
Die letzte Aufgabe besteht darin, die bezeichneten Daten in ein Format zu konvertieren, das mit der logistischen Regression analysiert werden kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Gruppe von Paaren aus Bezeichnung und Featurevektor sein, wobei der Featurevektor aus Zahlen besteht, die den Eingabepunkt darstellen.
Die Kategoriespalten müssen also in Zahlen konvertiert werden. Genauer gesagt, müssen die Spalten trafficTimeBins
und weekdayString
in ganzzahlige Darstellungen konvertiert werden. Die Konvertierung kann auf verschiedene Arten durchgeführt werden. Im folgenden Beispiel wird der OneHotEncoder
Ansatz verwendet.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Diese Aktion führt zu einem neuen Datenrahmen mit allen Spalten im richtigen Format, um ein Modell zu trainieren.
Trainieren eines logistischen Regressionsmodells
Die erste Aufgabe besteht darin, das Dataset in ein Trainingsdataset und ein Test- oder Validierungsdataset aufzuteilen.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Nachdem nun zwei Datenrahmen vorhanden sind, besteht die nächste Aufgabe darin, die Modellformel zu erstellen und für den Trainingsdatenrahmen auszuführen. Anschließend können Sie eine Überprüfung anhand des Testdatenrahmens ausführen. Experimentieren Sie mit verschiedenen Versionen der Modellformel, um die Auswirkungen verschiedener Kombinationen zu ermitteln.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Die Ausgabe dieser Zelle ist:
Area under ROC = 0.9749430523917996
Erstellen einer visuellen Darstellung der Vorhersage
Sie können jetzt eine endgültige Visualisierung erstellen, um die Modellergebnisse zu interpretieren. Eine ROC-Kurve ist eine Möglichkeit, das Ergebnis zu überprüfen.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Nächste Schritte
- Verwenden von KI-Beispielen zum Erstellen von Machine Learning-Modellen: Verwenden von KI-Beispielen
- Nachverfolgen von Machine Learning-Ausführungen mithilfe von Experimenten: Machine Learning-Experimente