Microsoft Fabric에서 향상 모델 만들기, 학습 및 평가

이 문서에서는 향상 모델을 만들고, 학습시키고, 평가하고, 고급 모델링 기술을 적용하는 방법을 알아봅니다.

중요

Microsoft Fabric은 현재 미리 보기로 제공됩니다. 이 정보는 릴리스되기 전에 상당히 수정될 수 있는 시험판 제품과 관련이 있습니다. Microsoft는 여기에 제공된 정보와 관련하여 명시적이거나 묵시적인 어떠한 보증도 하지 않습니다.

  • 향상 모델링이란?

    기계 학습 모델을 사용하여 개인의 행동에 대한 일부 치료의 인과 효과를 추정하는 인과 유추 기술 제품군입니다.

    • 설득은 치료에 긍정적 인 반응
    • 잠자는 개는 치료에 강한 부정적인 반응을 가지고
    • 잃어버린 원인 은 치료에도 결과에 도달하지 못합니다.
    • 확실히 일이 항상 치료와 함께 또는없이 결과에 도달

    향상 모델링의 목표는 "확실한 것"과 "잃어버린 원인"에 대한 노력을 낭비하지 않고 "설득력있는"을 식별하고 "잠자는 개"를 귀찮게하지 않는 것입니다.

  • 향상 모델링은 어떻게 작동하나요?

    • 메타 학습자: 치료가 있을 때와 치료가 없을 때 개인의 행동 사이의 차이를 예측합니다.
    • 트리 향상: 분할 기준이 상승의 차이를 기반으로 하는 트리 기반 알고리즘입니다.
    • NN 기반 모델:일반적으로 관찰 데이터와 함께 작동하는 신경망 모델
  • 모델링을 향상할 수 있는 위치는 어디인가요?

    • 마케팅: 쿠폰 또는 온라인 광고와 같은 치료를 적용하기 위해 설득 가능인을 식별하는 데 도움이 됩니다.
    • 치료: 치료가 특정 그룹에 어떻게 다르게 영향을 미칠 수 있는지 이해하는 데 도움이 됩니다.

사전 요구 사항

1단계: 데이터 로드

다음 예제에서는 Notebook의 셀에서 코드를 실행한다고 가정합니다. Notebook을 만들고 사용하는 방법에 대한 자세한 내용은 Notebook을 사용하는 방법을 참조하세요.

Notebook 구성

아래 매개 변수를 정의하여 이 예제를 다른 데이터 세트에 적용할 수 있습니다.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually by user
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # mlflow experiment name

종속성 가져오기

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline


from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import os
import gzip

import mlflow

데이터 세트 다운로드 및 Lakehouse에 업로드

중요

실행하기 전에 Notebook에 Lakehouse를 추가합니다.

  • 데이터 세트 설명: Criteo AI Lab에서 이 데이터 세트를 만들었습니다. 데이터 세트는 각각 12개의 기능, 처리 표시기 및 2개의 이진 레이블(방문 및 변환)이 있는 사용자를 나타내는 13M 행으로 구성됩니다.

    • f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11: 기능 값(조밀, float)
    • 처리: 고객이 임의로 광고 대상인지를 나타내는 처리 그룹(1 = 처리됨, 0 = 제어)
    • conversion: 이 사용자에 대해 변환이 발생했는지 여부(이진, 레이블)
    • visit: 이 사용자에 대한 방문이 발생했는지 여부(이진, 레이블)
  • 데이터 세트 홈페이지: https://ailab.criteo.com/criteo-uplift-prediction-dataset/

  • 인용:

    @inproceedings{Diemert2018,
    author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
    title={A Large Scale Benchmark for Uplift Modeling},
    publisher = {ACM},
    booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
    year = {2018}
    }
    
    if not IS_CUSTOM_DATA:
         # Download demo data files into lakehouse if not exist
         import os, requests
    
         remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
         download_file = "criteo-research-uplift-v2.1.csv.gz"
         download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
    
         if not os.path.exists("/lakehouse/default"):
             raise FileNotFoundError(
                 "Default lakehouse not found, please add a lakehouse and restart the session."
             )
         os.makedirs(download_path, exist_ok=True)
         if not os.path.exists(f"{download_path}/{DATA_FILE}"):
             r = requests.get(f"{remote_url}", timeout=30)
             with open(f"{download_path}/{download_file}", "wb") as f:
                 f.write(r.content)
             with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
                 with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                     fout.write(fin.read())
         print("Downloaded demo data files into lakehouse.")
    

Lakehouse에서 데이터 읽기

