Partager via


Tutoriel : créer, entraîner et évaluer un modèle uplift

Ce tutoriel présente un exemple de bout en bout d’un flux de travail de science des données Synapse dans Microsoft Fabric. Vous allez découvrir comment créer, entraîner et évaluer des modèles uplift et appliquer la technique de modélisation uplift.

Prérequis

Suivre dans un notebook

Vous pouvez suivre dans un notebook de deux façons :

  • Ouvrez et exécutez le notebook intégré dans l’expérience science des données Synapse
  • Chargez votre notebook à partir de GitHub vers l’expérience science des données Synapse

Ouvrir le notebook intégré

L’exemple de notebook Modélisation Uplift accompagne ce tutoriel. Consultez Pour ouvrir l’exemple de notebook intégré au tutoriel dans l’expérience science des données Synapse : 1. Accédez à la page d’accueil science des données Synapse. 1. Sélectionnez Utiliser un échantillon. 1. Sélectionnez l’échantillon correspondant :* À partir de l’onglet par défaut Workflows de bout en bout (Python), si l’exemple concerne un tutoriel Python. * À partir de l’onglet Workflows de bout en bout (R), si l’exemple concerne un tutoriel R. * À partir de l’onglet Tutoriels rapides, si l’exemple concerne un tutoriel rapide. 1. Attachez un lakehouse au notebook avant de commencer à exécuter le code. pour plus d’informations sur l’accès aux échantillons de notebooks intégrés pour les tutoriels.

Pour ouvrir l’exemple de notebook intégré au tutoriel dans l’expérience science des données Synapse :

  1. Accédez à la page d’accueil science des données Synapse

  2. Sélectionnez Utiliser un échantillon

  3. Sélectionnez l’échantillon correspondant :

    1. À partir de l’onglet par défaut Workflows de bout en bout (Python), si l’exemple concerne un tutoriel Python
    2. À partir de l’onglet Workflows de bout en bout (R), si l’exemple concerne un tutoriel R
    3. À partir de l’onglet Tutoriels rapides, si l’exemple concerne un tutoriel rapide
  4. Attachez un lakehouse au notebook avant de commencer à exécuter le code

Importer le notebook à partir de GitHub

Le notebook AIsample - Uplift Modeling.ipynb accompagne ce tutoriel.

Pour ouvrir le notebook accompagnant ce tutoriel, suivez les instructions fournies dans Préparer votre système pour le tutoriel sur la science des données afin d’importer les notebooks dans votre espace de travail.

Vous pouvez créer un nouveau notebook si vous préférez copier et coller le code de cette page.

Assurez-vous d’attacher un lakehouse au notebook avant de commencer à exécuter du code.

Étape 1 : Chargement des données

Dataset

Criteo AI Lab a créé le jeu de données. Ce jeu de données comporte 13 millions de lignes. Chaque ligne représente un utilisateur. Chaque ligne possède 12 caractéristiques, un indicateur de traitement et deux étiquettes binaires qui incluent la visites et la conversion.

Capture d’écran montrant la structure du jeu de données Criteo AI Lab.

  • f0 à f11 : valeurs de caractéristiques (valeurs denses, flottantes)
  • traitement : si un utilisateur a été ciblé de manière aléatoire pour un traitement (par exemple, publicité) (1 = traitement, 0 = contrôle)
  • conversion : indique si une conversion s’est produite (par exemple, un achat) pour un utilisateur (binaire, étiquette)
  • visite : indique si une conversion s’est produite (par exemple, un achat) pour un utilisateur (binaire, étiquette)

Citation

Le jeu de données utilisé pour ce notebook nécessite cette citation BibTex :

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

Conseil

En définissant les paramètres suivants, vous pouvez facilement appliquer ce notebook à différents jeux de données.

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

Importer des bibliothèques

Avant le traitement, vous devez importer les bibliothèques Spark et SynapseML requises. Vous devez également importer une bibliothèque de visualisation des données, par exemple Seaborn, une bibliothèque de visualisation des données Python. Une bibliothèque de visualisation des données Python fournit une interface de haut niveau pour construire des ressources visuelles sur des DataFrame et des tableaux. En savoir plus sur Spark, SynapseML et Seaborn.

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

Télécharger un jeu de données et le charger sur un lakehouse

Ce code télécharge une version du jeu de données accessible au public, puis stocke cette ressource de données dans un lakehouse Fabric.

Important

Assurez-vous d’Ajouter un lakehouse au notebook avant de l’exécuter. Dans le cas contraire, vous obtiendrez une erreur.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

Commencez à enregistrer le temps nécessaire à l’exécution de ce notebook.

# Record the notebook running time
import time

