Esercitazione: Creare, eseguire il training e valutare un modello di sollevamento

Questa esercitazione presenta un esempio end-to-end di un flusso di lavoro di data science synapse in Microsoft Fabric. Si apprenderà come creare, eseguire il training e valutare modelli di uplift e applicare tecniche di modellazione di uplift.

Prerequisiti

Seguire la procedura in un notebook

È possibile seguire la procedura in un notebook in uno dei due modi seguenti:

  • Aprire ed eseguire il notebook predefinito nell'esperienza di data science di Synapse
  • Caricare il notebook da GitHub nell'esperienza di data science di Synapse

Aprire il notebook predefinito

Il notebook di modellazione Uplift di esempio accompagna questa esercitazione . Vedere Per aprire il notebook di esempio predefinito dell'esercitazione nell'esperienza di data science di Synapse:1. Passare alla home page di Synapse Data Science. 1. Selezionare Usa un esempio. 1. Selezionare l'esempio corrispondente:* Dalla scheda predefinita Flussi di lavoro end-to-end (Python), se l'esempio è per un'esercitazione su Python. * Dalla scheda Flussi di lavoro end-to-end (R), se l'esempio è per un'esercitazione R. * Dalla scheda Esercitazioni rapide, se l'esempio è per un'esercitazione rapida.1. Collegare una lakehouse al notebook prima di iniziare a eseguire il codice. per altre informazioni sull'accesso ai notebook di esempio predefiniti per le esercitazioni.

Per aprire il notebook di esempio predefinito dell'esercitazione nell'esperienza di data science di Synapse:

  1. Passare alla home page di Synapse Data Science

  2. Selezionare Usa un esempio

  3. Selezionare l'esempio corrispondente:

    1. Dalla scheda Predefinita Flussi di lavoro end-to-end (Python), se l'esempio è relativo a un'esercitazione su Python
    2. Dalla scheda Flussi di lavoro end-to-end (R), se l'esempio è per un'esercitazione su R
    3. Dalla scheda Esercitazioni rapide, se l'esempio è per un'esercitazione rapida
  4. Collegare una lakehouse al notebook prima di iniziare a eseguire il codice

Importare il notebook da GitHub

Il notebook AIsample - Uplift Modeling.ipynb accompagna questa esercitazione.

Per aprire il notebook a accompagnamento per questa esercitazione, seguire le istruzioni riportate in Preparare il sistema per le esercitazioni sull'analisi scientifica dei dati per importare il notebook nell'area di lavoro.

È possibile creare un nuovo notebook se si preferisce copiare e incollare il codice da questa pagina.

Assicurarsi di collegare un lakehouse al notebook prima di iniziare a eseguire il codice.

Passaggio 1: Caricare i dati

Set di dati

Criteo AI Lab ha creato il set di dati. Tale set di dati ha 13M righe. Ogni riga rappresenta un utente. Ogni riga ha 12 caratteristiche, un indicatore di trattamento e due etichette binarie che includono visita e conversione.

f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11 conversione del trattamento

  • f0 - f11: valori di funzionalità (densi, valori mobili)
  • trattamento: se un utente è stato bersaglio casualmente per il trattamento (ad esempio, pubblicità) (1 = trattamento, 0 = controllo)
  • conversione: indica se si è verificata una conversione (ad esempio, è stato effettuato un acquisto) per un utente (binario, etichetta)
  • visit: indica se si è verificata una conversione (ad esempio, effettuato un acquisto) per un utente (binario, etichetta)

Citazione

Il set di dati usato per questo notebook richiede questa citazione di BibTex:

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

Suggerimento

Definendo i parametri seguenti, è possibile applicare facilmente questo notebook a set di dati diversi.

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

Importare le librerie

Prima dell'elaborazione, è necessario importare le librerie Spark e SynapseML necessarie. È anche necessario importare una libreria di visualizzazione dei dati, ad esempio Seaborn, una libreria di visualizzazione dei dati Python. Una libreria di visualizzazioni dati offre un'interfaccia di alto livello per creare risorse visive su dataframe e matrici. Altre informazioni su Spark, SynapseML e Seaborn.

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

Scaricare un set di dati e caricarlo in lakehouse

Questo codice scarica una versione disponibile pubblicamente del set di dati e quindi archivia tale risorsa dati in un lakehouse di Fabric.

Importante

Assicurarsi di aggiungere un lakehouse al notebook prima di eseguirlo. In caso contrario, verrà generato un errore.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

