次の方法で共有


チュートリアル: アップリフト モデルを作成、トレーニング、評価する

このチュートリアルでは、使用して、Microsoft Fabric での Synapse Data Science ワークフローのエンド ツー エンドの例について説明します。 アップリフト モデルを作成、トレーニング、評価し、アップリフト モデリング手法を適用する方法について説明します。

前提条件

ノートブックで作業を進める

ノートブックの次の 2 つの方法のいずれかに従うことができます。

  • Synapse Data Science 環境のビルトイン ノートブックを開いて実行します
  • GitHub から Synapse Data Science 環境にノートブックをアップロードします

組み込みのノートブックを開く

このチュートリアルには、サンプルのアップリフト モデリングノートブックが付属しています。 「チュートリアルのビルトインのサンプル ノートブックを Synapse Data Science 環境で開くには」を参照してください: 1. Synapse Data Science のホーム ページに移動します。 1. [サンプルの使用] を選択してください。 1. 対応するサンプルを選択する:* サンプルが Python チュートリアル用の場合は、既定の [エンド ツー エンド ワークフロー (Python)] タブから。 * サンプルが R チュートリアル用の場合は、[エンド ツー エンド ワークフロー (R)] タブから。 * サンプルがクイック チュートリアル 1. 用の場合は、[クイック チュートリアル] タブから。 コードの実行を開始する前に、[Lakehouseをノートブックにアタッチします]。 チュートリアル用の組み込みのサンプル ノートブックへのアクセスの詳細について。

チュートリアルのビルトインのサンプル ノートブックを Synapse Data Science 環境で開くには:

  1. Synapse Data Science のホーム ページに移動します

  2. [サンプルの使用] を選択する

  3. 対応するサンプルを選択してください。

    1. サンプルが Python チュートリアル用の場合は、既定の [エンド ツー エンド ワークフロー (Python)] タブから
    2. サンプルが R チュートリアル用の場合は、[エンド ツー エンド ワークフロー (R)] タブから
    3. サンプルがクイック チュートリアル用の場合は、[クイック チュートリアル] タブから
  4. コードの実行を開始する前に、[レイクハウスをノートブックにアタッチします]

GitHub からノートブックをインポートする

このチュートリアルには、AIsample - Uplift Modeling.ipynb ノートブックが付属しています。

このチュートリアルに付随するノートブックを開くには、「データ サイエンス用にシステムを準備する」チュートリアル の手順に従い、ノートブックをお使いのワークスペースにインポートします。

このページからコードをコピーしてペーストする場合は、[新規ノートブックを作成する] ことができます。

コードの実行を開始する前に、必ずLakehouseをノートブックにアタッチしてください。

ステップ 1: データをロードする

Dataset

Criteo AI Lab によってデータセットが作成されました。 そのデータセットには 1300 万行があります。 各行は 1 人のユーザーを表します。 各行には、12 の特性、処置のインジケーターと 2 つのバイナリ ラベル (訪問数とコンバージョン) も含まれます。

Criteo AI Lab のデータセット構造を示すスクリーンショット。

  • f0 ~ f11: 特徴値 (高密度、フローティング値)
  • 処置: ユーザーがランダムに処置の対象であったかどうか (例: 広告) (1 = 処置、0 = 制御)
  • 変換: ユーザー (バイナリ、ラベル) の変換 (購入など) が発生したかどうか
  • 訪問: ユーザーのコンバージョン (購入など) が発生したかどうか (バイナリ、ラベル)

引用

このノートブックに使用されるデータセットには、次の BibTex 引用が必要です:

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

ヒント

次のパラメーターを定義することで、このノートブックをさまざまなデータセットに簡単に適用できます。

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

ライブラリをインポートする

処理する前に、必要な Spark および SynapseML ライブラリをインポートする必要があります。 また、データ視覚化ライブラリ (Seaborn、Python データ視覚化ライブラリなど) もインポートする必要があります。 データ可視化ライブラリは、DataFrame と配列でビジュアル リソースを構築するための高度なインターフェイスを提供します。 SparkSynapseML、および Seaborn の詳細について説明します。

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

