教程:创建、训练和评估提升模型

本教程介绍了 Microsoft Fabric 中 Synapse 数据科学工作流的端到端示例。 了解如何创建、训练和评估增益模型并应用增益建模技术。

先决条件

请按照笔记本进行操作

可以通过以下两种方式之一按照笔记本进行操作:

  • 在 Synapse 数据科学体验中打开并运行内置笔记本
  • 将笔记本从 GitHub 上传到 Synapse 数据科学体验

打开内置笔记本

示例营销增益建模笔记本是本教程随附的笔记本。 访问 要在 Synapse 数据科学体验中打开教程的内置示例笔记本:1. 转到 Synapse 数据科学主页。 1. 选择“使用示例”。 1. 选择相应的示例:* 来自默认“端到端工作流 (Python)”选项卡(如果示例适用于 Python 教程)。 * 来自“端到端工作流 (R)”选项卡(如果示例适用于 R 教程)。 * 来自“快速教程”选项卡(如果示例适用于快速教程)。1. 在开始运行代码之前,将湖屋连接到笔记本。 有关访问教程的内置示例笔记本的详细信息。

要在 Synapse 数据科学体验中打开教程的内置示例笔记本,请执行以下操作:

  1. 转到 Synapse 数据科学主页

  2. 选择“使用示例”

  3. 选择相应的示例:

    1. 来自默认“端到端工作流 (Python)”选项卡(如果示例适用于 Python 教程)
    2. 来自“端到端工作流 (R)“选项卡(如果示例适用于 R 教程)
    3. 来自“快速教程”选项卡(如果示例适用于快速教程)
  4. 在开始运行代码之前,将湖屋连接到笔记本

从 GitHub 导入笔记本

AIsample - Uplift Modeling.ipynb 本教程随附的笔记本。

若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。

或者,如果要从此页面复制并粘贴代码,则可以创建新的笔记本

在开始运行代码之前,请务必将湖屋连接到笔记本

步骤 1:加载数据

数据集

Criteo AI 实验室创建了数据集。 该数据集有 1300 万行。 每行代表一个用户。 每行具有 12 个特征、一个处理指示器和两个二进制标签(包括访问和转换)。

显示 Criteo AI 实验室数据集结构的屏幕截图。

  • f0 - f11:特征值(密集、浮动值)
  • 处理:用户是否是处理(例如,广告)的随机目标(1 = 处理,0 = 对照)
  • 转换:用户(二进制、标签)是否发生了转换(例如,进行了购买)
  • 访问:用户(二进制、标签)是否发生了转换(例如,进行了购买)

引文

用于此笔记本的数据集需要以下 BibTex 引文:

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

提示

通过定义以下参数,你可以轻松地将此笔记本应用于不同的数据集。

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

导入库

在处理之前,必须导入所需的 Spark 和 SynapseML 库。 还必须导入数据可视化库(例如,Seaborn,它是一个 Python 数据可视化库)。 数据可视化库提供了一个高级接口,用于在 DataFrame 和数组上构建可视化资源。 详细了解 SparkSynapseMLSeaborn

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

下载数据集并上传到湖屋

此代码将下载数据集的公开可用版本,然后将该数据资源存储在 Fabric 湖屋中。

重要

确保在运行笔记本之前向笔记本添加湖屋。 否则可能会导致出错。

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

开始录制此笔记本的运行时。

# Record the notebook running time
import time

ts = time.time()

设置 MLflow 试验跟踪

为了扩展 MLflow 日志记录功能,自动日志记录会在训练期间自动捕获机器学习模型的输入参数和输出指标的值。 然后,此信息会记录到工作区,MLflow API 或工作区中的相应试验可以访问并可视化该信息。 有关自动记录的详细信息,请访问此资源。

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

注意

若要在笔记本会话中禁用 Microsoft Fabric 自动日志记录,请调用 mlflow.autolog() 并设置 disable=True

从湖屋读取数据

从湖屋“文件”部分读取原始数据,并为不同日期部分添加更多列。 使用相同的信息创建分区增量表。

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

步骤 2:探索性数据分析

使用 display 命令查看有关数据集的高级统计信息。 还可以显示图表视图以方便地可视化数据集的子集。

display(raw_df.limit(20))

检查访问的用户百分比、转换的用户百分比和转换的访问者百分比。

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

分析表明,实验组中 4.9% 的用户(收到处理或广告的用户)访问了在线商店。 对照组中只有 3.8% 的用户(从未收到处理的用户,或者从未被提供或暴露给广告的用户)这样做。 此外,实验组所有用户中 0.31% 的用户已转换或进行了购买,而对照组中只有 0.19% 的用户这样做。 因此,进行购买的访问者(也是实验组的成员)的转换率为 6.36%,而对照组的用户只有 5.07%。 根据这些结果,处理可能会使访问率提高约 1%,访问者的转换率约为 1.3%。 处理带来了显著改善。

