Esercitazione: Creare, eseguire il training e valutare un modello di uplift
Questa esercitazione presenta un esempio end-to-end di un flusso di lavoro di data science di Synapse in Microsoft Fabric. Si apprenderà come creare, eseguire il training e valutare modelli di uplift e applicare tecniche di uplift modeling.
Prerequisiti
Ottenere una sottoscrizione di Microsoft Fabric. In alternativa, iscriversi per ottenere una versione di valutazione di Microsoft Fabric gratuita.
Accedere a Microsoft Fabric.
Usare l’opzione esperienza sul lato sinistro della home page per passare all'esperienza Science di Synapse.
- Familiarità con i notebook di Microsoft Fabric
- Un lakehouse per questo notebook, per archiviare i dati per questo esempio. Per altre informazioni, vedere Aggiungere un lakehouse al notebook
Seguire la procedura in un notebook
È possibile seguire la procedura in un notebook in uno dei due modi seguenti:
- Aprire ed eseguire il notebook predefinito nell'esperienza di data science di Synapse
- Caricare il notebook da GitHub nell'esperienza di data science di Synapse
Aprire il notebook predefinito
Il notebook di Uplift modeling di esempio accompagna questa esercitazione. Visitare per aprire il notebook di esempio predefinito dell'esercitazione nell'esperienza Synapse Data Science:1. Andare sulla pagina iniziale di data science di Synapse. 1. Selezionare Usa esempio. 1. Selezionare l'esempio corrispondente:* Dalla scheda predefinita Flussi di lavoro end-to-end (Python), se l'esempio è relativo a un'esercitazione su Python. * Dalla scheda predefinita Flussi di lavoro end-to-end (R), se l'esempio è relativo a un'esercitazione su R. * Dalla scheda Esercitazioni rapide, se l'esempio è per un'esercitazione rapida.1. Collegare un lakehouse al notebook prima di iniziare a eseguire il codice. per altre informazioni sull'accesso ai notebook di esempio predefiniti per le esercitazioni.
Per aprire il notebook di esempio predefinito dell'esercitazione nell'esperienza di data science di Synapse:
Andare sulla pagina iniziale di data science di Synapse
Selezionare Usa un esempio
Selezionare l'esempio corrispondente:
- Dalla scheda predefinita Flussi di lavoro end-to-end (Python), se l'esempio è relativo a un'esercitazione su Python
- Dalla scheda predefinita Flussi di lavoro end-to-end (R), se l'esempio è relativo a un'esercitazione su R
- Dalla scheda Esercitazioni rapide, se l'esempio è per un'esercitazione rapida
Collegare un lakehouse al notebook prima di iniziare a eseguire il codice
Importare il notebook da GitHub
Il notebook AIsample - Uplift Modeling.ipynb accompagna questa esercitazione.
Per aprire il notebook di accompagnamento per questa esercitazione, seguire le istruzioni riportate in Preparare il sistema per le esercitazioni di data science per importare il notebook nell'area di lavoro.
È possibile creare un nuovo notebook se si preferisce copiare e incollare il codice da questa pagina.
Assicurarsi di collegare un lakehouse al notebook prima di iniziare a eseguire il codice.
Passaggio 1: caricare i dati
Set di dati
Criteo AI Lab ha creato il set di dati. Tale set di dati ha 13M righe. Ogni riga rappresenta un utente. Ogni riga ha 12 funzionalità, un indicatore di trattamento e due etichette binarie che includono visita e conversione.
- f0 - f11: valori di funzionalità (valori densi, mobili)
- trattamento: se un utente è stato selezionato casualmente per il trattamento (ad esempio, pubblicità) (1 = trattamento, 0 = controllo)
- conversione: se si è verificata una conversione (ad esempio, è stato effettuato un acquisto) per un utente (binario, etichetta)
- visita: se si è verificata una conversione (ad esempio, è stato effettuato un acquisto) per un utente (binario, etichetta)
Citazione
- Home page del set di dati: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Il set di dati usato per questo notebook richiede questa citazione di BibTex:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Suggerimento
Definendo i parametri seguenti, è possibile applicare facilmente questo notebook a set di dati diversi.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Importare le librerie
Prima dell'elaborazione, è necessario importare le librerie Spark e SynapseML necessarie. È anche necessario importare una libreria di visualizzazione dei dati, ad esempio Seaborn, una libreria di visualizzazione dei dati Python. Una libreria di visualizzazioni dati offre un'interfaccia di alto livello per creare risorse visive su DataFrame e matrici. Altre informazioni su Spark, SynapseML e Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Scaricare un set di dati e caricarlo nel lakehouse
Questo codice scarica una versione disponibile pubblicamente del set di dati, quindi archivia quella risorsa dati in un lakehouse di Fabric.
Importante
Assicurarsi di aggiungere un lakehouse al notebook prima di eseguirlo. In caso contrario, verrà generato un errore.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Avviare la registrazione del runtime di questo notebook.
# Record the notebook running time
import time
ts = time.time()
Configurare il rilevamento dell'esperimento di MLflow
Per estendere le funzionalità di registrazione di MLflow, la registrazione automatica acquisisce automaticamente i valori dei parametri di input e le metriche di output di un modello di Machine Learning durante il training. Queste informazioni vengono quindi registrate nell'area di lavoro, da cui le API MLflow o l'esperimento corrispondente nell'area di lavoro possono accedervi e visualizzarle. Visitare questa risorsa per altre informazioni sulla registrazione automatica.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Nota
Per disabilitare la registrazione automatica di Microsoft Fabric in una sessione del notebook, chiamare mlflow.autolog()
e impostare disable=True
.
Leggere i dati dal lakehouse
Leggere i dati non elaborati dalla sezione Files del lakehouse e aggiungere altre colonne per parti di data diverse. Le stesse informazioni vengono usate per creare una tabella delta partizionata.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Passaggio 2: analisi esplorativa dei dati
Usare il comando display
per visualizzare le statistiche di alto livello del set di dati. È anche possibile mostrare le visualizzazioni grafico per visualizzare facilmente i sottoinsiemi del set di dati.
display(raw_df.limit(20))
Esaminare la percentuale di utenti che visitano, la percentuale di utenti che converte e la percentuale dei visitatori che converte.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
L'analisi indica che il 4,9% degli utenti del gruppo di trattamento, utenti che hanno ricevuto il trattamento o pubblicità, ha visitato il negozio online. Solo il 3,8% degli utenti del gruppo di controllo, cioè gli utenti che non hanno mai ricevuto il trattamento o che non sono mai stati offerti o esposti alla pubblicità, hanno fatto lo stesso. Inoltre, lo 0,31% di tutti gli utenti del gruppo di trattamento ha convertito o effettuato un acquisto, mentre solo lo 0,19% degli utenti del gruppo di controllo lo ha fatto. Di conseguenza, il tasso di conversione dei visitatori che ha effettuato un acquisto, che erano anche membri del gruppo di trattamento, è pari al 6,36%, rispetto solo al 5,07%** per gli utenti del gruppo di controllo. In base a questi risultati, il trattamento può potenzialmente migliorare il tasso di visita di circa l'1%, e il tasso di conversione dei visitatori di circa l'1,3%. Il trattamento porta a un miglioramento significativo.
Passaggio 3: Definire il modello per il training
Preparare i set di dati di training e di test
In questo caso, si adatta un trasformatore Featurize al DataFrame raw_df
per estrarre le funzionalità dalle colonne di input specificate e restituire tali funzionalità in una nuova colonna denominata features
.
Il DataFrame risultante viene archiviato in un nuovo DataFrame denominato df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Preparare i set di dati di trattamento e controllo
Dopo aver creato i set di dati di training e test, è anche necessario formare i set di dati di trattamento e controllo per eseguire il training dei modelli di Machine Learning per misurare l'uplift.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Dopo aver preparato i dati, è possibile procedere con il training di un modello con LightGBM.
Uplift modeling: T-Learner con LightGBM
I modelli di meta-apprendimento sono un set di algoritmi, basati su algoritmi di Machine Learning come LightGBM, Xgboost e così via. Consentono di stimare l'effetto del trattamento medio condizionale o CATE. T-learner è un modello di meta-apprendimento che non usa un singolo modello. T-learner usa invece un modello per ogni variabile di trattamento. Di conseguenza, vengono sviluppati due modelli e si fa riferimento al modello di meta-apprendimento come T-learner. T-learner usa più modelli di Machine Learning per superare il problema di eliminare completamente il trattamento, forzando l'apprendimento a suddividerlo per primo.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Usa set di dati di test per una previsione
In questo caso si usa treatment_model
e control_model
, entrambi definiti in precedenza, per trasformare il set di dati di test test_df
. Quindi, si calcola l'uplift previsto. Si definisce l'uplift previsto come differenza tra il risultato previsto del trattamento e il risultato del controllo stimato. Maggiore è la differenza di uplift previsto, maggiore è l'efficacia del trattamento (ad esempio, pubblicità) su un individuo o un sottogruppo.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Eseguire valutazioni del modello
Poiché non è possibile osservare un uplift effettivo per ogni individuo, è necessario misurare l'uplift su un gruppo di individui. Si usa una curva di uplift che traccia il vero e cumulativo uplift della popolazione.
L'asse x rappresenta il rapporto tra la popolazione selezionata per il trattamento. Un valore di 0 indica l'assenza di un gruppo di trattamento; nessuno è esposto o ha ricevuto il trattamento. Un valore di 1 indica un gruppo di trattamento completo; tutti sono esposti o hanno ricevuto il trattamento. L'asse y mostra la misura di uplift. L'obiettivo è trovare le dimensioni del gruppo di trattamento, o la percentuale della popolazione che verrebbe offerta o esposta al trattamento (ad esempio, pubblicità). Questo approccio ottimizza la selezione di destinazione per ottimizzare il risultato.
Prima di tutto, classificare l'ordine del DataFrame di test in base all'uplift stimato. L'uplift previsto è la differenza tra il risultato previsto del trattamento e il risultato del controllo stimato.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Calcolare quindi la percentuale cumulativa di visite nei gruppi di trattamento e di controllo.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Infine, a ogni percentuale, calcolare l'uplift del gruppo come differenza tra la percentuale cumulativa di visite tra il trattamento e i gruppi di controllo.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Tracciare ora la curva di uplift per la stima del set di dati di test. È necessario convertire il DataFrame PySpark in un DataFrame Pandas prima di tracciare.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
L'asse x rappresenta il rapporto tra la popolazione selezionata per il trattamento. Un valore di 0 indica l'assenza di un gruppo di trattamento; nessuno è esposto o ha ricevuto il trattamento. Un valore di 1 indica un gruppo di trattamento completo; tutti sono esposti o hanno ricevuto il trattamento. L'asse y mostra la misura di uplift. L'obiettivo è trovare le dimensioni del gruppo di trattamento, o la percentuale della popolazione che verrebbe offerta o esposta al trattamento (ad esempio, pubblicità). Questo approccio ottimizza la selezione di destinazione per ottimizzare il risultato.
Prima di tutto, classificare l'ordine del DataFrame di test in base all'uplift stimato. L'uplift previsto è la differenza tra il risultato previsto del trattamento e il risultato del controllo stimato.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Calcolare quindi la percentuale cumulativa di visite nei gruppi di trattamento e di controllo.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Infine, a ogni percentuale, calcolare l'uplift del gruppo come differenza tra la percentuale cumulativa di visite tra il trattamento e i gruppi di controllo.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Tracciare ora la curva di uplift per la stima del set di dati di test. È necessario convertire il DataFrame PySpark in un DataFrame Pandas prima di tracciare.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
L'analisi e la curva di uplift mostrano entrambi che la popolazione superiore del 20%, come classificato dalla previsione, avrebbe un grande guadagno se ricevessero il trattamento. Ciò significa che il 20% della popolazione rappresenta il gruppo di persuadabili. Pertanto, è possibile impostare il punteggio di cutoff per le dimensioni desiderate del gruppo di trattamento al 20%, per identificare i clienti di selezione di destinazione per il maggiore impatto.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Passaggio 4: Registrare il modello di ML finale
Si usa MLflow per tenere traccia e registrare tutti gli esperimenti sia per i gruppi di trattamento che per i gruppi di controllo. Questo rilevamento e registrazione includono i parametri, le metriche e i modelli corrispondenti. Queste informazioni vengono registrate con il nome dell'esperimento, nell'area di lavoro, per usarle in un secondo momento.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Per visualizzare gli esperimenti:
- Nel pannello a sinistra, selezionare l'area di lavoro.
- Trovare e selezionare il nome dell'esperimento, in questo caso aisample-upliftmodelling.
Passaggio 5: salvare i risultati della previsione
Microsoft Fabric offre PREDICT: una funzione scalabile che supporta l'assegnazione dei punteggi batch in qualsiasi motore di calcolo. Consente ai clienti di rendere operativi i modelli di Machine Learning. Gli utenti possono creare previsioni di batch direttamente da un notebook o dalla pagina di un elemento per uno specifico modello. Visitare questa risorsa per altre informazioni su PREDICT e per informazioni su come usare PREDICT in Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")