Créer un modèle Machine Learning avec Apache Spark MLlib

Cet article explique comment utiliser Apache Spark MLlib pour créer une application d’apprentissage automatique qui effectue une analyse prédictive simple sur un jeu de données ouvert Azure. Spark fournit des bibliothèques d’apprentissage automatique intégrées. Cet exemple utilise un classification par régression logistique.

SparkML et MLlib sont des bibliothèques Spark de base qui fournissent de nombreux utilitaires utiles pour les tâches de machine learning. Certains de ces utilitaires conviennent pour les tâches suivantes :

  • classification ;
  • régression ;
  • Clustering
  • Modélisation de rubrique
  • Décomposition de valeur singulière (SVD) et analyse des composants principaux (PCA)
  • Hypothèse de test et de calcul des exemples de statistiques

Comprendre la classification et la régression logistique

Une classification, tâche de Machine Learning très courante, est le processus de tri de données d’entrée par catégories. La fonction d’un algorithme de classification consiste à déterminer comment affecter des étiquettes aux données d’entrée que vous fournissez. Par exemple, vous pouvez imaginer un algorithme de machine learning qui accepte des informations de stock en entrée et classe le stock en deux catégories : le stock à vendre et le stock à conserver.

Une régression logistique est un algorithme que vous utilisez pour effectuer une classification. L’API de régression logistique de Spark est utile pour la classification binaireou pour classer les données d’entrée dans un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipedia.

En résumé, le processus de régression logistique génère une fonction logistique dont vous pouvez vous servir pour prédire la probabilité qu’un vecteur d’entrée appartient à un groupe ou à l’autre.

Exemple d’analyse prédictive sur les données des taxis de New York

Pour commencer, installez azureml-opendatasets. Les données sont disponibles dans Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxis jaunes, notamment les heures et les lieux de départ et d’arrivée, le coût et d’autres attributs.

%pip install azureml-opendatasets

Dans le reste de cet article, nous allons utiliser Apache Spark pour effectuer une analyse sur les données de pourboires laissés aux taxis de New York, puis développer un modèle pour prédire si un voyage particulier inclut ou non un pourboire.

Créer un modèle de machine learning Apache Spark

  1. Créez un notebook PySpark. Pour obtenir des instructions, consultez Créer un notebook.

  2. Importez les types nécessaires pour ce notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Nous utiliserons MLflow pour suivre nos expériences de Machine Learning et les exécutions correspondantes. Si la journalisation automatique Microsoft Fabric est activée, les métriques et paramètres correspondants sont automatiquement capturés.

    import mlflow
    

Construire le DataFrame d’entrée

Dans cet exemple, nous allons charger les données dans un dataframe Pandas, puis convertir ce dernier en dataframe Apache Spark. Grâce à ce format, nous pouvons appliquer d’autres opérations Apache Spark pour nettoyer et filtrer le jeu de données.

  1. Exécutez les lignes suivantes pour créer un DataFrame Spark en collant le code dans une nouvelle cellule. Cette étape récupère les données via l’API Open Datasets. Nous pouvons filtrer ces données pour examiner une fenêtre de données spécifique. L’exemple de code suivant utilise start_date et end_date pour appliquer un filtre qui retourne un seul mois de données.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Le code suivant réduit le jeu de données à environ 10 000 lignes. Pour accélérer le développement et l’entraînement, nous allons échantillonner notre jeu de données pour l’instant.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Ensuite, nous examinerons les données à l’aide de la commande intégrée display(). Cela nous permettra d’afficher facilement un échantillon des données ou d’explorer les tendances dans les données par graphique.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Préparer les données

La préparation des données est une étape cruciale du processus de Machine Learning. Cela passe par le nettoyage, la transformation et l’organisation des données brutes pour les rendre adaptées à l’analyse et à la modélisation. Dans le code suivant, plusieurs étapes de préparation des données sont effectuées :

  • Suppression des valeurs hors norme et des valeurs incorrectes en filtrant le jeu de données
  • Suppression des colonnes non nécessaires à l’entraînement de modèle
  • Création de colonnes à partir des données brutes
  • Génération d’une étiquette pour déterminer s’il y aura ou non un pourboire pour la course de taxi donnée
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Vous effectuerez ensuite une deuxième passe sur les données pour ajouter les caractéristiques finales.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Créer un modèle de régression logistique

La dernière tâche consiste à convertir les données étiquetées en format analysable par régression logistique. L’entrée dans un algorithme de régression logistique doit être un jeu de paires de vecteurs étiquette/caractéristique, où le vecteur caractéristique est un vecteur de nombres qui représentent le point d’entrée.

Vous devez donc convertir les colonnes catégoriques en nombres. Plus précisément, vous devez convertir les colonnes trafficTimeBins et weekdayString en représentations entières. Cette conversion peut être effectuée en suivant plusieurs approches. L’exemple suivant suit l’approche OneHotEncoder.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Cette action génère un nouveau DataFrame dont les colonnes sont toutes dans un format adapté à l’entraînement d’un modèle.

Entraîner un modèle de régression logistique

La première tâche consiste à diviser le jeu de données en un jeu d’entraînement et un jeu de test ou de validation.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Maintenant qu’il existe deux DataFrames, la tâche suivante consiste à créer la formule du modèle et à l’exécuter sur le DataFrame d’entraînement. Vous pouvez ensuite effectuer une validation par rapport au DataFrame de test. Faites des essais avec différentes versions de la formule du modèle pour voir l’impact de différentes combinaisons.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

La sortie de cette cellule est :

Area under ROC = 0.9749430523917996

Créer une représentation visuelle de la prédiction

Vous pouvez maintenant construire une visualisation finale pour interpréter les résultats du modèle. La courbe ROC est une façon d’examiner les résultats.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.