Compartir vía


Crear un modelo de Machine Learning con Apache Spark MLlib

En este artículo, aprenderá a usar MLlib de Apache Spark para crear una aplicación de aprendizaje automático para efectuar análisis predictivos simples en un conjunto de datos abierto de Azure. Spark proporciona bibliotecas de aprendizaje automático integradas. En este ejemplo se usa la clasificación a través de la regresión logística.

Las bibliotecas básicas SparkML y MLlib de Spark proporcionan varias utilidades que resultan prácticas para las tareas de aprendizaje automático. Estas utilidades son adecuadas para:

  • clasificación
  • Agrupación en clústeres
  • Comprobación de hipótesis y cálculo de estadísticas de ejemplo
  • Regresión
  • Descomposición en valores singulares (SVD) y Análisis de los componentes principales (PCA)
  • Modelado de tema

Comprensión de la clasificación y la regresión logística

Clasificación, una tarea habitual en el aprendizaje automático, implica la ordenación de datos de entrada en categorías. Un algoritmo de clasificación debe averiguar cómo asignar etiquetas a los datos de entrada proporcionados. Por ejemplo, un algoritmo de aprendizaje automático puede aceptar información bursátil como entrada y divide las existencias en dos categorías: acciones que se deberían vender y acciones que se deberían conservar.

El algoritmo de regresión logística es útil para la clasificación. La API de regresión logística de Spark es útil para la clasificación binaria de datos de entrada en uno de los dos grupos. Para más información acerca de la regresión logística, consulte la Wikipedia.

La regresión logística genera una función logística que se puede usar para predecir la probabilidad de que un vector de entrada pertenezca a un grupo o al otro.

Ejemplo de análisis predictivo de los datos de los taxis de Nueva York

En primer lugar, instale azureml-opendatasets. Los datos están disponibles a través del recurso Azure Open Datasets. Este subconjunto de datos contiene información sobre los trayectos de los taxis amarillos, como la hora y la ubicación de inicio y fin, el costo y otros atributos.

%pip install azureml-opendatasets

En el resto de este artículo, utilizaremos Apache Spark para realizar en primer lugar algunos análisis sobre los datos de propinas de los taxis de Nueva York y, a continuación, desarrollaremos un modelo para predecir si un trayecto concreto incluye o no una propina.

Creación de un modelo de Machine Learning de Apache Spark

  1. Crear un cuaderno de PySpark. Para obtener más información, visite Crear un cuaderno.

  2. Importar los tipos necesarios para este cuaderno.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Usaremos MLflow para realizar un seguimiento de los experimentos de aprendizaje automático y las ejecuciones correspondientes. Si el registro automático de Microsoft Fabric está habilitado, se capturan automáticamente las métricas y los parámetros correspondientes.

    import mlflow
    

Construcción de una trama de datos de entrada

En este ejemplo, se cargan los datos en dataframe de Pandas y, a continuación, se convierten en un dataframe de Apache Spark. Con ese formato, se pueden aplicar otras operaciones de Apache Spark para limpiar y filtrar el conjunto de datos.

  1. Pegue estas líneas en una nueva celda y ejecútelas para crear un dataframe de Spark. Se recupera este paso mediante la API de Open Datasets. Podemos filtrar estos datos para examinar una ventana específica de datos. En el ejemplo de código se usa start_date y end_date para aplicar un filtro que devuelve un mes único de datos.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Este código reduce el conjunto de datos a aproximadamente 10 000 filas. Para acelerar el desarrollo y el entrenamiento, por ahora se realizará un muestreo de código de nuestro conjunto de datos.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Queremos echar un vistazo a nuestros datos mediante el comando integrado display(). Con este comando, podemos ver fácilmente una muestra de los datos o explorar las tendencias de los datos gráficamente.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparación de los datos

La preparación de datos es un paso fundamental en el proceso de aprendizaje automático. Consiste en limpiar, transformar y organizar los datos sin procesar para hacerlos aptos para el análisis y la modelización. En el ejemplo de código, se realizan varios pasos de preparación de datos:

  • Filtrar el conjunto de datos para eliminar valores atípicos e incorrectos
  • Quitar columnas que no son necesarias para el entrenamiento del modelo
  • Crear nuevas columnas a partir de los datos sin procesar
  • Generar una etiqueta para determinar si un viaje de taxi determinado implica una propina
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Después, realice un segundo paso sobre los datos para agregar las características finales.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Crear un modelo de regresión logística

La última tarea consiste en convertir los datos etiquetados a un formato que se pueda analizar con la regresión logística. La entrada a un algoritmo de regresión logística debe tener una estructura de pares de vector de etiqueta-característica, donde el vector de característica es un vector de números que representa el punto de entrada.

En función de los requisitos finales de la tarea, debemos convertir las columnas categóricas en números. En concreto, debemos convertir las columnas trafficTimeBins y weekdayString en representaciones de enteros. Tenemos muchas opciones disponibles para controlar este requisito. Este ejemplo implica el enfoque OneHotEncoder:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Esta acción da como resultado un nuevo marco de datos con todas las columnas en el formato correcto para entrenar un modelo.

Entrenamiento de un modelo de regresión logística

La primera tarea consiste en dividir el conjunto de filas en un conjunto de entrenamiento y un conjunto de pruebas o validación.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Una vez que tengamos dos DataFrames, debemos crear la fórmula del modelo y ejecutarla en el dataframe de entrenamiento. Después, podemos realizar la validación en el DataFrame de la prueba. Experimente con versiones diferentes de la fórmula del modelo para ver los efectos de las distintas combinaciones.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

La celda genera:

Area under ROC = 0.9749430523917996

Creación de una representación visual de la predicción

Ahora podemos construir una visualización final para interpretar los resultados del modelo. Ciertamente, una curva ROC puede presentar el resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Gráfico que muestra la curva ROC para la regresión logística en el modelo de propina.