共用方式為


教學課程:建立、訓練和評估提升模型

本教學課程會在 Microsoft Fabric 中呈現 Synapse 資料科學工作流程的端對端範例。 您將了解如何建立、訓練及評估提升模型,並套用提升模型化技術。

必要條件

遵循筆記本中的指示

您可以透過下列兩種方式之一遵循筆記本中的指示操作:

  • 在 Synapse 資料科學體驗中開啟並執行內建筆記本
  • 將筆記本從 GitHub 上傳至 Synapse 資料科學體驗

開啟內建筆記本

本教學課程隨附範例提升模型化筆記本。 瀏覽以在 Synapse 資料科學體驗中開啟教學課程的內建範例筆記本:1. 移至 Synapse 資料科學首頁。 1. 選取 [使用範例]。 1. 選取對應的範例:* 如果範例適用於 Python 教學課程,則從預設的 [端對端工作流程 (Python)] 索引標籤選取。 * 如果範例適用於 R 教學課程,則從 [端對端工作流程] 索引標籤選取。 * 如果範例適用於快速教學課程,則從 [快速教學課程] 索引標籤選取。1. 開始執行程式碼之前,將 Lakehouse 連結至筆記本。 以便存取教學課程之內建範例筆記本的詳細資訊。

在 Synapse 資料科學體驗中開啟教學課程的內建範例筆記本:

  1. 移至 Synapse 資料科學首頁

  2. 選取 [使用範例]

  3. 選取對應的範例︰

    1. 如果範例適用於 Python 教學課程,則從預設的 [端對端工作流程 (Python)] 索引標籤選取
    2. 如果範例適用於 R 教學課程,則從 [端對端工作流程] 索引標籤選取
    3. 如果範例適用於快速教學課程,則從 [快速教學課程] 索引標籤選取
  4. 開始執行程式碼之前,將 Lakehouse 連結至筆記本

從 GitHub 匯入筆記本

本教學課程隨附 AIsample - Uplift Modeling.ipynb 筆記本。

若要開啟本教學課程隨附的筆記本,請遵循為資料科學教學課程準備系統中的指示,將筆記本匯入您的工作區。

如果您想要複製並貼上此頁面中的程式碼,可以建立新的筆記本

開始執行程式碼之前,請務必將 Lakehouse 連結至筆記本

步驟 1:載入資料

資料集

Criteo AI 實驗室已建立資料集。 該資料集有 13M 個資料列。 每個資料列都代表一個使用者。 每個資料列都有 12 個特徵、處理指標,以及包括瀏覽和轉換的兩個二進位標記。

顯示 Criteo AI 實驗室資料集結構的螢幕擷取畫面。

  • f0 - f11:特徵值 (密集、浮動值)
  • 處理:使用者是否隨機成為處理的目標 (例如廣告) (1 = 處理, 0 = 控制)
  • 轉換:使用者是否發生轉換 (例如,已購買) (二進位、標籤)
  • 瀏覽:使用者是否發生轉換 (例如,已購買) (二進位、標籤)

引文

本筆記本使用的資料集需要此 BibTex 引文:

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

提示

藉由定義下列參數,可以輕鬆地將此筆記本套用到不同的資料集上。

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

匯入程式庫

處理之前,必須匯入必要的 Spark 和 SynapseML 程式庫。 您也必須匯入資料視覺效果程式庫,例如 Seaborn、Python 資料視覺效果程式庫。 資料視覺效果程式庫提供高階介面,可在 DataFrame 和陣列上建置視覺化資源。 深入了解 SparkSynapseMLSeaborn。

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

下載資料集並上傳至 Lakehouse

此程式碼會下載公開可用的資料集版本,然後將該資料資源儲存在 Fabric Lakehouse 中。

重要

在執行之前,請確定已將 Lakehouse 新增至筆記本。 無法執行這項操作時,將會發生錯誤。

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

開始錄製此筆記本的執行階段。

# Record the notebook running time
import time

ts = time.time()

設定 MLflow 實驗追蹤

若要擴充 MLflow 記錄功能,自動記錄會在訓練期間自動擷取機器學習模型的輸入參數和輸出計量值。 此資訊接著會記錄到工作區,MLflow API 或工作區中的對應實驗可以存取並視覺化該資訊。 如需自動記錄的詳細資訊,請瀏覽此資源。

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

注意

若要停用筆記本工作階段的 Microsoft Fabric 自動記錄,請呼叫 mlflow.autolog() 並設定 disable=True

從 Lakehouse 讀取資料

從 Lakehouse [檔案] 區段讀取未經處理資料,並針對不同的日期部分新增更多資料行。 使用相同的資訊來建立資料分割的差異資料表。

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

步驟 2:探索式資料分析

使用 display 命令來檢視資料集相關的高階統計資料。 您也可以顯示圖表檢視,輕鬆地將資料集的子集視覺化。

display(raw_df.limit(20))

檢查瀏覽的使用者百分比、轉換的使用者百分比,以及轉換的訪客百分比。

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