ts = time.time()

Configurer le suivi des expériences MLflow

Pour étendre les fonctionnalités de journalisation MLflow, la synchronisation automatique capture automatiquement les valeurs des paramètres d’entrée et des métriques de sortie d’un modèle Machine Learning pendant sa formation. Ces informations sont ensuite enregistrées dans l’espace de travail, où les API MLflow ou l’expérience correspondante dans l’espace de travail peuvent y accéder et les visualiser. Pour plus d’informations sur la synchronisation automatique, consultez cette ressource.

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Remarque

Pour désactiver l’autologging de Microsoft Fabric dans une session notebook, appelez mlflow.autolog() et définissez disable=True.

Lire des données à partir du lakehouse

Lit les données brutes de la section Fichiers du lakehouse, et ajoute des colonnes supplémentaires pour les différentes parties de la date. Les mêmes informations sont utilisées pour créer une table delta partitionnée.

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

Étape 2 : analyse exploratoire des données

Utilisez la commande display pour afficher les statistiques de haut niveau à propos du jeu de données. Vous pouvez également afficher les vues de diagramme pour visualiser facilement des sous-ensembles du jeu de données.

display(raw_df.limit(20))

Examinez le pourcentage d'utilisateurs qui visitent le site, le pourcentage d'utilisateurs et le pourcentage de visiteurs qui convertissent leur visite.

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

L’analyse indique que 4,9 % des utilisateurs du groupe de traitement, ceux qui ont reçu le traitement ou la publicité, ont visité le magasin en ligne. Seuls 3,8 % des utilisateurs du groupe de contrôle, ceux qui n’ont jamais reçu le traitement ou qui n’ont jamais été exposés à la publicité, ont également visité le magasin en ligne. En outre, 0,31 % de tous les utilisateurs du groupe de traitement ont été convertis ou ont effectué un achat, contre seulement 0,19 % des utilisateurs du groupe de contrôles. Par conséquent, le taux de conversion des visiteurs qui ont effectué un achat et qui étaient également membres du groupe de traitement , est de 6,36 %, contre seulement 5,07 %** pour les utilisateurs du groupe de contrôles. Sur la base de ces résultats, le traitement peut potentiellement améliorer le taux de visite d’environ 1 %, et le taux de conversion des visiteurs d’environ 1,3 %. Le traitement entraîne une amélioration significative.

Étape 3 : définir le modèle pour la formation

Préparez les jeux de données d’entraînement et de test

Ici, vous ajustez un transformateur de caractérisation au DataFrame raw_df, pour extraire les fonctionnalités des colonnes d’entrée spécifiées et générer ces fonctionnalités dans une nouvelle colonne nommée features.

Le DataFrame résultant est stocké dans un nouveau DataFrame nommé df.

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

Préparez les jeux de données de traitement et de contrôle

Après avoir créé les jeux de données d’entraînement et de test, vous devez également créer les jeux de données de traitement et de contrôle pour entraîner les modèles Machine Learning à mesurer l’uplift.

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

Maintenant que vous avez préparé vos données, vous pouvez procéder à l’entraînement d’un modèle avec LightGBM.

Modélisation uplift : T-Learner avec LightGBM

Les méta-learners sont un ensemble d’algorithmes, basés sur des algorithmes de d’apprentissage automatique comme LightGBM, Xgboost, etc. Ils aident à estimer l'effet de traitement moyen conditionnel, ou CATE. T-learner est un méta-learner qui n’utilise pas de modèle unique. Au lieu de cela, T-learner utilise un modèle par variable de traitement. Par conséquent, deux modèles sont développés et nous faisons référence au méta-learner en tant que T-learner. T-learner utilise plusieurs modèles Machine Learning pour surmonter le problème de l’élimination totale du traitement, en obligeant l'apprenant à se diviser d'abord sur celui-ci.

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

Utilisez le jeu de données de test pour une prévision

Ici, vous utilisez treatment_model et control_model, tous deux définis précédemment, pour transformer le jeu de données de test test_df. Ensuite, vous calculez l’uplift prévu. Vous définissez l’uplift prévu comme la différence entre le résultat prévu du traitement et le résultat prévu du contrôle. Plus cette différence d’uplift prévu est importante, plus l'efficacité du traitement (par exemple, la publicité) sur une personne ou un sous-groupe est grande.

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

Effectuer une évaluation de modèle

Étant donné que l’uplift ne peut pas être observé pour chaque individu, vous devez le mesurer par rapport à un groupe d’individus. Vous utilisez une courbe uplift qui trace l’uplift cumulé réel dans la population.

