Creare un modello di Machine Learning con Apache Spark MLlib
Importante
Microsoft Fabric è in anteprima.
Questo articolo illustra come usare MLlib di Apache Spark per creare un'applicazione di Machine Learning che esegue una semplice analisi predittiva su un set di dati aperto di Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.
SparkML e MLlib sono librerie Spark di base che forniscono molte utilità utili per le attività di Machine Learning, incluse le utilità adatte per:
- Classificazione
- Regressione
- Clustering
- Modellazione di argomenti
- Scomposizione di valori singolari e analisi in componenti principali
- Testing e calcolo ipotetici di statistiche di esempio
Informazioni sulla classificazione e la regressione logistica
La classificazione, un'attività comune di apprendimento automatico, è il processo di ordinamento dei dati in categorie. È il processo di un algoritmo di classificazione per capire come assegnare etichette ai dati di input forniti. Ad esempio, è possibile pensare a un algoritmo di Machine Learning che accetta le informazioni sulle scorte come input e suddividere le scorte in due categorie: le scorte che si dovrebbe vendere e le scorte che si dovrebbe mantenere.
La regressione logistica è un algoritmo usato per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.
In sintesi, il processo di regressione logistica produce una funzione logistica che è possibile usare per stimare la probabilità che un vettore di input appartiene a un gruppo o all'altro.
Esempio di analisi predittiva sui dati dei taxi di NYC
Per iniziare, installare azureml-opendatasets
. I dati sono disponibili tramite set di dati aperti di Azure. Questo subset del set di dati contiene informazioni sulle corse dei taxi gialli, tra cui l'ora di inizio e l'ora di fine e le posizioni, il costo e altri attributi.
%pip install azureml-opendatasets
Nel resto di questo articolo si userà Apache Spark per eseguire alcune analisi sui dati dei suggerimenti dei taxi di Nyc e quindi sviluppare un modello per stimare se un determinato viaggio include un suggerimento o meno.
Creare un modello di Machine Learning Apache Spark
Creare un notebook PySpark. Per le istruzioni, vedere Creare un notebook.
Importare i tipi necessari per questo notebook.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Verrà usato MLflow per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se l'assegnazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.
import mlflow
Costruire il dataframe di input
In questo esempio verranno caricati i dati in un dataframe Pandas e quindi convertirli in un dataframe Apache Spark. Usando questo formato, è possibile applicare altre operazioni apache Spark per pulire e filtrare il set di dati.
Eseguire le righe seguenti per creare un dataframe Spark incollando il codice in una nuova cella. Questo passaggio recupera i dati tramite l'API Open Datasets. È possibile filtrare questi dati in modo da esaminare una finestra specifica dei dati. Nell'esempio di codice seguente viene usato e
end_date
per applicare un filtro che restituiscestart_date
un singolo mese di dati.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc__pd).repartition(20)
Il codice seguente riduce il set di dati a circa 10.000 righe. Per velocizzare lo sviluppo e il training, il set di dati verrà campione per il momento.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Successivamente, si vuole esaminare i dati usando il comando predefinito
display()
. In questo modo è possibile visualizzare facilmente un esempio dei dati o esplorare le tendenze nei dati graficamente.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Preparare i dati
La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Comporta la pulizia, la trasformazione e l'organizzazione dei dati non elaborati per renderli adatti per l'analisi e la modellazione. Nel codice seguente si eseguono diversi passaggi di preparazione dei dati:
- Rimuovere outlier e valori non corretti filtrando il set di dati
- Rimuovere colonne non necessarie per il training del modello
- Creare nuove colonne dai dati non elaborati
- Generare un'etichetta per determinare se ci sarà un suggerimento o meno per il viaggio in taxi specificato
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Verrà quindi eseguito un secondo passaggio sui dati per aggiungere le funzionalità finali.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Creare un modello di regressione logistica
L'attività finale consiste nel convertire i dati etichettati in un formato che può essere analizzato tramite regressione logistica. L'input di un algoritmo di regressione logistica deve essere un set di coppie di vettori di etichetta/funzionalità, in cui il vettore di funzionalità è un vettore di numeri che rappresentano il punto di input.
È quindi necessario convertire le colonne categorica in numeri. In particolare, è necessario convertire le trafficTimeBins
colonne e weekdayString
in rappresentazioni intere. Esistono più approcci per eseguire la conversione. Nell'esempio seguente viene adottato l'approccio OneHotEncoder
.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Questa azione comporta un nuovo dataframe con tutte le colonne nel formato corretto per eseguire il training di un modello.
Eseguire il training di un modello di regressione logistica
La prima attività consiste nel suddividere il set di dati in un set di training e in un set di test o di convalida.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Ora che sono presenti due dataframe, l'attività successiva consiste nel creare la formula del modello ed eseguirla nel dataframe di training. È quindi possibile convalidare il dataframe di test. Sperimentare versioni diverse della formula del modello per visualizzare l'impatto di combinazioni diverse.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
L'output di questa cella è:
Area under ROC = 0.9749430523917996
Creare una rappresentazione visiva della stima
È ora possibile creare una visualizzazione finale per interpretare i risultati del modello. Una curva ROC è un modo per esaminare il risultato.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Passaggi successivi
- Usare esempi di intelligenza artificiale per creare modelli di Machine Learning: Usare esempi di intelligenza artificiale
- Tenere traccia delle esecuzioni di Machine Learning usando esperimenti: Esperimenti di Machine Learning