Crear un modelo de Machine Learning con Apache Spark MLlib
En este artículo, aprenderá a usar MLlib de Apache Spark para crear una aplicación de aprendizaje automático para efectuar análisis predictivos simples en un conjunto de datos abierto de Azure. Spark proporciona bibliotecas de aprendizaje automático integradas. En este ejemplo se usa la clasificación a través de la regresión logística.
SparkML y MLlib son bibliotecas básicas de Spark que proporcionan varias utilidades que resultan prácticas para las tareas de aprendizaje automático, incluidas las utilidades adecuadas para:
- clasificación
- Regresión
- Agrupación en clústeres
- Modelado de tema
- Descomposición en valores singulares (SVD) y Análisis de los componentes principales (PCA)
- Comprobación de hipótesis y cálculo de estadísticas de ejemplo
Comprensión de la clasificación y la regresión logística
Clasificación, una tarea habitual en el aprendizaje automático, es el proceso de ordenación de datos de entrada en categorías. Es trabajo de un algoritmo de clasificación averiguar cómo asignar etiquetas a los datos de entrada que se proporcionan. Por ejemplo, piense en un algoritmo de aprendizaje automático que acepta información bursátil como entrada y divide las existencias en dos categorías: acciones que se deberían vender y acciones que se deberían conservar.
La regresión logística es un algoritmo que puede usar para la clasificación. La API de regresión logística de Spark es útil para la clasificación binaria, o para la clasificación de datos de entrada en uno de los dos grupos. Para más información acerca de la regresión logística, consulte la Wikipedia.
En resumen, el proceso de regresión logística genera una función logística que se puede usar para predecir la probabilidad de que un vector de entrada pertenezca a un grupo o al otro.
Ejemplo de análisis predictivo en los datos de los taxis de Nueva York
Para empezar, instale azureml-opendatasets
. Los datos están disponibles a través de Azure Open Datasets. Este subconjunto de datos contiene información sobre los trayectos de los taxis amarillos, como la hora y la ubicación de inicio y fin, el costo y otros atributos.
%pip install azureml-opendatasets
En el resto de este artículo, utilizaremos Apache Spark para realizar algunos análisis sobre los datos de propinas de los taxis de Nueva York y, a continuación, desarrollaremos un modelo para predecir si un trayecto concreto incluye o no una propina.
Creación de un modelo de Machine Learning de Apache Spark
Crear un cuaderno de PySpark. Para obtener instrucciones, consulte Creación de un cuaderno.
Importar los tipos necesarios para este cuaderno.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Usaremos MLflow para realizar un seguimiento de los experimentos de aprendizaje automático y las ejecuciones correspondientes. Si el registro automático de Microsoft Fabric está habilitado, se capturan automáticamente las métricas y los parámetros correspondientes.
import mlflow
Construcción de una trama de datos de entrada
En este ejemplo, cargaremos los datos en dataframe de Pandas y, a continuación, los convertiremos en un dataframe de Apache Spark. Con este formato, se pueden aplicar otras operaciones de Apache Spark para limpiar y filtrar el conjunto de datos.
Ejecute las líneas siguientes para crear un marco de datos de Spark pegando el código en una nueva celda. Se recupera este paso mediante la API de Open Datasets. Podemos filtrar estos datos para examinar una ventana específica de datos. En el ejemplo de código siguiente se usa
start_date
yend_date
para aplicar un filtro que devuelve un mes único de datos.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
El código siguiente reduce el conjunto de datos a aproximadamente 10 000 filas. Para acelerar el desarrollo y el entrenamiento, por ahora haremos un muestreo de nuestro conjunto de datos.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
A continuación, queremos echar un vistazo a nuestros datos mediante el comando integrado
display()
. Esto nos permite ver fácilmente una muestra de los datos o explorar las tendencias de los datos gráficamente.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Preparación de los datos
La preparación de datos es un paso fundamental en el proceso de aprendizaje automático. Consiste en limpiar, transformar y organizar los datos sin procesar para hacerlos aptos para el análisis y la modelización. En el código siguiente, se realizan varios pasos de preparación de datos:
- Eliminar valores atípicos e incorrectos mediante el filtrado del conjunto de datos
- Quitar columnas que no son necesarias para el entrenamiento del modelo
- Crear nuevas columnas a partir de los datos sin procesar
- Generar una etiqueta para determinar si habrá propina o no para el trayecto de taxi determinado
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Después, realizaremos un segundo paso sobre los datos para agregar las características finales.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Crear un modelo de regresión logística
La última tarea consiste en convertir los datos etiquetados a un formato que se pueda analizar mediante la regresión logística. La entrada a un algoritmo de regresión logística debe ser un conjunto de pares de vector de etiqueta-característica, donde el vector de característica es un vector de números que representa el punto de entrada.
Por lo tanto, es necesario convertir las columnas de categorías en números. En concreto, debe convertir las columnas trafficTimeBins
y weekdayString
en representaciones de enteros. Hay varios enfoques para realizar la conversión. En el ejemplo siguiente, se adopta el enfoque de OneHotEncoder
.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Esta acción da como resultado un nuevo marco de datos con todas las columnas en el formato correcto para entrenar un modelo.
Entrenamiento de un modelo de regresión logística
La primera tarea consiste en dividir el conjunto de filas en un conjunto de entrenamiento y un conjunto de pruebas o validación.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Ahora que hay dos marcos de datos, la siguiente tarea consiste en crear la fórmula del modelo y ejecutarla en el marco de datos de entrenamiento. Después, puede realizar la validación en el DataFrame de la prueba. Experimente con versiones diferentes de la fórmula del modelo para ver el impacto de las distintas combinaciones.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
La salida de esta celda es:
Area under ROC = 0.9749430523917996
Creación de una representación visual de la predicción
Ahora puede construir una visualización final para interpretar los resultados del modelo. Una curva ROC es una manera de revisar el resultado.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Contenido relacionado
- Uso de ejemplos de IA para crear modelos de aprendizaje automático: Uso de ejemplos de IA
- Seguimiento de ejecuciones de aprendizaje automático mediante experimentos: Experimentos de Machine Learning
Comentarios
https://aka.ms/ContentUserFeedback.
Próximamente: A lo largo de 2024 iremos eliminando gradualmente GitHub Issues como mecanismo de comentarios sobre el contenido y lo sustituiremos por un nuevo sistema de comentarios. Para más información, vea:Enviar y ver comentarios de