データセットをダウンロードしてレイクハウスにアップロードする

このコードは、一般公開されているバージョンのデータセットをダウンロードし、そのテータ リソースを Fabric レイクハウスに格納します。

重要

実行する前に、必ずノートブックに レイクハウスを追加 してください。 間違ってインストールすると、エラーが発生します。

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

このノートブックの実行時間の記録を開始します。

# Record the notebook running time
import time

ts = time.time()

MLflow 実験追跡を設定する

MLflow ログ機能を拡張するために、機械学習モデルの自動ログ機能は自動的に入力パラメーターと出力メトリックの値をトレーニング中にキャプチャします。 その後、この情報はワークスペースに記録され、MLflow API またはワークスペース内の対応する実験によりアクセス、視覚化が可能になります。 自動ログ記録の詳細については、こちらのリソースを参照してください。

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Note

ノートブック セッションで Microsoft Fabric の自動ログ記録を無効にする場合は、mlflow.autolog() を呼び出して disable=True を設定します。

Lakehouseからデータを読み取る

[ファイル] セクションから生データを読み取り、異なる日付要素ごとにさらに列を追加します。 パーティション分割されたデルタ テーブルの作成にも同じ情報が使用されます。

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

手順 2: 探索的データ分析

display コマンドを使用してデータセットに関する高レベル統計を表示します。 また、グラフ ビューを表示すると、データセットのサブセットを簡単に視覚化できます。

display(raw_df.limit(20))

アクセスするユーザーの割合、転換するユーザーの割合、および転換する訪問者の割合を調べます。

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

この分析では、処置グループのユーザーの 4.9% (処置または広告を受けたユーザー) がオンライン ストアにアクセスしたことを示しています。 コントロール グループのユーザーの 3.8% (処置を受けなかったユーザー、または一度も広告を提供されたり触れたりしたことのないユーザー) のみが同じくアクセスしました。 さらに、処置グループのすべてのユーザーの 0.31% が転換または購入を行いましたが、コントロール グループのユーザーで行ったのは 0.19% のみでした。 結果として、処置グループのメンバーで、購入した訪問者のコンバージョン率は 6.36% になりますが、コントロール グループのユーザーのコンバージョン率は 5.07% のみでした。 これらの結果に基づき、この処置は訪問率を約 1 %改善し、訪問者のコンバージョン率を約 1.3% 向上させることができます。 この処置は、有意な改善をもたらします。

手順 3: トレーニング用のモデルを定義する

トレーニング データセットとテスト データセットを準備する

ここでは、Featurize 変換を raw_df DataFrame に適合させ、features という名前の入力列から特徴を抽出し、それらの特徴を新しい列に出力します。

結果の DataFrame は、df という名前の新しい DataFrame に格納されます。

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

処置のデータセットと制御のデータセットを準備する

トレーニング データセットとテスト データセットを作成した後、機械学習モデルをトレーニングして上昇率を測定するために、処置データセットと制御データセットも形成する必要があります。

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

データを準備したら、LightGBM を使用してモデルのトレーニングに進むことができます。

アップリフト モデリング: LightGBM を使用した T-Learner

Meta-learner は、LightGBM、Xgboost などの機械学習アルゴリズムに基づいて構築された一連のアルゴリズムです。条件付き平均処置効果 (CATE) の推定に役立ちます。 T-learner は、単一モデルを使用しない Meta-learner です。 その代わり、T-learner は処置変数ごとに 1 つのモデルを使用します。 そのため、2 種のモデルが開発され、Meta-learner を T-learner と呼んでいます。 T-learner は、複数の機械学習モデルを使用して、最初に分割することを強制することで、その処置を完全に解消する問題を克服します。

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

予測にはテスト データセットを使用します

ここでは、test_df テスト データセットを変換するために、以前定義した treatment_model および control_model を両方とも使用します。 次に、予測された上昇率を計算します。 予測された上昇率を、予測処置結果と予測制御結果の差として定義します。 この予測された上昇差が大きいほど、個人またはサブグループに対する処置(広告など)の有効性が高くなります。

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

モデルの評価を実行する

