Compartir vía


Tutorial: Creación, entrenamiento y evaluación de un modelo de elevación

En este tutorial se presenta un ejemplo completo de un flujo de trabajo de ciencia de datos de Synapse en Microsoft Fabric. Aprenderá a crear, entrenar y evaluar modelos de elevación y a aplicar técnicas de modelado de elevación.

Prerrequisitos

Seguimiento en un cuaderno

Puede seguir estos pasos en un cuaderno de una de estas dos maneras:

  • Abra y ejecute el cuaderno integrado.
  • Cargue el cuaderno desde GitHub.

Abra el cuaderno integrado.

El cuaderno Modelado de elevación de ejemplo acompaña este tutorial.

  1. Para abrir el cuaderno de ejemplo de este tutorial, siga las instrucciones de Preparar el sistema para tutoriales de ciencia de datos.

  2. Asegúrese de adjuntar un almacén de lago de datos al cuaderno antes de ejecutar el código.

Importación del cuaderno desde GitHub

El cuaderno AIsample - Uplift Modeling.ipynb acompaña a este tutorial.

Para abrir el cuaderno complementario para este tutorial, siga las instrucciones de Preparar el sistema para tutoriales de ciencia de datos, para importar el cuaderno al área de trabajo.

Puede crear un cuaderno nuevo si prefiere copiar y pegar el código de esta página.

Asegúrese de adjuntar un almacén de lago de datos al cuaderno antes de empezar a ejecutar código.

Paso 1: Cargar los datos

Conjunto de datos

Criteo AI Lab creó el conjunto de datos. Ese conjunto de datos tiene 13 millones de filas. Cada fila representa un usuario. Cada fila tiene 12 características, un indicador de tratamiento y dos etiquetas binarias que incluyen visita y conversión.

Captura de pantalla que muestra la estructura del conjunto de datos de Criteo AI Lab.

  • f0 - f11: valores de características (densos, valores flotantes)
  • tratamiento: si un usuario fue o no asignado aleatoriamente para el tratamiento (por ejemplo, publicidad) (1 = tratamiento, 0 = control)
  • conversión: si se ha producido una conversión (por ejemplo, se ha realizado una compra) para un usuario (binario, etiqueta)
  • visita: si se produjo una conversión (por ejemplo, realizó una compra) para un usuario (binario, etiqueta)

Cita

El conjunto de datos usado para este cuaderno requiere esta cita de BibTex:

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

Sugerencia

Al definir los parámetros siguientes, puede aplicar este cuaderno a diferentes conjuntos de datos fácilmente.

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

Importar bibliotecas

Antes del procesamiento, debe importar las bibliotecas de Spark y SynapseML necesarias. También debe importar una biblioteca de visualización de datos( por ejemplo, Seaborn, una biblioteca de visualización de datos de Python). Una biblioteca de visualización de datos proporciona una interfaz de alto nivel para crear recursos visuales en dataframes y matrices. Obtenga más información sobre Spark, SynapseML y Seaborn.

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

Descarga de un conjunto de datos y carga en lakehouse

Este código descarga una versión disponible públicamente del conjunto de datos y, a continuación, almacena ese recurso de datos en una instancia de Fabric Lakehouse.

Importante

Asegúrese de Agregar un almacén de lago de datos al cuaderno antes de ejecutarlo. Si no lo hace, se producirá un error.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

Inicie la grabación del tiempo de ejecución de este cuaderno.

# Record the notebook running time
import time

ts = time.time()

Configuración del seguimiento de experimentos de MLflow

Para ampliar las funcionalidades de registro de MLflow, el registro automático captura automáticamente los valores de los parámetros de entrada y las métricas de salida de un modelo de Machine Learning durante su entrenamiento. A continuación, esta información se registra en el área de trabajo, donde las API de MLflow o el experimento correspondiente del área de trabajo pueden acceder a ella y visualizarla. Visite este recurso para obtener más información sobre el registro automático.

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Nota

