자습서: 고급 모델 만들기, 학습 및 평가

이 자습서에서는 Microsoft Fabric에서 Synapse 데이터 과학 워크플로의 엔드 투 엔드 예제를 제공합니다. 고급 모델을 만들고, 학습하고, 평가하고, 고급 모델링 기술을 적용하는 방법을 알아봅니다.

업리프트 모델링은 기계 학습 모델을 사용하여 개인의 행동에 대한 치료의 인과 효과를 추정하는 인과 추론 기술의 제품군입니다. 이 모델링은 개인을 다음 범주로 분류합니다.

  • 설득은 치료에 긍정적으로 반응
  • 잠자는 개는 치료에 강한 부정적인 반응을 가지고
  • 잃어버린 원인 은 치료에도 불구하고 결과에 도달하지 못합니다.
  • 확실한 것들은 항상 치료 여부에 관계없이 결과에 도달합니다.

우리는 설득력있는 사람들을 식별하고, 확실한 것들잃어버린 원인에 낭비되는 노력을 피하고, 잠자는 개를 귀찮게하지 않아야합니다.

고급 모델링에는 다음과 같은 구성 요소가 있습니다.

  • 메타 학습자: 개인이 치료를 받고 치료를 받지 않을 때의 개인의 행동 차이를 예측하는 알고리즘입니다.
  • 트리 향상: 처리/제어 그룹 할당 정보와 응답 정보를 노드의 분할 조건에 대한 의사 결정에 직접 결합하는 트리 기반 알고리즘
  • NN 기반 모델: 일반적으로 관찰 데이터와 함께 작동하는 신경망 모델입니다. 딥 러닝을 사용하여 잠재 변수의 분포를 결정합니다. 잠재 변수는 업리프트 모델링의 공동 설립자를 나타냅니다.

고급 모델링은 다음 영역에서 작동할 수 있습니다.

  • 마케팅에 대 한, 그것은 치료를 시도 할 수 있습니다 잠재적으로 흔들리는 설득을 식별 하는 데 도움이 수 있습니다. 예를 들어, 설득력 있는 사람들을 식별하기 위한 아웃리치에는 쿠폰 또는 온라인 광고가 포함될 수 있습니다.
  • 의학을 위해, 특정 처리가 개별 단에 어떻게 영향을 미칠 수 있는지 측정하는 것을 도울 수 있습니다. 이 측정을 통해 최적화된 대상 선택을 통해 영향을 최대화할 수 있습니다.

이 자습서에서는 다음 단계를 설명합니다.

  • 레이크하우스에 데이터 업로드
  • 데이터에 대한 예비 분석 수행
  • 모델 학습
  • MLflow를 사용하여 모델 로그 및 로드

필수 조건

전자 필기장에서 팔로우

다음 옵션 중 하나를 선택하여 전자 필기장에서 따를 수 있습니다.

  • Synapse 데이터 과학 환경에서 기본 제공 Notebook 열기 및 실행
  • GitHub에서 Synapse 데이터 과학 환경으로 Notebook 업로드

기본 제공 Notebook 열기

샘플 업리프트 모델링 Notebook은 이 자습서와 함께 제공됩니다.

Synapse 데이터 과학 환경에서 자습서의 기본 제공 샘플 Notebook을 열려면 다음을 수행합니다.

  1. Synapse 데이터 과학 홈페이지로 이동합니다.

  2. 샘플 사용을 선택합니다.

  3. 해당 샘플을 선택합니다.

    • 기본 Python(엔드 투 엔드 워크플로) 탭에서 샘플이 Python 자습서용인 경우
    • R(엔드 투 엔드 워크플로) 탭에서 샘플이 R 자습서용인 경우
    • 빠른 자습서 탭에서 샘플이 빠른 자습서용인 경우
  4. 코드 실행을 시작하기 전에 Lakehouse를 Notebook 에 연결합니다.

GitHub에서 Notebook 가져오기

AIsample - Uplift Modeling.ipynb Notebook은 이 자습서와 함께 제공됩니다.

이 자습서에 대해 함께 제공되는 Notebook을 열려면 데이터 과학 자습서를 위해 시스템 준비 자습서의 지침에 따라 Notebook을 작업 영역으로 가져옵니다.

이 페이지에서 코드를 복사하여 붙여 넣으면 새 Notebook을 만들 수 있습니다.

코드 실행을 시작하기 전에 Lakehouse를 Notebook에 연결해야 합니다.

1단계: 데이터 로드

다음 예제에서는 Notebook의 셀에서 코드를 실행한다고 가정합니다. Notebook을 만들고 사용하는 방법에 대한 자세한 내용은 Notebook을 사용하는 방법을 참조 하세요.

Notebook 구성

다른 데이터 세트에 이 Notebook을 적용할 수 있도록 다음 매개 변수를 정의합니다.

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

종속성 가져오기

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline


from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import os
import gzip

import mlflow

데이터 세트를 다운로드하고 레이크하우스에 업로드

Important

실행하기 전에 레이크하우스를 Notebook 에 추가해야 합니다. 그렇지 않으면 오류가 발생합니다.

