Erstellen, Trainieren und Bewerten von Uplift-Modellen in Microsoft Fabric

In diesem Artikel erfahren Sie, wie Sie Uplift-Modelle erstellen, trainieren und auswerten und Uplift-Modellierungstechniken anwenden.

Wichtig

Microsoft Fabric befindet sich derzeit in der Vorschauversion. Diese Informationen beziehen sich auf eine Vorabversion des Produkts, an der vor der Veröffentlichung noch wesentliche Änderungen vorgenommen werden können. Microsoft übernimmt keine Garantie, weder ausdrücklich noch stillschweigend, für die hier bereitgestellten Informationen.

  • Was ist Uplift-Modellierung?

    Es handelt sich um eine Familie von Kausalrückschlusstechnologien, die Machine Learning-Modelle verwendet, um die kausale Auswirkung einer Behandlung auf das Verhalten einer Person abzuschätzen.

    • Persuadables reagieren nur positiv auf die Behandlung
    • Schlafende Hunde reagieren stark negativ auf die Behandlung
    • Verlorene Ursachen erreichen das Ergebnis auch bei der Behandlung nie
    • Sicher, dass dinge immer das Ergebnis mit oder ohne die Behandlung erreichen

    Das Ziel der Uplift-Modellierung besteht darin, die "Persuadables" zu identifizieren, keine Anstrengungen für "sichere Dinge" und "verlorene Ursachen" zu verschwenden und "schlafende Hunde" zu vermeiden.

  • Wie funktioniert die Uplift-Modellierung?

    • Meta Learner: Sagt den Unterschied zwischen dem Verhalten einer Person vorher, wenn es eine Behandlung gibt und wenn es keine Behandlung gibt
    • Uplift Tree: ein baumbasierter Algorithmus, bei dem das Teilungskriterium auf Unterschieden in der Hebung basiert
    • NN-basiertes Modell: ein neuronales Netzwerkmodell, das normalerweise mit Beobachtungsdaten funktioniert
  • Wo kann die Uplift-Modellierung funktionieren?

    • Marketing: Hilfe bei der Identifizierung von Überrungen für die Anwendung einer Behandlung wie einer Gutschein oder einer Online-Werbung
    • Medizinische Behandlung: Helfen Sie zu verstehen, wie sich eine Behandlung auf bestimmte Gruppen unterschiedlich auswirken kann

Voraussetzungen

Schritt 1: Laden der Daten

Tipp

In den folgenden Beispielen wird davon ausgegangen, dass Sie den Code aus Zellen in einem Notebook ausführen. Informationen zum Erstellen und Verwenden von Notebooks finden Sie unter Verwenden von Notebooks.

Notebookkonfigurationen

Durch Definieren der folgenden Parameter können Sie dieses Beispiel auf verschiedene Datasets anwenden.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually by user
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # mlflow experiment name

Importieren von Abhängigkeiten

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline


from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import os
import gzip

import mlflow

Dataset herunterladen und in Lakehouse hochladen

Wichtig

Fügen Sie Ihrem Notebook ein Lakehouse hinzu, bevor Sie es ausführen.

  • Datasetbeschreibung: Dieses Dataset wurde vom Criteo AI Lab erstellt. Das Dataset besteht aus 13 Millionen Zeilen, von denen jede einen Benutzer mit 12 Features, einem Behandlungsindikator und 2 binären Bezeichnungen (Besuche und Konvertierungen) darstellt.

    • f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11: Featurewerte (dicht, float)
    • Behandlung: Behandlungsgruppe (1 = behandelt, 0 = Kontrolle), die angibt, ob ein Kunde durch zufällige Werbung ins Visier genommen wurde
    • Conversion: gibt an, ob für diesen Benutzer eine Konvertierung aufgetreten ist (binär, Bezeichnung)
    • visit: ob für diesen Benutzer ein Besuch aufgetreten ist (binär, Bezeichnung)
  • Dataset-Startseite: https://ailab.criteo.com/criteo-uplift-prediction-dataset/

  • Zitat:

    @inproceedings{Diemert2018,
    author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
    title={A Large Scale Benchmark for Uplift Modeling},
    publisher = {ACM},
    booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
    year = {2018}
    }
    
    if not IS_CUSTOM_DATA:
         # Download demo data files into lakehouse if not exist
         import os, requests
    
         remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
         download_file = "criteo-research-uplift-v2.1.csv.gz"
         download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
    
         if not os.path.exists("/lakehouse/default"):
             raise FileNotFoundError(
                 "Default lakehouse not found, please add a lakehouse and restart the session."
             )
         os.makedirs(download_path, exist_ok=True)
         if not os.path.exists(f"{download_path}/{DATA_FILE}"):
             r = requests.get(f"{remote_url}", timeout=30)
             with open(f"{download_path}/{download_file}", "wb") as f:
                 f.write(r.content)
             with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
                 with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                     fout.write(fin.read())
         print("Downloaded demo data files into lakehouse.")
    

