Share via


Crear un modelo de Machine Learning con Apache Spark MLlib

En este artículo, aprenderá a usar MLlib de Apache Spark para crear una aplicación de aprendizaje automático para efectuar análisis predictivos simples en un conjunto de datos abierto de Azure. Spark proporciona bibliotecas de aprendizaje automático integradas. En este ejemplo se usa la clasificación a través de la regresión logística.

SparkML y MLlib son bibliotecas básicas de Spark que proporcionan varias utilidades que resultan prácticas para las tareas de aprendizaje automático, incluidas las utilidades adecuadas para:

  • clasificación
  • Regresión
  • Agrupación en clústeres
  • Modelado de tema
  • Descomposición en valores singulares (SVD) y Análisis de los componentes principales (PCA)
  • Comprobación de hipótesis y cálculo de estadísticas de ejemplo

Comprensión de la clasificación y la regresión logística

Clasificación, una tarea habitual en el aprendizaje automático, es el proceso de ordenación de datos de entrada en categorías. Es trabajo de un algoritmo de clasificación averiguar cómo asignar etiquetas a los datos de entrada que se proporcionan. Por ejemplo, piense en un algoritmo de aprendizaje automático que acepta información bursátil como entrada y divide las existencias en dos categorías: acciones que se deberían vender y acciones que se deberían conservar.

La regresión logística es un algoritmo que puede usar para la clasificación. La API de regresión logística de Spark es útil para la clasificación binaria, o para la clasificación de datos de entrada en uno de los dos grupos. Para más información acerca de la regresión logística, consulte la Wikipedia.

En resumen, el proceso de regresión logística genera una función logística que se puede usar para predecir la probabilidad de que un vector de entrada pertenezca a un grupo o al otro.

Ejemplo de análisis predictivo en los datos de los taxis de Nueva York

Para empezar, instale azureml-opendatasets. Los datos están disponibles a través de Azure Open Datasets. Este subconjunto de datos contiene información sobre los trayectos de los taxis amarillos, como la hora y la ubicación de inicio y fin, el costo y otros atributos.

%pip install azureml-opendatasets

En el resto de este artículo, utilizaremos Apache Spark para realizar algunos análisis sobre los datos de propinas de los taxis de Nueva York y, a continuación, desarrollaremos un modelo para predecir si un trayecto concreto incluye o no una propina.

Creación de un modelo de Machine Learning de Apache Spark

  1. Crear un cuaderno de PySpark. Para obtener instrucciones, consulte Creación de un cuaderno.

  2. Importar los tipos necesarios para este cuaderno.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Usaremos MLflow para realizar un seguimiento de los experimentos de aprendizaje automático y las ejecuciones correspondientes. Si el registro automático de Microsoft Fabric está habilitado, se capturan automáticamente las métricas y los parámetros correspondientes.

    import mlflow
    

Construcción de una trama de datos de entrada

En este ejemplo, cargaremos los datos en dataframe de Pandas y, a continuación, los convertiremos en un dataframe de Apache Spark. Con este formato, se pueden aplicar otras operaciones de Apache Spark para limpiar y filtrar el conjunto de datos.

  1. Ejecute las líneas siguientes para crear un marco de datos de Spark pegando el código en una nueva celda. Se recupera este paso mediante la API de Open Datasets. Podemos filtrar estos datos para examinar una ventana específica de datos. En el ejemplo de código siguiente se usa start_date y end_date para aplicar un filtro que devuelve un mes único de datos.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. El código siguiente reduce el conjunto de datos a aproximadamente 10 000 filas. Para acelerar el desarrollo y el entrenamiento, por ahora haremos un muestreo de nuestro conjunto de datos.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. A continuación, queremos echar un vistazo a nuestros datos mediante el comando integrado display(). Esto nos permite ver fácilmente una muestra de los datos o explorar las tendencias de los datos gráficamente.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparación de los datos

La preparación de datos es un paso fundamental en el proceso de aprendizaje automático. Consiste en limpiar, transformar y organizar los datos sin procesar para hacerlos aptos para el análisis y la modelización. En el código siguiente, se realizan varios pasos de preparación de datos:

  • Eliminar valores atípicos e incorrectos mediante el filtrado del conjunto de datos
  • Quitar columnas que no son necesarias para el entrenamiento del modelo
  • Crear nuevas columnas a partir de los datos sin procesar
  • Generar una etiqueta para determinar si habrá propina o no para el trayecto de taxi determinado
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Después, realizaremos un segundo paso sobre los datos para agregar las características finales.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Crear un modelo de regresión logística

La última tarea consiste en convertir los datos etiquetados a un formato que se pueda analizar mediante la regresión logística. La entrada a un algoritmo de regresión logística debe ser un conjunto de pares de vector de etiqueta-característica, donde el vector de característica es un vector de números que representa el punto de entrada.

Por lo tanto, es necesario convertir las columnas de categorías en números. En concreto, debe convertir las columnas trafficTimeBins y weekdayString en representaciones de enteros. Hay varios enfoques para realizar la conversión. En el ejemplo siguiente, se adopta el enfoque de OneHotEncoder.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Esta acción da como resultado un nuevo marco de datos con todas las columnas en el formato correcto para entrenar un modelo.

Entrenamiento de un modelo de regresión logística

La primera tarea consiste en dividir el conjunto de filas en un conjunto de entrenamiento y un conjunto de pruebas o validación.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Ahora que hay dos marcos de datos, la siguiente tarea consiste en crear la fórmula del modelo y ejecutarla en el marco de datos de entrenamiento. Después, puede realizar la validación en el DataFrame de la prueba. Experimente con versiones diferentes de la fórmula del modelo para ver el impacto de las distintas combinaciones.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

La salida de esta celda es:

Area under ROC = 0.9749430523917996

Creación de una representación visual de la predicción

Ahora puede construir una visualización final para interpretar los resultados del modelo. Una curva ROC es una manera de revisar el resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.