実際には、個人ごとの上昇率は観測できないため、個人のグループでの上昇率を測定する必要があります。 上昇曲線を使用しては、母集団全体の実際の累積上昇をプロットした上昇曲線を使用します。

ランダムな対処に対し正規化されたアップリフト モデル 曲線を示すグラフのスクリーンショット。

X 軸は、その治療に対して選択された母集団の比率を表します。 値が 0 の場合は、処置グループがない、誰もその処置に触れたり提供されたりしていないということを示しています。 値が 1 の場合は、完全な処置グループ、すべてのユーザーが処置に触れているか、または提供されていることを示しています。 Y 軸には、上昇の測定値が表示されます。 目的は、処置グループのサイズ、または処置を提供されたり、触れたりする母集団の割合 (広告など) を見つけることです。 この方法では、結果を最適化するために、ターゲットの選択が最適化されます。

まず、次の予測される上昇によってテスト DataFrame の順序をランク付けします。 予測された上昇は、予測処置結果と予測制御結果の差です。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

次に、処置グループとコントロール グループの両方の訪問の累積割合を計算します。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最後に、各割合で、処置グループとコントロール グループ間の訪問の累積割合の差としてグループの上昇率を計算する。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

次に、テスト データセットの予測の上昇曲線をプロットします。 プロットの前に、PySpark DataFrame を Pandas DataFrame に変換しておく必要があります。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

ランダムな対処に対し正規化されたアップリフト モデル 曲線を示すグラフのスクリーンショット。

X 軸は、その治療に対して選択された母集団の比率を表します。 値が 0 の場合は、処置グループがない、誰もその処置に触れたり提供されたりしていないということを示しています。 値が 1 の場合は、完全な処置グループ、すべてのユーザーが処置に触れているか、または提供されていることを示しています。 Y 軸には、上昇の測定値が表示されます。 目的は、処置グループのサイズ、または処置を提供されたり、触れたりする母集団の割合 (広告など) を見つけることです。 この方法では、結果を最適化するために、ターゲットの選択が最適化されます。

まず、次の予測される上昇によってテスト DataFrame の順序をランク付けします。 予測された上昇は、予測処置結果と予測制御結果の差です。

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

次に、処置グループとコントロール グループの両方の訪問の累積割合を計算します。

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

最後に、各割合で、処置グループとコントロール グループ間の訪問の累積割合の差としてグループの上昇率を計算する。

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

次に、テスト データセットの予測の上昇曲線をプロットします。 プロットの前に、PySpark DataFrame を Pandas DataFrame に変換しておく必要があります。

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

ランダムな対処に対し正規化されたアップリフト モデル 曲線を示すグラフのスクリーンショット。

分析と上昇曲線はどちらも、予測でランク付けされた通り、上位 20% の母集団が、処置を受けた場合に大きな利益を得ることを示しています。 これは、母集団の上位 20% が、説得可能グループを表していることを意味します。 そのため、処置グループの目的のサイズのカットオフ基準を 20% に設定して、最も大きな影響を受ける対象の選択顧客を特定できます。

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

手順 4: 最終的な ML モデルを登録する

MLflow を使用して、処置グループとコントロール グループの両方のすべての実験を追跡し、ログに記録します。 この追跡とログ記録には、対応するパラメーター、メトリック、モデルが含まれます。 この情報は、後日使用するため、ワークスペースで実験名の下に記録されます。

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

実験を表示するには:

  1. 左側のパネルで、ワークスペースを選択します。
  2. 実験名、この場合 は aisample-upliftmodelling、を検索して選択します。

aisample uplift モデリング実験の結果を示すスクリーンショット。

手順 5: 予測結果を保存する

Microsoft Fabric には PREDICT という、任意のコンピューティング エンジンでバッチ スコアリングをサポートするスケーラブルな関数が用意されています。 これにより、お客様は機械学習モデルを運用化できます。 ユーザーは、特定のモデルのノートブックまたはアイテム ページから直接バッチ予測を作成できます。 PREDICT の詳細と、Microsoft Fabric で PREDICT を使用する方法については、このリソースを参照してください。

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")