步骤 3:定义训练模型

准备训练并测试数据集

在这里,可以将 Featurize 转换器拟合到 raw_df DataFrame,以便从指定的输入列中提取特征,并将这些特征输出到名为 features 的新列。

生成的 DataFrame 会存储在名为 df 的新 DataFrame 中。

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

准备处理和对照数据集

创建训练和测试数据集后,还必须形成处理和对照数据集,以训练机器学习模型来测量提升。

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

准备好数据后,可以继续使用 LightGBM 训练模型。

提升建模:使用 LightGBM 的 T-Learner

元学习器是基于 LightGBM、Xgboost 等机器学习算法构建的一组算法。它们有助于估计条件平均处理效应 (CATE)。 T-learner 是不使用单一模型的元学习器。 相反,T-learner 为每个处理变量使用一个模型。 因此,开发了两个模型,我们将元学习器称为 T-learner。 T-learner 使用多个机器学习模型来克服完全放弃处理的问题,方法是迫使学习器首先对其进行拆分。

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

使用测试数据集进行预测

在这里,使用前面定义的 treatment_modelcontrol_model 来转换 test_df 测试数据集。 然后,计算预测的提升。 可将预测提升定义为预测处理结果与预测控制结果之间的差异。 这种预测的提升差异越大,处理(例如,广告)对个人或子组的有效性越大。

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

执行模型评估

由于无法观察到每个个体的实际提升,因此需要测量一组个体的实际提升。 可以使用一条提升曲线来绘制群体的实际累积提升。

显示标准化提升模型曲线与随机处理的图表屏幕截图。

x 轴表示选择用于处理的人群的比例。 值为 0 表示没有实验组,即没有人暴露给或被提供处理。 值为 1 表示一个完整的实验组,每个人都暴露给或被提供处理。 y 轴显示了提升度量值。 目的是查找实验组的大小,或者将被提供或暴露给处理(例如,广告)的人群百分比。 此方法优化了目标选择来优化结果。

首先,按预测增量对测试 DataFrame 订单进行排名。 预测提升是指预测处理结果与预测控制结果之间的差异。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

接下来,计算实验组和对照组的累计访问百分比。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最后,在每个百分比处,将组的提升计算为实验组与对照组之间累计访问百分比之间的差异。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

现在,绘制测试数据集预测的提升曲线。 在绘制之前,必须将 PySpark DataFrame 转换为 Pandas DataFrame。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

显示标准化提升模型曲线与随机处理的图表屏幕截图。

x 轴表示选择用于处理的人群的比例。 值为 0 表示没有实验组,即没有人暴露给或被提供处理。 值为 1 表示一个完整的实验组,每个人都暴露给或被提供处理。 y 轴显示了提升度量值。 目的是查找实验组的大小,或者将被提供或暴露给处理(例如,广告)的人群百分比。 此方法优化了目标选择来优化结果。

首先,按预测增量对测试 DataFrame 订单进行排名。 预测提升是指预测处理结果与预测控制结果之间的差异。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

接下来,计算实验组和对照组的累计访问百分比。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最后,在每个百分比处,将组的提升计算为实验组与对照组之间累计访问百分比之间的差异。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

现在,绘制测试数据集预测的提升曲线。 在绘制之前,必须将 PySpark DataFrame 转换为 Pandas DataFrame。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

显示标准化提升模型曲线与随机处理的图表屏幕截图。

分析和提升曲线都表明,根据预测,排名前 20% 的人群如果接受处理,将有很大收益。 这意味着,群体的前 20% 代表可说服的组。 因此,可以将所需实验组大小的临界分数设置为 20%,以识别目标选择客户,以产生最大的影响。

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

步骤 4:注册最终 ML 模型

可使用 MLflow 跟踪和记录实验组和对照组的所有试验。 此跟踪和日志记录包括相应的参数、指标以及模型。 这些信息记录在工作区的试验名称下,以备日后使用。

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

查看试验:

  1. 在左侧面板中,选择工作区。
  2. 查找并选择试验名称,在本例中为 aisample-upliftmodelling

显示 aisample 提升建模试验结果的屏幕截图。

步骤 5:保存预测结果

Microsoft Fabric 提供 PREDICT,这是一种可缩放函数,支持在任何计算引擎中进行批量评分。 它让客户能够操作机器学习模型。 用户可以直接从笔记本或特定模型的项目页面创建批量预测。 请访问此资源,以了解有关 PREDICT 的更多信息,并了解如何在 Microsoft Fabric 中使用 PREDICT。

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")