Crear un modelo de Machine Learning con Apache Spark MLlib

En este artículo, aprenderá a usar Apache Spark MLlib para crear una aplicación de aprendizaje automático que controle el análisis predictivo en un conjunto de datos abierto Azure. Spark proporciona bibliotecas de aprendizaje automático integradas. En este ejemplo se usa la clasificación a través de la regresión logística.

En este tutorial se describen estos pasos:

  • Configura el cuaderno y las importaciones
  • Carga y muestra de datos de taxis de Nueva York
  • Preparación e ingeniería de características
  • Codificación de características de categorías
  • Entrenamiento del modelo de regresión logística
  • Evaluación y visualización de los resultados

Las bibliotecas básicas SparkML y MLlib de Spark proporcionan varias utilidades que resultan prácticas para las tareas de aprendizaje automático. Estas utilidades son adecuadas para:

  • clasificación
  • Agrupación en clústeres
  • Comprobación de hipótesis y cálculo de estadísticas de ejemplo
  • Regresión
  • Descomposición en valores singulares (SVD) y Análisis de los componentes principales (PCA)
  • Modelado de tema

Prerequisites

Comprensión de la clasificación y la regresión logística

Clasificación, una tarea habitual en el aprendizaje automático, implica la ordenación de datos de entrada en categorías. Un algoritmo de clasificación determina cómo asignar etiquetas a los datos de entrada proporcionados. Por ejemplo, un algoritmo de aprendizaje automático podría aceptar información de acciones como entrada y dividir el stock en dos categorías: acciones que debe vender y acciones que debe conservar.

El algoritmo de regresión logística es útil para la clasificación. La API de regresión logística de Spark es útil para la clasificación binaria de datos de entrada en uno de los dos grupos. Para más información acerca de la regresión logística, consulte la Wikipedia.

La regresión logística genera una función logística que predice la probabilidad de que un vector de entrada pertenezca a un grupo u otro.

Ejemplo de análisis predictivo de los datos de los taxis de Nueva York

Los datos están disponibles a través del recurso Azure Open Datasets. Este subconjunto de datos contiene información sobre los trayectos de los taxis amarillos, como la hora y la ubicación de inicio y fin, el costo y otros atributos.

En este tutorial se usa Apache Spark para realizar análisis en los datos de propinas de taxi de Nueva York y desarrollar un modelo para predecir si un viaje determinado incluye una propina.

Creación de un modelo de Machine Learning de Apache Spark

  1. Crear un cuaderno de PySpark. Para obtener más información, consulte Creación de un cuaderno.

    Después de crear el cuaderno, adjúntelo a un lakehouse seleccionando Agregar lakehouse en el panel izquierdo.

  2. Importe los tipos necesarios para este cuaderno. Pegue el código siguiente en la primera celda y ejecútelo.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Verificar: La celda se completa sin ImportError. Si ve un error, confirme que el cuaderno usa el entorno de ejecución de PySpark.

  3. Use MLflow para realizar un seguimiento de los experimentos de aprendizaje automático y las ejecuciones correspondientes. Si el registro automático de Microsoft Fabric está habilitado, se capturan automáticamente las métricas y los parámetros correspondientes.

    import mlflow
    

    Verificar: La celda se completa sin errores. Ejecute print(mlflow.__version__) para confirmar que MLflow está disponible.

Construir el DataFrame de entrada

En este ejemplo se cargan los datos del almacenamiento de Azure Open Datasets en un DataFrame de Apache Spark. A continuación, aplique operaciones de Spark para limpiar y filtrar el conjunto de datos.

  1. Pegue el código siguiente en una nueva celda y ejecútelo para crear un dataframe de Spark. Este paso recupera los datos de taxis amarillos de Nueva York filtrados a mayo de 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Comprobar: ejecute la celda siguiente para confirmar que los datos se cargan correctamente.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Tome una muestra del conjunto de datos para acelerar el desarrollo y el entrenamiento.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Comprobar: confirme que el tamaño de la muestra es manejable.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Vea los datos mediante el comando integrado display() para explorar el ejemplo de datos.

    display(sampled_taxi_df.limit(10))
    

    Comprobar: aparece una tabla con 10 filas que muestra columnas como tpepPickupDateTime, fareAmount, tipAmounty tripDistance.

Preparación de los datos

