Creare un modello di Machine Learning con Apache Spark MLlib

Questo articolo illustra come usare MLlib di Apache Spark per creare un'applicazione di Machine Learning che esegue una semplice analisi predittiva su un set di dati aperto di Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.

SparkML e MLlib sono librerie Spark di base che offrono molte utilità utili per le attività di Machine Learning, incluse le utilità adatte per:

  • Classificazione
  • Regressione
  • Cluster
  • Modellazione di argomenti
  • Scomposizione di valori singolari e analisi in componenti principali
  • Testing e calcolo ipotetici di statistiche di esempio

Informazioni sulla classificazione e la regressione logistica

La classificazione, un'attività comune di apprendimento automatico, è il processo di ordinamento dei dati in categorie. È il processo di un algoritmo di classificazione per capire come assegnare etichette ai dati di input forniti. Ad esempio, è possibile pensare a un algoritmo di Machine Learning che accetta le informazioni sulle scorte come input e dividere le azioni in due categorie: azioni che è consigliabile vendere e azioni che è necessario mantenere.

La regressione logistica è un algoritmo usato per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.

In sintesi, il processo di regressione logistica produce una funzione logistica che è possibile usare per stimare la probabilità che un vettore di input appartenga a un gruppo o all'altro.

Esempio di analisi predittiva sui dati dei taxi di New York

Per iniziare, installare azureml-opendatasets. I dati sono disponibili tramite set di dati aperti di Azure. Questo sottoinsieme del set di dati contiene informazioni sulle corse di taxi gialle, tra cui l'ora di inizio e la fine e le posizioni, il costo e altri attributi.

%pip install azureml-opendatasets

Nel resto di questo articolo si userà Apache Spark per eseguire alcune analisi sui dati relativi alle mance dei taxi di New York e quindi sviluppare un modello per stimare se una determinata corsa include o meno una mancia.

Creare un modello di Machine Learning con Apache Spark

  1. Creare un notebook PySpark. Per le istruzioni, vedere Creare un notebook.

  2. Importare i tipi necessari per questo notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. MLflow verrà usato per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se l'assegnazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.

    import mlflow
    

Costruire il dataframe di input

In questo esempio i dati verranno caricati in un dataframe Pandas e quindi convertirli in un dataframe Apache Spark. Usando questo formato, è possibile applicare altre operazioni apache Spark per pulire e filtrare il set di dati.

  1. Eseguire le righe seguenti per creare un dataframe Spark incollando il codice in una nuova cella. Questo passaggio recupera i dati tramite l'API Open Datasets. È possibile filtrare questi dati in modo da esaminare una finestra di dati specifica. Nell'esempio di codice seguente viene start_date usato e end_date per applicare un filtro che restituisce un singolo mese di dati.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Il codice seguente riduce il set di dati a circa 10.000 righe. Per velocizzare lo sviluppo e il training, verrà eseguito il campionamento del set di dati per il momento.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Successivamente, si vuole esaminare i dati usando il comando predefinito display() . In questo modo è possibile visualizzare facilmente un campione dei dati o esplorare le tendenze nei dati graficamente.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparare i dati

La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Comporta la pulizia, la trasformazione e l'organizzazione dei dati non elaborati per renderli adatti per l'analisi e la modellazione. Nel codice seguente vengono eseguiti diversi passaggi di preparazione dei dati:

  • Rimuovere outlier e valori non corretti filtrando il set di dati
  • Rimuovere colonne non necessarie per il training del modello
  • Creare nuove colonne dai dati non elaborati
  • Generare un'etichetta per determinare se ci sarà una mancia o meno per la corsa taxi specificata
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Si eseguirà quindi un secondo passaggio sui dati per aggiungere le funzionalità finali.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Creare un modello di regressione logistica

L'attività finale consiste nel convertire i dati etichettati in un formato che può essere analizzato tramite la regressione logistica. L'input di un algoritmo di regressione logistica deve essere un set di coppie etichetta/vettore di funzionalità, in cui il vettore di funzionalità è un vettore di numeri che rappresentano il punto di input.

È quindi necessario convertire le colonne categorica in numeri. In particolare, è necessario convertire le trafficTimeBins colonne e weekdayString in rappresentazioni integer. Esistono più approcci per eseguire la conversione. Nell'esempio seguente viene adottato l'approccio OneHotEncoder .

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Questa azione comporta un nuovo dataframe con tutte le colonne nel formato corretto per eseguire il training di un modello.

Eseguire il training di un modello di regressione logistica

La prima attività consiste nel suddividere il set di dati in un set di training e in un set di test o di convalida.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Ora che sono presenti due dataframe, l'attività successiva consiste nel creare la formula del modello ed eseguirla sul dataframe di training. È quindi possibile eseguire la convalida rispetto al dataframe di test. Sperimentare versioni diverse della formula del modello per visualizzare l'impatto di combinazioni diverse.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

L'output di questa cella è:

Area under ROC = 0.9749430523917996

Creare una rappresentazione visiva della stima

È ora possibile costruire una visualizzazione finale per interpretare i risultati del modello. Una curva ROC è un modo per esaminare il risultato.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.