Microsoft Fabric에서 향상 모델 만들기, 학습 및 평가
이 문서에서는 향상 모델을 만들고, 학습시키고, 평가하고, 고급 모델링 기술을 적용하는 방법을 알아봅니다.
중요
Microsoft Fabric은 현재 미리 보기로 제공됩니다. 이 정보는 릴리스되기 전에 상당히 수정될 수 있는 시험판 제품과 관련이 있습니다. Microsoft는 여기에 제공된 정보와 관련하여 명시적이거나 묵시적인 어떠한 보증도 하지 않습니다.
향상 모델링이란?
기계 학습 모델을 사용하여 개인의 행동에 대한 일부 치료의 인과 효과를 추정하는 인과 유추 기술 제품군입니다.
- 설득은 치료에 긍정적 인 반응
- 잠자는 개는 치료에 강한 부정적인 반응을 가지고
- 잃어버린 원인 은 치료에도 결과에 도달하지 못합니다.
- 확실히 일이 항상 치료와 함께 또는없이 결과에 도달
향상 모델링의 목표는 "확실한 것"과 "잃어버린 원인"에 대한 노력을 낭비하지 않고 "설득력있는"을 식별하고 "잠자는 개"를 귀찮게하지 않는 것입니다.
향상 모델링은 어떻게 작동하나요?
- 메타 학습자: 치료가 있을 때와 치료가 없을 때 개인의 행동 사이의 차이를 예측합니다.
- 트리 향상: 분할 기준이 상승의 차이를 기반으로 하는 트리 기반 알고리즘입니다.
- NN 기반 모델:일반적으로 관찰 데이터와 함께 작동하는 신경망 모델
모델링을 향상할 수 있는 위치는 어디인가요?
- 마케팅: 쿠폰 또는 온라인 광고와 같은 치료를 적용하기 위해 설득 가능인을 식별하는 데 도움이 됩니다.
- 치료: 치료가 특정 그룹에 어떻게 다르게 영향을 미칠 수 있는지 이해하는 데 도움이 됩니다.
사전 요구 사항
- Microsoft Fabric Notebook을 사용하는 방법에 대해 잘 알고 있습니다.
- 레이크하우스. Lakehouse는 이 예제의 데이터를 저장하는 데 사용됩니다. 자세한 내용은 Notebook에 Lakehouse 추가를 참조하세요.
1단계: 데이터 로드
팁
다음 예제에서는 Notebook의 셀에서 코드를 실행한다고 가정합니다. Notebook을 만들고 사용하는 방법에 대한 자세한 내용은 Notebook을 사용하는 방법을 참조하세요.
Notebook 구성
아래 매개 변수를 정의하여 이 예제를 다른 데이터 세트에 적용할 수 있습니다.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually by user
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # mlflow experiment name
종속성 가져오기
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import os
import gzip
import mlflow
데이터 세트 다운로드 및 Lakehouse에 업로드
중요
실행하기 전에 Notebook에 Lakehouse를 추가합니다.
데이터 세트 설명: Criteo AI Lab에서 이 데이터 세트를 만들었습니다. 데이터 세트는 각각 12개의 기능, 처리 표시기 및 2개의 이진 레이블(방문 및 변환)이 있는 사용자를 나타내는 13M 행으로 구성됩니다.
- f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11: 기능 값(조밀, float)
- 처리: 고객이 임의로 광고 대상인지를 나타내는 처리 그룹(1 = 처리됨, 0 = 제어)
- conversion: 이 사용자에 대해 변환이 발생했는지 여부(이진, 레이블)
- visit: 이 사용자에 대한 방문이 발생했는지 여부(이진, 레이블)
데이터 세트 홈페이지: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
인용:
@inproceedings{Diemert2018, author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini}, title={A Large Scale Benchmark for Uplift Modeling}, publisher = {ACM}, booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018}, year = {2018} }
if not IS_CUSTOM_DATA: # Download demo data files into lakehouse if not exist import os, requests remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz" download_file = "criteo-research-uplift-v2.1.csv.gz" download_path = f"/lakehouse/default/{DATA_FOLDER}/raw" if not os.path.exists("/lakehouse/default"): raise FileNotFoundError( "Default lakehouse not found, please add a lakehouse and restart the session." ) os.makedirs(download_path, exist_ok=True) if not os.path.exists(f"{download_path}/{DATA_FILE}"): r = requests.get(f"{remote_url}", timeout=30) with open(f"{download_path}/{download_file}", "wb") as f: f.write(r.content) with gzip.open(f"{download_path}/{download_file}", "rb") as fin: with open(f"{download_path}/{DATA_FILE}", "wb") as fout: fout.write(fin.read()) print("Downloaded demo data files into lakehouse.")
Lakehouse에서 데이터 읽기
raw_df = spark.read.csv(
f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True
).cache()
display(raw_df.limit(20))
2단계: 데이터 세트 준비
데이터 살펴보기
방문/변환하는 전체 사용자 비율
raw_df.select( F.mean("visit").alias("Percentage of users that visit"), F.mean("conversion").alias("Percentage of users that convert"), (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"), ).show()
방문에 대한 전반적인 평균 치료 효과
raw_df.groupby("treatment").agg( F.mean("visit").alias("Mean of visit"), F.sum("visit").alias("Sum of visit"), F.count("visit").alias("Count"), ).show()
변환에 대한 전반적인 평균 치료 효과
raw_df.groupby("treatment").agg( F.mean("conversion").alias("Mean of conversion"), F.sum("conversion").alias("Sum of conversion"), F.count("conversion").alias("Count"), ).show()
학습 테스트 데이터 세트 분할
transformer = (
Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
)
df = transformer.transform(raw_df)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
train_df.groupby(TREATMENT_COLUMN).count().show()
처리 제어 데이터 세트 분할
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
3단계: 모델 학습 및 평가
고급 모델링: LightGBM을 사용하여 T-Learner
classifier = (
LightGBMClassifier()
.setFeaturesCol("features")
.setNumLeaves(10)
.setNumIterations(100)
.setObjective("binary")
.setLabelCol(LABEL_COLUMN)
)
treatment_model = classifier.fit(treatment_train_df)
control_model = classifier.fit(control_train_df)
테스트 데이터 세트에 대한 예측
getPred = F.udf(lambda v: float(v[1]), FloatType())
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(
TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift"
)
.cache()
)
display(test_pred_df.limit(20))
모델 평가
각 개인에 대해 실제 상승을 관찰할 수 없으므로 고객 그룹에 대한 상승도를 측정합니다.
- 상승 곡선 : 인구에 걸쳐 실제 누적 상승을 플롯
먼저 예측 향상을 기준으로 테스트 데이터 프레임 순서의 순위를 지정합니다.
test_ranked_df = test_pred_df.withColumn(
"percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift")))
)
display(test_ranked_df.limit(20))
다음으로, 각 그룹(치료 또는 제어)의 누적 방문 비율을 계산합니다.
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
display(test_ranked_df.limit(20))
마지막으로, 각 백분율에서 그룹의 상승도를 계산합니다.
test_ranked_df = test_ranked_df.withColumn(
"group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")
).cache()
display(test_ranked_df.limit(20))
이제 테스트 데이터 세트의 예측에 상승 곡선을 그릴 수 있습니다. 그리기 전에 pyspark 데이터 프레임을 pandas 데이터 프레임으로 변환해야 합니다.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# plot the data
plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid(b=True, which="major")
return ax
test_ranked_pd_df = test_ranked_df.select(
["pred_uplift", "percent_rank", "group_uplift"]
).toPandas()
uplift_plot(test_ranked_pd_df)
이전 예제의 상승 곡선에서 예측에 의해 순위가 매겨진 상위 20%의 모집단은 치료를 받은 경우 큰 이득을 얻게 됩니다. 즉, 이는 설득력 있는 인구라는 것을 의미 합니다. 따라서 컷오프 점수를 20%로 인쇄하여 대상 고객을 식별할 수 있습니다.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift score higher than {:.4f} are Persuadables".format(cutoff_score))
MLflow를 사용하여 모델 로그 및 로드
이제 학습된 모델이 있으므로 나중에 사용할 수 있도록 저장합니다. 다음 예제에서는 MLflow를 사용하여 메트릭 및 모델을 기록합니다. 이 API를 사용하여 예측을 위해 모델을 로드할 수도 있습니다.
# setup mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
treatment_model,
f"{EXPERIMENT_NAME}-treatmentmodel",
registered_model_name=f"{EXPERIMENT_NAME}-treatmentmodel",
dfs_tmpdir="Files/spark",
)
mlflow.spark.log_model(
control_model,
f"{EXPERIMENT_NAME}-controlmodel",
registered_model_name=f"{EXPERIMENT_NAME}-controlmodel",
dfs_tmpdir="Files/spark",
)
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}-treatmentmodel")
print(f"Model URI: {model_uri}-controlmodel")
# load model back
loaded_treatmentmodel = mlflow.spark.load_model(
f"{model_uri}-treatmentmodel", dfs_tmpdir="Files/spark"
)