分析表明,來自處理群組的 4.9% 的使用者,也就是接受處理或廣告的使用者,瀏覽了線上商店。 只有來自控制群組的 3.8% 使用者,即從未收到處理的使用者,或從未收到或暴露於廣告的使用者這樣做。 此外,來自處理群組的所有使用者中,有 0.31% 轉換或進行購買,而來自控制群組的使用者中,只有 0.19% 這樣做。 因此,在進行購買的訪客中,處理群組成員的轉換率為 6.36%,相比之下,控制群組的使用者僅有 5.07%。 根據這些結果,處理可能會使造訪率提升約 1%,訪客的轉換率提升約 1.3%。 處理會帶來顯著改善。

步驟 3:定義訓練模型

準備訓練和測試資料集

在這裡,您會將 Featurize 轉換器放入 raw_df DataFrame 中,以從指定的輸入資料行擷取特徵,並將這些特徵輸出至名為 features 的新資料行。

產生的 DataFrame 會儲存在名為 df 的新 DataFrame 中。

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

準備處理和控制資料集

建立訓練和測試資料集之後,您還必須形成處理和控制資料集,以訓練機器學習模型來測量提升。

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

現在您已準備好資料,可以繼續使用 LightGBM 來訓練模型。

提升模型化:使用 LightGBM 的 T-Learner

中繼學習模組是一組演算法,建立在 LightGBM、Xgboost 等機器學習演算法之上。它們有助於估計條件式平均處理效果,或 CATE。 T-learner 是不使用單一模型的中繼學習模組。 相反,T-learner 會為每個處理變數使用一個模型。 因此,我們開發了兩個模型,並將中繼學習模組稱為 T-learner。 T-learner 會使用多個機器學習模型來克服完全捨棄處理的問題,方法是強制學習者先對其進行分割。

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

使用測試資料集執行預測

在這裡,您會使用 treatment_modelcontrol_model,兩者於稍早定義,以轉換 test_df 測試資料集。 然後,您可以計算預測的提升。 您可以將預測提升定義為預測處理結果與預測控制結果之間的差異。 這個預測提升的差異越大,對個人或子群組的處理 (例如廣告) 就越有效。

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

執行模型評估

由於無法觀察到每個人的實際提升,因此需要測量一個群組的提升。 您可以使用提升曲線來繪製整個人口的實際累積提升。

顯示標準化提升模型曲線與隨機處理之圖表的螢幕擷取畫面。

X 軸代表為處理選取的人口比例。 值為 0 表示沒有處理群組 - 沒有人暴露於或被提供處理。 值為 1 表示有一個完整的處理群組 - 每個人都暴露於或被提供處理。 Y 軸顯示提升量值。 目標是尋找處理群組的大小,或接受或暴露於處理 (例如廣告) 中的人口百分比。 此方法會最佳化目標選取範圍,以最佳化結果。

首先,依預測的提升對測試 DataFrame 順序進行排名。 預測的提升是預測處理結果與預測控制結果之間的差異。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

接下來,計算處理和控制群組的累計瀏覽百分比。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最後,在每個百分比上,將群組的提升計算為處理群組與控制群組瀏覽累計百分比之間的差異。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

現在,繪製測試資料集預測的提升曲線。 在繪製之前,必須將 PySpark DataFrame 轉換成 Pandas DataFrame。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

顯示標準化提升模型曲線與隨機處理之圖表的螢幕擷取畫面。

X 軸代表為處理選取的人口比例。 值為 0 表示沒有處理群組 - 沒有人暴露於或被提供處理。 值為 1 表示有一個完整的處理群組 - 每個人都暴露於或被提供處理。 Y 軸顯示提升量值。 目標是尋找處理群組的大小,或接受或暴露於處理 (例如廣告) 中的人口百分比。 此方法會最佳化目標選取範圍,以最佳化結果。

首先,依預測的提升對測試 DataFrame 順序進行排名。 預測的提升是預測處理結果與預測控制結果之間的差異。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

接下來,計算處理和控制群組的累計瀏覽百分比。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最後,在每個百分比上,將群組的提升計算為處理群組與控制群組瀏覽累計百分比之間的差異。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

現在,繪製測試資料集預測的提升曲線。 在繪製之前,必須將 PySpark DataFrame 轉換成 Pandas DataFrame。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

顯示標準化提升模型曲線與隨機處理之圖表的螢幕擷取畫面。

分析和提升曲線都顯示,如果人口的前 20% 按預測排名接受處理,將有很大的收益。 這表示人口的前 20% 代表可說服的群組。 因此,您可以將所需處理群組規模的分界點設定為 20%,以識別目標選取客戶,取得最大影響。

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

步驟 4:註冊最終 ML 模型

您可以使用 MLflow 來追蹤和記錄處理和控制群組的所有實驗。 此追蹤和記錄包含對應的參數、計量及模型。 該資訊會記錄在工作區中的實驗名稱下,以供日後使用。

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

要檢視您的實驗:

  1. 在左側面板中,選取您的工作區。
  2. 尋找並選取實驗名稱,在此案例中為 aisample-upmodelling

顯示 aisample 提升模型化實驗結果的螢幕擷取畫面。

步驟 5:儲存預測結果

Microsoft Fabric 提供 PREDICT - 可調整函數,可支援任何計算引擎中的批次評分。 它可使客戶運作機器學習模型。 使用者可以直接從筆記本或特定模型的項目頁面建立批次預測。 請瀏覽此資源以深入了解 PREDICT,並了解如何在 Microsoft Fabric 中使用 PREDICT。

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")