Para deshabilitar el registro automático de Microsoft Fabric en una sesión de cuaderno, llame a mlflow.autolog() y establezca disable=True.

Lectura de datos desde el almacén de lago de datos

Lea datos sin procesar de la sección Archivos del almacén de lago de datos y agregue más columnas para diferentes partes de fecha. La misma información se usa para crear una tabla delta con particiones.

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

Paso 2: Análisis de datos exploratorios

Use el comando display para ver las estadísticas de alto nivel sobre el conjunto de datos. También puede mostrar las vistas de gráfico para visualizar fácilmente subconjuntos del conjunto de datos.

display(raw_df.limit(20))

Examine el porcentaje de los usuarios que visitan, el porcentaje de usuarios que convierten y el porcentaje de los visitantes que convierten.

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

El análisis indica que 4.9% de los usuarios del grupo de tratamiento, es decir, los usuarios que recibieron el tratamiento o la publicidad, visitaron la tienda en línea. Solo 3.8% de usuarios del grupo de control - usuarios que nunca recibieron el tratamiento, ni se les ofreció ni se expusieron a la publicidad - hicieron lo mismo. Además, 0,31% de todos los usuarios del grupo de tratamiento se convirtieron o realizaron una compra, mientras que solo 0,19% de usuarios del grupo de control lo hicieron. Como resultado, la tasa de conversión de los visitantes que realizaron una compra, que también eran miembros del grupo de tratamiento, es 6,36%, en comparación con solo 5,07%** para los usuarios del grupo de control. En función de estos resultados, el tratamiento puede mejorar potencialmente la tasa de visitas en aproximadamente 1 %y la tasa de conversión de visitantes en torno a 1,3 %. El tratamiento conduce a una mejora significativa.

Paso 3: Definir el modelo para el entrenamiento

Preparación del entrenamiento y prueba de los conjuntos de datos

Aquí, ajusta un transformador Featurize al DataFrame raw_df para extraer características de las columnas de entrada especificadas y añadir esas características a una nueva columna denominada features.

El dataframe resultante se almacena en un nuevo dataframe denominado df.

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

Preparación de los conjuntos de datos de tratamiento y control

Después de crear los conjuntos de datos de entrenamiento y prueba, también debe formar los conjuntos de datos de tratamiento y control para entrenar los modelos de aprendizaje automático y medir la mejora.

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

Ahora que preparó los datos, puede continuar para entrenar un modelo con LightGBM.

Modelado de elevación: T-Learner con LightGBM

Los meta-aprendices son un conjunto de algoritmos, creados sobre algoritmos de aprendizaje automático, como LightGBM, Xgboost, etc. Ayudan a calcular el efecto medio del tratamiento condicional o CATE. T-learner es un meta-learner que no usa un solo modelo. En su lugar, T-learner usa un modelo por variable de tratamiento. Por lo tanto, se desarrollan dos modelos y nos referimos al meta-learner como T-learner. T-learner usa varios modelos de aprendizaje automático para superar el problema de descartar completamente el tratamiento, y obliga al aprendiz a dividirlo primero.

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

Uso del conjunto de datos de prueba para una predicción

Aquí, usa el treatment_model y el control_model, ambos definidos anteriormente, para transformar el conjunto de datos de prueba test_df. A continuación, calculas el incremento predicho. El aumento previsto se define como la diferencia entre el resultado de tratamiento previsto y el resultado del control previsto. Cuanto mayor sea esta diferencia de elevación prevista, mayor será la eficacia del tratamiento (por ejemplo, publicidad) en un individuo o subgrupo.

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

Realización de la evaluación del modelo

Puesto que no se puede observar el aumento real para cada individuo, debe medir el aumento sobre un grupo de individuos. Usa una curva de elevación que traza el aumento real acumulado en toda la población.

Captura de pantalla de un gráfico que muestra una curva de modelo de elevación normalizada frente al tratamiento aleatorio.