Capture d’écran d’une courbe de modèle d’uplift normalisée par rapport à un traitement aléatoire.

L’axe des abscisses représente le ratio de la population sélectionnée pour le traitement. Une valeur de 0 indique qu'il n'y a pas de groupe de traitement, c'est-à-dire que personne n'est exposé au traitement ou ne se le voit proposer. Une valeur de 1 indique un groupe de traitement complet, c'est-à-dire que tout le monde est exposé au traitement ou se le voit proposer. L’axe des ordonnées montre la mesure d’uplift. L'objectif est de déterminer la taille du groupe de traitement, ou le pourcentage de la population qui se verrait proposer le traitement ou y serait exposée (par exemple, par la publicité). Cette approche optimise la sélection de la cible, afin d'optimiser le résultat.

Tout d’abord, classez l’ordre du DataFrame de test en fonction de l’uplift prédit. L’uplift prévu est la différence entre le résultat prévu du traitement et le résultat prévu du contrôle.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

Ensuite, calculez le pourcentage cumulé de visites à la fois dans le groupe de contrôle et de traitement.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Enfin, à chaque pourcentage, calculez l’uplift du groupe en tant que différence entre le pourcentage cumulé de visites entre le groupe de traitement et le groupe de contrôle.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

À présent, tracez la courbe uplift pour la prévision du jeu de données de test. Vous devez convertir le DataFrame PySpark en DataFrame Pandas avant de procéder au traçage.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Capture d’écran d’une courbe de modèle d’uplift normalisée par rapport à un traitement aléatoire.

L’axe des abscisses représente le ratio de la population sélectionnée pour le traitement. Une valeur de 0 indique qu'il n'y a pas de groupe de traitement, c'est-à-dire que personne n'est exposé au traitement ou ne se le voit proposer. Une valeur de 1 indique un groupe de traitement complet, c'est-à-dire que tout le monde est exposé au traitement ou se le voit proposer. L’axe des ordonnées montre la mesure d’uplift. L'objectif est de déterminer la taille du groupe de traitement, ou le pourcentage de la population qui se verrait proposer le traitement ou y serait exposée (par exemple, par la publicité). Cette approche optimise la sélection de la cible, afin d'optimiser le résultat.

Tout d’abord, classez l’ordre du DataFrame de test en fonction de l’uplift prédit. L’uplift prévu est la différence entre le résultat prévu du traitement et le résultat prévu du contrôle.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

Ensuite, calculez le pourcentage cumulé de visites à la fois dans le groupe de contrôle et de traitement.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

Enfin, à chaque pourcentage, calculez l’uplift du groupe en tant que différence entre le pourcentage cumulé de visites entre le groupe de traitement et le groupe de contrôle.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

À présent, tracez la courbe uplift pour la prévision du jeu de données de test. Vous devez convertir le DataFrame PySpark en DataFrame Pandas avant de procéder au traçage.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

Capture d’écran d’une courbe de modèle d’uplift normalisée par rapport à un traitement aléatoire.

L'analyse et la courbe d'augmentation montrent toutes deux que les 20 % les plus importants, classés par la prédiction, bénéficieraient d'un gain important s'ils recevaient le traitement. Cela signifie que les 20 % les plus importants de la population représentent le groupe des influençables. Vous pouvez donc fixer la note seuil pour la taille souhaitée du groupe de traitement à 20 %, afin d'identifier les clients cibles de la sélection pour obtenir l'impact le plus important.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

Étape 4 : enregistrement du modèle de ML final

Vous utilisez MLflow pour suivre et enregistrer toutes les expériences pour le groupe de traitement et le groupe de contrôles. Ce suivi et cette journalisation incluent les paramètres, les indicateurs et les modèles correspondants. Ces informations sont consignées sous le nom de l’expérience, dans l’espace de travail, pour une utilisation ultérieure.

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

Pour afficher vos expériences :

  1. Dans le volet gauche, sélectionnez votre espace de travail.
  2. Recherchez et sélectionnez le nom de l’expérience, dans ce cas aisample-upliftmodelling.

Capture d’écran montrant les résultats de l’expérience de modélisation de l’uplift aisample.

Étape 5 : Enregistrer les résultats de la prédiction

Microsoft Fabric offre PREDICT ; une fonction évolutive qui prend en charge le scoring par lots dans n’importe quel moteur de calcul. Il permet aux clients d’opérationnaliser des modèles Machine Learning. Les utilisateurs peuvent créer des prédictions par lots directement à partir d’un bloc-notes ou de la page d’élément d’un modèle spécifique. Consultez cette ressource pour en savoir plus sur PREDICT et pour apprendre à utiliser PREDICT dans Microsoft Fabric.

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")