La preparación de datos es un paso fundamental en el proceso de aprendizaje automático. Implica la limpieza, transformación y organización de datos sin procesar para que sea adecuado para el análisis y el modelado. En esta sección, realice varios pasos de preparación de datos:

  • Filtre el conjunto de datos para quitar valores atípicos e incorrectos.
  • Quite las columnas que no son necesarias para el entrenamiento del modelo.
  • Cree nuevas columnas a partir de los datos sin procesar.
  • Genere una etiqueta para determinar si un viaje de taxi determinado implica una propina.

Ejecute el siguiente código para seleccionar las columnas relevantes, calcular características derivadas y filtrar valores atípicos:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Importante

La date_format función usa el patrón 'HH' (formato de 24 horas, valores 0-23) en lugar 'hh' de (formato de 12 horas, valores 1-12). El formato de 24 horas es obligatorio para la lógica de agrupación en intervalos por hora del día que aparece a continuación.

A continuación, añada la función de intervalos horarios del tráfico en función de la hora del día:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Verificar: confirme que los intervalos de tiempo del tráfico se distribuyen correctamente.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Crear un modelo de regresión logística

La última tarea consiste en convertir los datos etiquetados a un formato que se pueda analizar con la regresión logística. La entrada a un algoritmo de regresión logística debe tener una estructura de pares de vector de etiqueta-característica, donde el vector de característica es un vector de números que representa el punto de entrada.

Convierta las columnas categóricas trafficTimeBins y weekdayString en representaciones enteras mediante el OneHotEncoder enfoque:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Comprobar: confirme que el dataframe codificado tiene las nuevas columnas esperadas.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Entrenamiento de un modelo de regresión logística

Divida el conjunto de datos en un conjunto de entrenamiento (70%) y un conjunto de pruebas (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Verificar: confirme que la división dio como resultado tamaños razonables.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Cree la fórmula del modelo, entrene el modelo de regresión logística y evalúelo mediante el uso de área bajo la curva ROC (característica operativa del receptor):

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Comprobar: la salida muestra un valor de AUC. Un modelo de buen rendimiento genera un valor cercano a 1,0.

Area under ROC = 0.97 (approximately)

Nota:

El valor exacto de AUC varía en función del ejemplo de datos. Los valores superiores a 0,90 indican un rendimiento predictivo seguro para este conjunto de datos.

Creación de una representación visual de la predicción

Cree una visualización final para interpretar los resultados del modelo. Una curva ROC presenta el equilibrio entre la tasa de verdaderos positivos y la tasa de falsos positivos.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Verifique: aparece un gráfico que muestra la curva ROC por encima de la línea diagonal discontinua roja. La curva debe inclinarse hacia la esquina superior izquierda, lo que indica un rendimiento de clasificación fuerte.

Gráfico que muestra la curva ROC para la regresión logística en el modelo de propina.

Limpieza de recursos

Después de finalizar este tutorial, elimine el cuaderno y lakehouse para liberar capacidad del área de trabajo:

  1. En el área de trabajo, haga clic con el botón derecho en el cuaderno y seleccione Eliminar.
  2. Si creó un lakehouse específicamente para este tutorial, haga clic con el botón derecho en él y seleccione Eliminar.

Para conservar el modelo entrenado para su uso futuro, agregue el código siguiente antes de la limpieza:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Solución de problemas

Cuestión Causa Solución
Py4JJavaError al leer Parquet Conectividad de red a Azure Blob Storage Compruebe que el área de trabajo de Fabric tiene acceso saliente a Internet. Intente reiniciar la sesión de Spark.
AnalysisException: cannot resolve column Error tipográfico en el nombre de la columna o incompatibilidad de esquema Ejecute nyc_tlc_df.printSchema() para inspeccionar las columnas disponibles. El esquema del conjunto de datos NYC taxi puede cambiar de un año a otro.
DataFrame vacío después del filtrado Condiciones de filtro demasiado restrictivas para la ventana de datos Aumente el intervalo de fechas o compruebe sampled_taxi_df.count() antes del filtrado.
IllegalArgumentException en StringIndexer Etiquetas no vistas en la transformación Agregue handleInvalid="skip" a las StringIndexer llamadas: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
AUC baja (por debajo de 0,6) Datos insuficientes o ingeniería de características incorrectas Aumente la fracción de muestra (por ejemplo, 0.01 en lugar de 0.001) y compruebe trafficTimeBins que las categorías están equilibradas.
OutOfMemoryError Conjunto de datos demasiado grande para la capacidad disponible Reduzca la fracción de muestreo o aumente el nivel de capacidad de Fabric.
El gráfico ROC no se muestra Problema del backend de Matplotlib en notebook Agregue %matplotlib inline en la parte superior del cuaderno.