이 자습서에서는 Criteo AI Lab의 Criteo Uplift 예측 데이터 세트를 사용합니다. 데이터 세트에는 1,300만 개의 행이 있습니다. 각 행은 12개의 기능을 가진 사용자를 나타냅니다. 각 행에는 처리 표시기와 두 개의 이진 레이블(변환 및 방문)도 포함됩니다. 세부 정보는 다음과 같습니다.

  • f0, f1, f2, f3, f4, f5, , f7f6, f8, f9, f10f11: 기능 값(dense, float)
  • treatment: 광고에서 고객을 임의로 대상으로 지정했는지 여부를 나타내는 처리 그룹(1 = 처리됨, 0 = 제어)입니다.
  • conversion: 이 사용자에 대한 변환이 발생했는지 여부(binary, label)
  • visit: 이 사용자에 대한 방문이 발생했는지 여부(binary, label)

예를 들어 데이터 세트 인용은 다음과 같습니다.

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
if not IS_CUSTOM_DATA:
    # Download the demo data files into the lakehouse, if they don't exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

레이크하우스에서 데이터 읽기

raw_df = spark.read.csv(
    f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True
).cache()

display(raw_df.limit(20))

2단계: 데이터 세트 준비

데이터 탐색

이 코드는 방문하고 변환하는 사용자의 전체 비율을 반환합니다.

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

이 코드는 방문에 대한 전반적인 평균 치료 효과를 반환합니다.

raw_df.groupby("treatment").agg(
    F.mean("visit").alias("Mean of visit"),
    F.sum("visit").alias("Sum of visit"),
    F.count("visit").alias("Count"),
).show()

이 코드는 변환에 대한 전체 평균 처리 효과를 반환합니다.

raw_df.groupby("treatment").agg(
    F.mean("conversion").alias("Mean of conversion"),
    F.sum("conversion").alias("Sum of conversion"),
    F.count("conversion").alias("Count"),
).show()

학습 및 테스트 데이터 세트 분할

transformer = (
    Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
)

df = transformer.transform(raw_df)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

train_df.groupby(TREATMENT_COLUMN).count().show()

처리 및 제어 데이터 세트 분할

treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

3단계: 모델 학습 및 평가

고급 모델링 수행: LightGBM을 사용하여 T-Learner

classifier = (
    LightGBMClassifier()
    .setFeaturesCol("features")
    .setNumLeaves(10)
    .setNumIterations(100)
    .setObjective("binary")
    .setLabelCol(LABEL_COLUMN)
)

treatment_model = classifier.fit(treatment_train_df)
control_model = classifier.fit(control_train_df)

테스트 데이터 세트에 대한 예측

getPred = F.udf(lambda v: float(v[1]), FloatType())

test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(
        TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift"
    )
    .cache()
)

display(test_pred_df.limit(20))

모델 평가

각 개인에 대한 상승을 관찰할 수 없으므로 고객 그룹에 대한 상승을 측정합니다. 상승 곡선인구에 걸쳐 실제 누적 상승을 플롯.

먼저 예측된 상승에 따라 테스트 DataFrame 순서의 순위를 지정합니다.

test_ranked_df = test_pred_df.withColumn(
    "percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift")))
)

display(test_ranked_df.limit(20))

각 그룹(치료 또는 제어)에서 방문의 누적 백분율을 계산합니다.

C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

display(test_ranked_df.limit(20))

각 백분율로 그룹의 상승률을 계산합니다.

test_ranked_df = test_ranked_df.withColumn(
    "group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")
).cache()

display(test_ranked_df.limit(20))

테스트 데이터 세트의 예측에 상승 곡선을 그어 줍니다. 그리기 전에 PySpark DataFrame을 pandas DataFrame으로 변환해야 합니다.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return ax


test_ranked_pd_df = test_ranked_df.select(
    ["pred_uplift", "percent_rank", "group_uplift"]
).toPandas()
uplift_plot(test_ranked_pd_df)

Screenshot of a chart that shows a normalized uplift model curve versus random treatment.

이 예제의 상승 곡선에서 예측에 따라 순위가 지정된 상위 20% 모집단은 치료와 함께 큰 이득을 얻습니다. 우리는 그들을 설득력 있는 사람으로 정의할 수 있습니다. 컷오프 점수를 20%로 인쇄하여 대상 고객을 식별할 수 있습니다.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift score higher than {:.4f} are Persuadables".format(cutoff_score))

4단계: MLflow를 사용하여 모델 로그 및 로드

나중에 사용할 수 있도록 학습된 모델을 저장합니다. 이 예제에서 MLflow는 메트릭과 모델을 모두 기록합니다. 이 API를 사용하여 예측을 위한 모델을 로드할 수도 있습니다.

# Set up MLflow
mlflow.set_experiment(EXPERIMENT_NAME)
# Log models, metrics, and parameters
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        treatment_model,
        f"{EXPERIMENT_NAME}-treatmentmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-treatmentmodel",
        dfs_tmpdir="Files/spark",
    )

    mlflow.spark.log_model(
        control_model,
        f"{EXPERIMENT_NAME}-controlmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-controlmodel",
        dfs_tmpdir="Files/spark",
    )

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}-treatmentmodel")
    print(f"Model URI: {model_uri}-controlmodel")
# Reload the model
loaded_treatmentmodel = mlflow.spark.load_model(
    f"{model_uri}-treatmentmodel", dfs_tmpdir="Files/spark"
)