Avviare la registrazione del runtime di questo notebook.

# Record the notebook running time
import time

ts = time.time()

Configurare il rilevamento dell'esperimento MLflow

Per estendere le funzionalità di registrazione di MLflow, l'assegnazione automatica acquisisce automaticamente i valori dei parametri di input e le metriche di output di un modello di Machine Learning durante il training. Queste informazioni vengono quindi registrate nell'area di lavoro, in cui le API MLflow o l'esperimento corrispondente nell'area di lavoro possono accedervi e visualizzarlo. Visitare questa risorsa per altre informazioni sull'assegnazione automatica dei tag.

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Nota

Per disabilitare l'assegnazione automatica di tag di Microsoft Fabric in una sessione del notebook, chiamare mlflow.autolog() e impostare disable=True.

Leggere i dati dal lakehouse

Leggere i dati non elaborati dalla sezione Lakehouse Files e aggiungere altre colonne per parti di data diverse. Le stesse informazioni vengono usate per creare una tabella delta partizionata.

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

Passaggio 2: Analisi esplorativa dei dati

Usare il display comando per visualizzare statistiche generali sul set di dati. È anche possibile visualizzare le visualizzazioni grafico per visualizzare facilmente i subset del set di dati.

display(raw_df.limit(20))

Esaminare la percentuale di utenti che visitano, la percentuale di utenti che converte e la percentuale dei visitatori che converte.

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

L'analisi indica che il 4,9% degli utenti del gruppo di trattamento - utenti che hanno ricevuto il trattamento, o pubblicità - ha visitato il negozio online. Solo il 3,8% degli utenti del gruppo di controllo - gli utenti che non hanno mai ricevuto il trattamento, o non sono mai stati offerti o esposti alla pubblicità - hanno fatto lo stesso. Inoltre, lo 0,31% di tutti gli utenti del gruppo di trattamento convertito o effettuato un acquisto - mentre solo lo 0,19% degli utenti del gruppo di controllo lo ha fatto. Di conseguenza, il tasso di conversione dei visitatori che ha effettuato un acquisto, che erano anche membri del gruppo di trattamento, è pari al 6,36%, rispetto solo al 5,07%** per gli utenti del gruppo di controllo. In base a questi risultati, il trattamento può potenzialmente migliorare il tasso di visita di circa l'1%, e il tasso di conversione dei visitatori di circa l'1,3%. Il trattamento porta a un miglioramento significativo.

Passaggio 3: Definire il modello per il training

Preparare il training e testare i set di dati

In questo caso, si adatta un trasformatore Featurize al raw_df DataFrame per estrarre le funzionalità dalle colonne di input specificate e restituire tali funzionalità in una nuova colonna denominata features.

Il dataframe risultante viene archiviato in un nuovo dataframe denominato df.

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

Preparare i set di dati di trattamento e controllo

Dopo aver creato i set di dati di training e test, è anche necessario formare i set di dati di trattamento e controllo per eseguire il training dei modelli di Machine Learning per misurare l'aumento delle prestazioni.

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

Dopo aver preparato i dati, è possibile procedere con il training di un modello con LightGBM.

Modellazione di uplift: T-Learner con LightGBM

I meta-studenti sono un set di algoritmi, basati su algoritmi di Machine Learning come LightGBM, Xgboost e così via. Consentono di stimare l'effetto del trattamento medio condizionale o CATE. T-learner è un meta-learner che non usa un singolo modello. T-learner usa invece un modello per ogni variabile di trattamento. Di conseguenza, vengono sviluppati due modelli e si fa riferimento al meta-learner come T-learner. T-learner usa più modelli di Machine Learning per superare il problema di eliminare completamente il trattamento, forzando l'apprendimento a suddividerlo per primo.

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier()
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

Usare il set di dati di test per una stima

In questo caso si usa treatment_model e control_model, entrambi definiti in precedenza, per trasformare il test_df set di dati di test. Quindi, si calcola l'elevazione stimata. Si definisce l'elevazione stimata come differenza tra il risultato previsto del trattamento e il risultato del controllo stimato. Maggiore è la differenza di aumento stimato, maggiore è l'efficacia del trattamento (ad esempio, pubblicità) su un individuo o un sottogruppo.

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

Eseguire la valutazione del modello

Poiché non è possibile osservare un aumento effettivo per ogni individuo, è necessario misurare l'elevazione su un gruppo di individui. Si usa una curva di sollevamento che traccia il vero e cumulativo aumento della popolazione.