Lesen von Daten aus Lakehouse

raw_df = spark.read.csv(
    f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True
).cache()

display(raw_df.limit(20))

Schritt 2: Vorbereiten des Datasets

Durchsuchen von Daten

  • Die Gesamtrate der Benutzer, die besuchen/konvertieren

    raw_df.select(
         F.mean("visit").alias("Percentage of users that visit"),
         F.mean("conversion").alias("Percentage of users that convert"),
         (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
    ).show()
    
  • Der gesamtdurchschnittliche Behandlungseffekt beim Besuch

    raw_df.groupby("treatment").agg(
         F.mean("visit").alias("Mean of visit"),
         F.sum("visit").alias("Sum of visit"),
         F.count("visit").alias("Count"),
    ).show()
    
  • Der gesamtdurchschnittliche Behandlungseffekt auf die Konversion

    raw_df.groupby("treatment").agg(
         F.mean("conversion").alias("Mean of conversion"),
         F.sum("conversion").alias("Sum of conversion"),
         F.count("conversion").alias("Count"),
    ).show()
    

Split train-test dataset

transformer = (
    Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
)

df = transformer.transform(raw_df)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

train_df.groupby(TREATMENT_COLUMN).count().show()

Dataset für die Aufteilung der Behandlungssteuerung

treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

Schritt 3: Modelltraining und -auswertung

Uplift-Modellierung: T-Learner mit LightGBM

classifier = (
    LightGBMClassifier()
    .setFeaturesCol("features")
    .setNumLeaves(10)
    .setNumIterations(100)
    .setObjective("binary")
    .setLabelCol(LABEL_COLUMN)
)

treatment_model = classifier.fit(treatment_train_df)
control_model = classifier.fit(control_train_df)

Vorhersagen im Testdataset

getPred = F.udf(lambda v: float(v[1]), FloatType())

test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(
        TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift"
    )
    .cache()
)

display(test_pred_df.limit(20))

Modellauswertung

Da der tatsächliche Auftrieb nicht für jeden Einzelnen zu beobachten ist, messen Sie den Uplift über eine Gruppe von Kunden.

  • Uplift-Kurve: Zeichnet den tatsächlichen kumulativen Anstieg in der Bevölkerung

Ordnen Sie zunächst die Reihenfolge des Testdatenrahmens nach dem Vorhersageuplift ein.

test_ranked_df = test_pred_df.withColumn(
    "percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift")))
)

display(test_ranked_df.limit(20))

Berechnen Sie als Nächstes den kumulativen Prozentsatz der Besuche in jeder Gruppe (Behandlung oder Kontrolle).

C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

display(test_ranked_df.limit(20))

Berechnen Sie schließlich den Uplift der Gruppe bei jedem Prozentsatz.

test_ranked_df = test_ranked_df.withColumn(
    "group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")
).cache()

display(test_ranked_df.limit(20))

Jetzt können Sie die Uplift-Kurve für die Vorhersage des Testdatasets zeichnen. Sie müssen den pyspark-Dataframe vor dem Plotten in pandas-Dataframe konvertieren.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # plot the data
    plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid(b=True, which="major")

    return ax


test_ranked_pd_df = test_ranked_df.select(
    ["pred_uplift", "percent_rank", "group_uplift"]
).toPandas()
uplift_plot(test_ranked_pd_df)

Diagramm mit einer normalisierten Uplift-Modellkurve im Vergleich zur zufälligen Behandlung.

Beachten Sie bei der Uplift-Kurve im vorherigen Beispiel, dass die von Ihrer Vorhersage eingestuften 20 % der Bevölkerung einen großen Gewinn haben, wenn sie die Behandlung erhalten haben, was bedeutet, dass sie die Überzeugenden sind. Daher können Sie die Cutoff-Bewertung mit 20 % Prozent drucken, um die Zielkunden zu identifizieren.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift score higher than {:.4f} are Persuadables".format(cutoff_score))

Protokoll- und Lademodell mit MLflow

Nachdem Sie nun über ein trainiertes Modell verfügen, speichern Sie es zur späteren Verwendung. Im folgenden Beispiel wird MLflow verwendet, um Metriken und Modelle zu protokollieren. Sie können diese API auch verwenden, um Modelle für Vorhersage zu laden.

# setup mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        treatment_model,
        f"{EXPERIMENT_NAME}-treatmentmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-treatmentmodel",
        dfs_tmpdir="Files/spark",
    )

    mlflow.spark.log_model(
        control_model,
        f"{EXPERIMENT_NAME}-controlmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-controlmodel",
        dfs_tmpdir="Files/spark",
    )

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}-treatmentmodel")
    print(f"Model URI: {model_uri}-controlmodel")
# load model back
loaded_treatmentmodel = mlflow.spark.load_model(
    f"{model_uri}-treatmentmodel", dfs_tmpdir="Files/spark"
)