Condividi tramite


Creare un modello di Machine Learning con Apache Spark MLlib

Questo articolo illustra come usare Apache Spark MLlib per creare un'applicazione di Machine Learning che gestisce un'analisi predittiva semplice in un set di dati aperto di Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.

Le librerie SparkML e MLlib Spark di base offrono molte utilità utili per le attività di Machine Learning. Queste utilità sono adatte per:

  • Classificazione
  • Cluster
  • Testing e calcolo ipotetici di statistiche di esempio
  • Regressione
  • Scomposizione di valori singolari e analisi in componenti principali
  • Modellazione di argomenti

Informazioni sulla classificazione e la regressione logistica

La classificazione, un'attività di Machine Learning più diffusa, prevede l'ordinamento dei dati di input in categorie. Un algoritmo di classificazione deve determinare come assegnare etichette ai dati di input forniti. Ad esempio, un algoritmo di Machine Learning potrebbe accettare informazioni sulle azioni come input e dividere il titolo in due categorie: azioni da vendere e azioni da conservare.

L'algoritmo di regressione logistica è utile per la classificazione. L'API di regressione logistica Spark è utile per la classificazione binaria dei dati di input in uno dei due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.

La regressione logistica produce una funzione logistica in grado di stimare la probabilità che un vettore di input appartenga a un gruppo o all'altro.

Esempio di analisi predittiva dei dati dei taxi di New York

Installare prima di tutto azureml-opendatasets. I dati sono disponibili tramite la risorsa Set di dati aperti di Azure. Questo sottoinsieme di set di dati ospita informazioni sulle corse in taxi gialle, inclusi gli orari di inizio, le ore di fine, le posizioni di inizio, le posizioni di fine, i costi delle corse e altri attributi.

%pip install azureml-opendatasets

Il resto di questo articolo si basa su Apache Spark per eseguire prima un'analisi sui dati relativi alle mance dei taxi di New York e quindi sviluppare un modello per stimare se una determinata corsa include o meno una mancia.

Creare un modello di Machine Learning con Apache Spark

  1. Creare un notebook PySpark. Per altre informazioni, vedere Creare un notebook.

  2. Importare i tipi necessari per questo notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Si userà MLflow per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se l'assegnazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.

    import mlflow
    

Creare il DataFrame di input

Questo esempio carica i dati in un dataframe Pandas e quindi lo converte in un dataframe Apache Spark. In questo formato, è possibile applicare altre operazioni apache Spark per pulire e filtrare il set di dati.

  1. Incollare queste righe in una nuova cella ed eseguirle per creare un dataframe Spark. Questo passaggio consente di recuperare dati tramite l'API dei set di dati aperti di Azure. È possibile filtrare questi dati in modo da esaminare una finestra di dati specifica. Nell'esempio di codice viene start_date usato e end_date per applicare un filtro che restituisce un singolo mese di dati.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Questo codice riduce il set di dati a circa 10.000 righe. Per velocizzare lo sviluppo e il training, per il momento il set di dati viene ridotto.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Si vogliono esaminare i dati usando il comando predefinito display() . Con questo comando è possibile visualizzare facilmente un esempio di dati o esplorare graficamente le tendenze nei dati.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparare i dati

La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Implica la pulizia, la trasformazione e l'organizzazione dei dati non elaborati, per renderli adatti per l'analisi e la modellazione. In questo esempio di codice vengono eseguiti diversi passaggi di preparazione dei dati:

  • Filtrare il set di dati per rimuovere outlier e valori non corretti
  • Rimuovere colonne non necessarie per il training del modello
  • Creare nuove colonne dai dati non elaborati
  • Generare un'etichetta per determinare se una determinata corsa taxi comporta una mancia
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Successivamente, eseguire un secondo passaggio sui dati per aggiungere le funzionalità finali.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Creare un modello di regressione logistica

L'attività finale converte i dati etichettati in un formato che la regressione logistica può gestire. L'input di un algoritmo di regressione logistica deve avere una struttura di coppie etichetta/vettore di funzionalità, in cui il vettore di funzionalità è un vettore di numeri che rappresentano il punto di input.

In base ai requisiti finali dell'attività, è necessario convertire le colonne categorica in numeri. In particolare, è necessario convertire le trafficTimeBins colonne e weekdayString in rappresentazioni integer. Sono disponibili molte opzioni per gestire questo requisito. Questo esempio prevede l'approccio OneHotEncoder seguente:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Questa azione comporta un nuovo dataframe con tutte le colonne nel formato appropriato per eseguire il training di un modello.

Eseguire il training di un modello di regressione logistica

La prima attività suddivide il set di dati in un set di training e un set di test o convalida.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Dopo aver creato due dataframe, è necessario creare la formula del modello ed eseguirla sul dataframe di training. È quindi possibile eseguire la convalida rispetto al dataframe di test. Sperimentare versioni diverse della formula del modello per visualizzare gli effetti di combinazioni diverse.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Output della cella:

Area under ROC = 0.9749430523917996

Creare una rappresentazione visiva della stima

È ora possibile creare una visualizzazione finale per interpretare i risultati del modello. Una curva ROC può certamente presentare il risultato.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafico che mostra la curva ROC per la regressione logistica nel modello di mancia.