Screenshot di un grafico che mostra una curva del modello di aumento normalizzata rispetto al trattamento casuale.

L'asse x rappresenta il rapporto tra la popolazione selezionata per il trattamento. Un valore pari a 0 suggerisce che nessun gruppo di trattamento - nessuno è esposto a, o offerto, il trattamento. Un valore pari a 1 suggerisce un gruppo di trattamento completo - tutti sono esposti a, o offerto, il trattamento. L'asse y mostra la misura di sollevamento. L'obiettivo è trovare le dimensioni del gruppo di trattamento, o la percentuale della popolazione che verrebbe offerta o esposta al trattamento (ad esempio, pubblicità). Questo approccio ottimizza la selezione di destinazione per ottimizzare il risultato.

Prima di tutto, classificare l'ordine del dataframe di test in base all'aumento stimato. L'elevazione stimata è la differenza tra il risultato previsto del trattamento e il risultato del controllo stimato.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

Calcolare quindi la percentuale cumulativa di visite nei gruppi di trattamento e di controllo.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Infine, a ogni percentuale, calcolare l'aumento del gruppo come differenza tra la percentuale cumulativa di visite tra il trattamento e i gruppi di controllo.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

Tracciare ora la curva di aumento per la stima del set di dati di test. È necessario convertire il dataframe PySpark in un dataframe Pandas prima di tracciare.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Screenshot di un grafico che mostra una curva del modello di aumento normalizzata rispetto al trattamento casuale.

L'asse x rappresenta il rapporto tra la popolazione selezionata per il trattamento. Un valore pari a 0 suggerisce che nessun gruppo di trattamento - nessuno è esposto a, o offerto, il trattamento. Un valore pari a 1 suggerisce un gruppo di trattamento completo - tutti sono esposti a, o offerto, il trattamento. L'asse y mostra la misura di sollevamento. L'obiettivo è trovare le dimensioni del gruppo di trattamento, o la percentuale della popolazione che verrebbe offerta o esposta al trattamento (ad esempio, pubblicità). Questo approccio ottimizza la selezione di destinazione per ottimizzare il risultato.

Prima di tutto, classificare l'ordine del dataframe di test in base all'aumento stimato. L'elevazione stimata è la differenza tra il risultato previsto del trattamento e il risultato del controllo stimato.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

Calcolare quindi la percentuale cumulativa di visite nei gruppi di trattamento e di controllo.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Infine, a ogni percentuale, calcolare l'aumento del gruppo come differenza tra la percentuale cumulativa di visite tra il trattamento e i gruppi di controllo.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

Tracciare ora la curva di aumento per la stima del set di dati di test. È necessario convertire il dataframe PySpark in un dataframe Pandas prima di tracciare.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Screenshot di un grafico che mostra una curva del modello di aumento normalizzata rispetto al trattamento casuale.

L'analisi e la curva di aumento mostrano entrambi che la popolazione superiore del 20%, come classificato dalla stima, avrebbe un grande guadagno se ricevessero il trattamento. Ciò significa che il 20% della popolazione rappresenta il gruppo di persuadabili. Pertanto, è possibile impostare il punteggio di cutoff per le dimensioni desiderate del gruppo di trattamento al 20%, per identificare i clienti di selezione di destinazione per il maggiore impatto.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

Passaggio 4: Registrare il modello di Machine Learning finale

Si usa MLflow per tenere traccia e registrare tutti gli esperimenti sia per i gruppi di trattamento che per i gruppi di controllo. Questo rilevamento e registrazione includono i parametri, le metriche e i modelli corrispondenti. Queste informazioni vengono registrate con il nome dell'esperimento, nell'area di lavoro, per usarle in un secondo momento.

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

Per visualizzare gli esperimenti:

  1. Nel pannello sinistro selezionare l'area di lavoro.
  2. Trovare e selezionare il nome dell'esperimento, in questo caso aisample-upliftmodelling.

Screenshot che mostra i risultati dell'esperimento di modellazione di aumento dell'intelligenza artificiale.

Passaggio 5: Salvare i risultati della stima

Microsoft Fabric offre PREDICT: una funzione scalabile che supporta l'assegnazione dei punteggi batch in qualsiasi motore di calcolo. Consente ai clienti di rendere operativi i modelli di Machine Learning. Gli utenti possono creare stime batch direttamente da un notebook o dalla pagina degli elementi per un modello specifico. Visitare questa risorsa per altre informazioni su PREDICT e per informazioni su come usare PREDICT in Microsoft Fabric.

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")