raw_df = spark.read.csv(
    f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True
).cache()

display(raw_df.limit(20))

2단계: 데이터 세트 준비

데이터 살펴보기

  • 방문/변환하는 전체 사용자 비율

    raw_df.select(
         F.mean("visit").alias("Percentage of users that visit"),
         F.mean("conversion").alias("Percentage of users that convert"),
         (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
    ).show()
    
  • 방문에 대한 전반적인 평균 치료 효과

    raw_df.groupby("treatment").agg(
         F.mean("visit").alias("Mean of visit"),
         F.sum("visit").alias("Sum of visit"),
         F.count("visit").alias("Count"),
    ).show()
    
  • 변환에 대한 전반적인 평균 치료 효과

    raw_df.groupby("treatment").agg(
         F.mean("conversion").alias("Mean of conversion"),
         F.sum("conversion").alias("Sum of conversion"),
         F.count("conversion").alias("Count"),
    ).show()
    

학습 테스트 데이터 세트 분할

transformer = (
    Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
)

df = transformer.transform(raw_df)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

train_df.groupby(TREATMENT_COLUMN).count().show()

처리 제어 데이터 세트 분할

treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

3단계: 모델 학습 및 평가

고급 모델링: LightGBM을 사용하여 T-Learner

classifier = (
    LightGBMClassifier()
    .setFeaturesCol("features")
    .setNumLeaves(10)
    .setNumIterations(100)
    .setObjective("binary")
    .setLabelCol(LABEL_COLUMN)
)

treatment_model = classifier.fit(treatment_train_df)
control_model = classifier.fit(control_train_df)

테스트 데이터 세트에 대한 예측

getPred = F.udf(lambda v: float(v[1]), FloatType())

test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(
        TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift"
    )
    .cache()
)

display(test_pred_df.limit(20))

모델 평가

각 개인에 대해 실제 상승을 관찰할 수 없으므로 고객 그룹에 대한 상승도를 측정합니다.

  • 상승 곡선 : 인구에 걸쳐 실제 누적 상승을 플롯

먼저 예측 향상을 기준으로 테스트 데이터 프레임 순서의 순위를 지정합니다.

test_ranked_df = test_pred_df.withColumn(
    "percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift")))
)

display(test_ranked_df.limit(20))

다음으로, 각 그룹(치료 또는 제어)의 누적 방문 비율을 계산합니다.

C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

display(test_ranked_df.limit(20))

마지막으로, 각 백분율에서 그룹의 상승도를 계산합니다.

test_ranked_df = test_ranked_df.withColumn(
    "group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")
).cache()

display(test_ranked_df.limit(20))

이제 테스트 데이터 세트의 예측에 상승 곡선을 그릴 수 있습니다. 그리기 전에 pyspark 데이터 프레임을 pandas 데이터 프레임으로 변환해야 합니다.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # plot the data
    plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid(b=True, which="major")

    return ax


test_ranked_pd_df = test_ranked_df.select(
    ["pred_uplift", "percent_rank", "group_uplift"]
).toPandas()
uplift_plot(test_ranked_pd_df)

정규화된 상승 모델 곡선과 임의 처리를 보여 주는 차트.

이전 예제의 상승 곡선에서 예측에 의해 순위가 매겨진 상위 20%의 모집단은 치료를 받은 경우 큰 이득을 얻게 됩니다. 즉, 이는 설득력 있는 인구라는 것을 의미 합니다. 따라서 컷오프 점수를 20%로 인쇄하여 대상 고객을 식별할 수 있습니다.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift score higher than {:.4f} are Persuadables".format(cutoff_score))

MLflow를 사용하여 모델 로그 및 로드

이제 학습된 모델이 있으므로 나중에 사용할 수 있도록 저장합니다. 다음 예제에서는 MLflow를 사용하여 메트릭 및 모델을 기록합니다. 이 API를 사용하여 예측을 위해 모델을 로드할 수도 있습니다.

# setup mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        treatment_model,
        f"{EXPERIMENT_NAME}-treatmentmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-treatmentmodel",
        dfs_tmpdir="Files/spark",
    )

    mlflow.spark.log_model(
        control_model,
        f"{EXPERIMENT_NAME}-controlmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-controlmodel",
        dfs_tmpdir="Files/spark",
    )

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}-treatmentmodel")
    print(f"Model URI: {model_uri}-controlmodel")
# load model back
loaded_treatmentmodel = mlflow.spark.load_model(
    f"{model_uri}-treatmentmodel", dfs_tmpdir="Files/spark"
)