El eje X representa la relación de la población seleccionada para el tratamiento. Un valor de 0 sugiere que no hay ningún grupo de tratamiento: nadie está expuesto al tratamiento ni se le ofrece. Un valor de 1 sugiere un grupo de tratamiento completo: todos están expuestos al tratamiento o se les ofrece el tratamiento. En el eje Y se muestra la medida de aumento. El objetivo es encontrar el tamaño del grupo de tratamiento, o el porcentaje de la población que se ofrecería o exponería al tratamiento (por ejemplo, publicidad). Este enfoque optimiza la selección de destino para optimizar el resultado.

Primero, clasifique el orden de DataFrame de prueba según el aumento previsto. El aumento previsto es la diferencia entre el resultado del tratamiento previsto y el resultado del control previsto.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

A continuación, calcule el porcentaje acumulado de visitas en los grupos de tratamiento y control.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Por último, en cada porcentaje, calcule el aumento del grupo como la diferencia entre el porcentaje acumulado de visitas entre el tratamiento y los grupos de control.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

Ahora, trazar la curva de elevación para la predicción del conjunto de datos de prueba. Debe convertir el DataFrame de PySpark en un DataFrame de Pandas antes de graficarlo.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Captura de pantalla de un gráfico que muestra una curva de modelo de elevación normalizada frente al tratamiento aleatorio.

El eje X representa la relación de la población seleccionada para el tratamiento. Un valor de 0 indica que no hay ningún grupo de tratamiento: nadie está expuesto al tratamiento ni se le ofrece el tratamiento. Un valor de 1 sugiere un grupo de tratamiento completo: todos están expuestos al tratamiento, o se les ofrece el tratamiento. En el eje Y se muestra la medida de aumento. El objetivo es encontrar el tamaño del grupo de tratamiento, o el porcentaje de la población que se ofrecería o exponería al tratamiento (por ejemplo, publicidad). Este enfoque optimiza la selección de destino para optimizar el resultado.

Primero, clasifique el orden de DataFrame de prueba según el aumento previsto. El aumento previsto es la diferencia entre el resultado del tratamiento previsto y el resultado del control previsto.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

A continuación, calcule el porcentaje acumulado de visitas en los grupos de tratamiento y control.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Por último, en cada porcentaje, calcule el aumento del grupo como la diferencia entre el porcentaje acumulado de visitas entre el tratamiento y los grupos de control.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

Ahora, trazar la curva de elevación para la predicción del conjunto de datos de prueba. Debe convertir el DataFrame de PySpark en un DataFrame de Pandas antes de trazarlo.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Captura de pantalla de un gráfico que muestra una curva de modelo de elevación normalizada frente al tratamiento aleatorio.

El análisis y la curva de elevación muestran que el 20 % de la población, según la predicción, tendría una gran ganancia si recibiera el tratamiento. Esto significa que el 20 % de la población representa el grupo que se puede persuadir. Por tanto, puede establecer la puntuación de corte para el tamaño deseado del grupo de tratamiento en 20 %, para identificar a los clientes objetivo de la selección y lograr el mayor impacto.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

Paso 4: Registrar el modelo de ML final

Use MLflow para realizar un seguimiento y registrar todos los experimentos de los grupos de tratamiento y control. Este seguimiento y registro incluyen los parámetros, las métricas y los modelos correspondientes. Esta información se registra en el nombre del experimento, en el área de trabajo, para su uso posterior.

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

Para ver los experimentos:

  1. En el panel izquierdo, seleccione el área de trabajo.
  2. Busque y seleccione el nombre del experimento, en este caso aisample-upliftmodelling.

Recorte de pantalla en el que se muestran los resultados del experimento aisample-upliftmodeling.

Paso 5: Guardar los resultados de la predicción

Microsoft Fabric ofrece PREDICT: una función escalable que admite la puntuación por lotes en cualquier motor de proceso. Permite a los clientes poner en marcha modelos de aprendizaje automático. Los usuarios pueden crear predicciones por lotes directamente desde un cuaderno o la página de elementos de un modelo específico. Visite este recurso para obtener más información sobre PREDICT y aprender a usar PREDICT en Microsoft Fabric.

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")