Créer un modèle Machine Learning avec Apache Spark MLlib
Cet article explique comment utiliser Apache Spark MLlib pour créer une application d’apprentissage automatique qui effectue une analyse prédictive simple sur un jeu de données ouvert Azure. Spark fournit des bibliothèques d’apprentissage automatique intégrées. Cet exemple utilise un classification par régression logistique.
SparkML et MLlib sont des bibliothèques Spark de base qui fournissent de nombreux utilitaires utiles pour les tâches de machine learning. Certains de ces utilitaires conviennent pour les tâches suivantes :
- classification ;
- régression ;
- Clustering
- Modélisation de rubrique
- Décomposition de valeur singulière (SVD) et analyse des composants principaux (PCA)
- Hypothèse de test et de calcul des exemples de statistiques
Comprendre la classification et la régression logistique
Une classification, tâche de Machine Learning très courante, est le processus de tri de données d’entrée par catégories. La fonction d’un algorithme de classification consiste à déterminer comment affecter des étiquettes aux données d’entrée que vous fournissez. Par exemple, vous pouvez imaginer un algorithme de machine learning qui accepte des informations de stock en entrée et classe le stock en deux catégories : le stock à vendre et le stock à conserver.
Une régression logistique est un algorithme que vous utilisez pour effectuer une classification. L’API de régression logistique de Spark est utile pour la classification binaireou pour classer les données d’entrée dans un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipedia.
En résumé, le processus de régression logistique génère une fonction logistique dont vous pouvez vous servir pour prédire la probabilité qu’un vecteur d’entrée appartient à un groupe ou à l’autre.
Exemple d’analyse prédictive sur les données des taxis de New York
Pour commencer, installez azureml-opendatasets
. Les données sont disponibles dans Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxis jaunes, notamment les heures et les lieux de départ et d’arrivée, le coût et d’autres attributs.
%pip install azureml-opendatasets
Dans le reste de cet article, nous allons utiliser Apache Spark pour effectuer une analyse sur les données de pourboires laissés aux taxis de New York, puis développer un modèle pour prédire si un voyage particulier inclut ou non un pourboire.
Créer un modèle de machine learning Apache Spark
Créez un notebook PySpark. Pour obtenir des instructions, consultez Créer un notebook.
Importez les types nécessaires pour ce notebook.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Nous utiliserons MLflow pour suivre nos expériences de Machine Learning et les exécutions correspondantes. Si la journalisation automatique Microsoft Fabric est activée, les métriques et paramètres correspondants sont automatiquement capturés.
import mlflow
Construire le DataFrame d’entrée
Dans cet exemple, nous allons charger les données dans un dataframe Pandas, puis convertir ce dernier en dataframe Apache Spark. Grâce à ce format, nous pouvons appliquer d’autres opérations Apache Spark pour nettoyer et filtrer le jeu de données.
Exécutez les lignes suivantes pour créer un DataFrame Spark en collant le code dans une nouvelle cellule. Cette étape récupère les données via l’API Open Datasets. Nous pouvons filtrer ces données pour examiner une fenêtre de données spécifique. L’exemple de code suivant utilise
start_date
etend_date
pour appliquer un filtre qui retourne un seul mois de données.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Le code suivant réduit le jeu de données à environ 10 000 lignes. Pour accélérer le développement et l’entraînement, nous allons échantillonner notre jeu de données pour l’instant.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Ensuite, nous examinerons les données à l’aide de la commande intégrée
display()
. Cela nous permettra d’afficher facilement un échantillon des données ou d’explorer les tendances dans les données par graphique.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Préparer les données
La préparation des données est une étape cruciale du processus de Machine Learning. Cela passe par le nettoyage, la transformation et l’organisation des données brutes pour les rendre adaptées à l’analyse et à la modélisation. Dans le code suivant, plusieurs étapes de préparation des données sont effectuées :
- Suppression des valeurs hors norme et des valeurs incorrectes en filtrant le jeu de données
- Suppression des colonnes non nécessaires à l’entraînement de modèle
- Création de colonnes à partir des données brutes
- Génération d’une étiquette pour déterminer s’il y aura ou non un pourboire pour la course de taxi donnée
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Vous effectuerez ensuite une deuxième passe sur les données pour ajouter les caractéristiques finales.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Créer un modèle de régression logistique
La dernière tâche consiste à convertir les données étiquetées en format analysable par régression logistique. L’entrée dans un algorithme de régression logistique doit être un jeu de paires de vecteurs étiquette/caractéristique, où le vecteur caractéristique est un vecteur de nombres qui représentent le point d’entrée.
Vous devez donc convertir les colonnes catégoriques en nombres. Plus précisément, vous devez convertir les colonnes trafficTimeBins
et weekdayString
en représentations entières. Cette conversion peut être effectuée en suivant plusieurs approches. L’exemple suivant suit l’approche OneHotEncoder
.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Cette action génère un nouveau DataFrame dont les colonnes sont toutes dans un format adapté à l’entraînement d’un modèle.
Entraîner un modèle de régression logistique
La première tâche consiste à diviser le jeu de données en un jeu d’entraînement et un jeu de test ou de validation.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Maintenant qu’il existe deux DataFrames, la tâche suivante consiste à créer la formule du modèle et à l’exécuter sur le DataFrame d’entraînement. Vous pouvez ensuite effectuer une validation par rapport au DataFrame de test. Faites des essais avec différentes versions de la formule du modèle pour voir l’impact de différentes combinaisons.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
La sortie de cette cellule est :
Area under ROC = 0.9749430523917996
Créer une représentation visuelle de la prédiction
Vous pouvez maintenant construire une visualisation finale pour interpréter les résultats du modèle. La courbe ROC est une façon d’examiner les résultats.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Contenu connexe
- Utiliser des exemples d’IA pour créer des modèles d’apprentissage automatique : Utiliser des exemples d’IA
- Suivre les exécutions d’apprentissage automatique à l’aide d’Expériences : Expériences d’apprentissage automatique
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de 2024, nous allons supprimer progressivement GitHub Issues comme mécanisme de commentaires pour le contenu et le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultezEnvoyer et afficher des commentaires pour