Anmerkung
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel erfahren Sie, wie Sie Apache Spark MLlib verwenden, um eine Machine Learning-Anwendung zu erstellen, die einfache Predictive Analysis für ein offenes Azure-Dataset durchführt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird die Klassifizierung durch logistische Regression verwendet.
SparkML und MLlib sind Kern-Spark-Bibliotheken, die viele Hilfsprogramme bereitstellen, die für Machine Learning-Aufgaben nützlich sind, einschließlich Dienstprogrammen, die für folgendes geeignet sind:
- Klassifizierung
- Rückentwicklung
- Clusterbildung
- Themenmodellierung
- Singulärwertzerlegung (SVD) und Hauptkomponentenanalyse (PCA)
- Hypothesentests und Berechnen von Stichprobenstatistiken
Verstehen der Klassifizierung und logistischen Regression
Klassifizierung, eine beliebte Machine Learning-Aufgabe, ist der Prozess der Sortierung von Eingabedaten in Kategorien. Es ist die Aufgabe eines Klassifizierungsalgorithmus, herauszufinden, wie Sie Eingabedaten, die Sie bereitstellen, Bezeichnungen zuweisen. Sie können sich beispielsweise einen Machine Learning-Algorithmus vorstellen, der Aktieninformationen als Eingabe akzeptiert und die Aktie in zwei Kategorien unterteilt: Aktien, die Sie verkaufen und aktien sollten, die Sie behalten sollten.
Logistische Regression ist ein Algorithmus, den Sie für die Klassifizierung verwenden können. Die Logistische Regressions-API von Spark ist nützlich für die binäre Klassifizierung oder das Klassifizieren von Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.
Zusammenfassend erzeugt der Prozess der logistischen Regression eine Logistikfunktion , die Sie verwenden können, um die Wahrscheinlichkeit vorherzusagen, dass ein Eingabevektor in einer Gruppe oder der anderen gehört.
Predictive Analysis-Beispiel für NYC-Taxidaten
In diesem Beispiel verwenden Sie Spark, um einige Predictive Analysis für Taxi-Trip-Tippdaten aus New York durchzuführen. Die Daten sind über Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu gelben Taxifahrten, einschließlich Informationen über jede Reise, die Start- und Endzeit und Standorte, die Kosten und andere interessante Attribute.
Von Bedeutung
Es kann zusätzliche Gebühren für das Abrufen dieser Daten vom Speicherort geben.
In den folgenden Schritten entwickeln Sie ein Modell, um vorherzusagen, ob eine bestimmte Reise einen Tipp enthält oder nicht.
Erstellen eines Apache Spark Machine Learning-Modells
Erstellen Sie ein Notizbuch mithilfe des PySpark-Kernels. Anweisungen finden Sie unter Erstellen eines Notizbuchs.
Importieren Sie die für diese Anwendung erforderlichen Typen. Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Zelle ein, und drücken Sie dann UMSCHALT+EINGABETASTE. oder führen Sie die Zelle mithilfe des blauen Wiedergabesymbols auf der linken Seite des Codes aus.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluatorAufgrund des PySpark-Kernels müssen Sie keine Kontexte explizit erstellen. Der Spark-Kontext wird automatisch für Sie erstellt, wenn Sie die erste Codezelle ausführen.
Erstellen des Eingabedatenframes
Da sich die Rohdaten in einem Parkettformat befinden, können Sie den Spark-Kontext verwenden, um die Datei direkt als DataFrame in den Arbeitsspeicher zu übertragen. Obwohl der Code in den folgenden Schritten die Standardoptionen verwendet, ist es möglich, die Zuordnung von Datentypen und anderen Schemaattributen bei Bedarf zu erzwingen.
Führen Sie die folgenden Zeilen aus, um einen Spark DataFrame zu erstellen, indem Sie den Code in eine neue Zelle einfügen. In diesem Schritt werden die Daten über die Open Datasets-API abgerufen. Durch Das Ziehen aller dieser Daten werden etwa 1,5 Milliarden Zeilen generiert.
Je nach Größe Ihres serverlosen Apache Spark-Pools sind die Rohdaten möglicherweise zu groß oder nehmen zu viel Zeit in Anspruch. Sie können diese Daten auf etwas Kleineres eingrenzen. Im folgenden Codebeispiel werden
start_dateundend_dateverwendet, um einen Filter anzuwenden, der einen einzelnen Monat an Daten zurückgibt.from azureml.opendatasets import NycTlcYellow from datetime import datetime from dateutil import parser end_date = parser.parse('2018-05-08 00:00:00') start_date = parser.parse('2018-05-01 00:00:00') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())Der Nachteil der einfachen Filterung besteht darin, dass aus statistischer Sicht eine Verzerrung in die Daten entsteht. Ein weiterer Ansatz besteht darin, das in Spark integrierte Sampling zu verwenden.
Der folgende Code reduziert das Dataset auf etwa 2.000 Zeilen, wenn es nach dem vorherigen Code angewendet wird. Sie können diesen Samplingschritt anstelle des einfachen Filters oder in Verbindung mit dem einfachen Filter verwenden.
# To make development easier, faster, and less expensive, downsample for now sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)Es ist jetzt möglich, die Daten zu betrachten, um zu sehen, was gelesen wurde. Es ist normalerweise besser, Daten mit einer Teilmenge statt des vollständigen Satzes zu überprüfen, je nach Größe des Datasets.
Der folgende Code bietet zwei Möglichkeiten zum Anzeigen der Daten. Der erste Weg ist einfach. Die zweite Möglichkeit bietet eine viel umfangreichere Rastererfahrung sowie die Möglichkeit, die Daten grafisch zu visualisieren.
#sampled_taxi_df.show(5) display(sampled_taxi_df)Abhängig von der Größe des generierten Datasets und Ihrer Notwendigkeit, das Notizbuch mehrmals zu experimentieren oder auszuführen, sollten Sie das Dataset möglicherweise lokal im Arbeitsbereich zwischenspeichern. Es gibt drei Möglichkeiten zum Ausführen expliziter Zwischenspeicherung:
- Speichern Sie dataFrame lokal als Datei.
- Speichern Sie den DataFrame als temporäre Tabelle oder Ansicht.
- Speichern Sie den DataFrame als permanente Tabelle.
Die ersten beiden Ansätze sind in den folgenden Codebeispielen enthalten.
Das Erstellen einer temporären Tabelle oder Ansicht bietet unterschiedliche Zugriffspfade für die Daten, dauert jedoch nur für die Dauer der Spark-Instanzsitzung.
sampled_taxi_df.createOrReplaceTempView("nytaxi")
Vorbereiten der Daten
Die Daten in der rohen Form eignen sich häufig nicht für die direkte Übergabe an ein Modell. Sie müssen eine Reihe von Aktionen für die Daten ausführen, um sie in einen Zustand zu versetzen, in dem das Modell es nutzen kann.
Im folgenden Code führen Sie vier Vorgangsklassen aus:
- Das Entfernen von Ausreißern oder falschen Werten durch Filtern.
- Das Entfernen von Spalten, die nicht benötigt werden.
- Die Erstellung neuer Spalten, die aus den Rohdaten abgeleitet wurden, damit das Modell effektiver funktioniert. Dieser Vorgang wird manchmal auch als Featurisierung bezeichnet.
- Kennzeichnung. Da Sie eine binäre Klassifizierung durchführen (gibt es einen Tipp oder nicht auf einer bestimmten Reise), müssen Sie den Tippbetrag in einen Wert von 0 oder 1 konvertieren.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Anschließend nehmen Sie einen zweiten Durchlauf über die Daten vor, um die endgültigen Features hinzuzufügen.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Erstellen eines logistischen Regressionsmodells
Die letzte Aufgabe besteht darin, die bezeichneten Daten in ein Format zu konvertieren, das durch logistische Regression analysiert werden kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Reihe von Bezeichnungs-/Featurevektorpaaren sein, wobei der Featurevektor ein Vektor von Zahlen ist, der den Eingabepunkt darstellt.
Daher müssen Sie die kategorisierten Spalten in Zahlen konvertieren. Insbesondere müssen Sie die trafficTimeBins und weekdayString Spalten in ganzzahlige Darstellungen konvertieren. Es gibt mehrere Ansätze zum Ausführen der Konvertierung. Im folgenden Beispiel wird der häufig verwendete Ansatz OneHotEncoder eingesetzt.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Diese Aktion führt zu einem neuen DataFrame mit allen Spalten im richtigen Format, um ein Modell zu trainieren.
Trainieren eines logistischen Regressionsmodells
Die erste Aufgabe besteht darin, das Dataset in einen Schulungssatz und einen Test- oder Validierungssatz aufzuteilen. Die Teilung hier ist beliebig. Experimentieren Sie mit verschiedenen geteilten Einstellungen, um festzustellen, ob sie sich auf das Modell auswirken.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Da nun zwei DataFrames vorhanden sind, besteht die nächste Aufgabe darin, die Modellformel zu erstellen und mit dem Schulungsdatenframe auszuführen. Anschließend können Sie anhand des Testdatenframes überprüfen. Experimentieren Sie mit verschiedenen Versionen der Modellformel, um die Auswirkungen verschiedener Kombinationen zu sehen.
Hinweis
Um das Modell zu speichern, weisen Sie dem Ressourcenbereich des Azure SQL-Datenbank-Servers die Rolle Mitwirkender an Storage-Blobdaten zu. Ausführliche Schritte finden Sie unter Zuweisen von Azure-Rollen mithilfe des Azure-Portals. Nur Mitglieder mit Besitzerberechtigungen können diesen Schritt ausführen.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Die Ausgabe dieser Zelle lautet:
Area under ROC = 0.9779470729751403
Erstellen einer visuellen Darstellung der Vorhersage
Sie können nun eine endgültige Visualisierung erstellen, um Ihre Überlegungen zu den Ergebnissen dieses Tests zu unterstützen. Eine ROC-Kurve ist eine Möglichkeit, das Ergebnis zu überprüfen.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Herunterfahren der Spark-Instanz
Nachdem Sie die Anwendung ausgeführt haben, beenden Sie das Notizbuch, um die Ressourcen freizugeben, indem Sie die Registerkarte schließen. Oder wählen Sie " Sitzung beenden " im Statusbereich unten im Notizbuch aus.
Siehe auch
Nächste Schritte
Hinweis
Einige der offiziellen Apache Spark-Dokumentationen basieren auf der Verwendung der Spark-Konsole, die in Apache Spark in Azure Synapse Analytics nicht verfügbar ist. Verwenden Sie stattdessen die Notebook-Umgebung oder die IntelliJ-Oberfläche.