Créer, évaluer et déployer un modèle de détection des fraudes dans Microsoft Fabric

Dans ce tutoriel, nous allons présenter des workflows d’ingénierie des données et de science des données avec un exemple de bout en bout qui génère un modèle pour détecter les transactions frauduleuses de crédit carte. Les étapes à suivre sont les suivantes :

  • Charger les données dans un Lakehouse
  • Effectuer une analyse exploratoire des données sur les données
  • Préparer les données en gérant le déséquilibre de classe
  • Entraîner un modèle et le journaliser avec MLflow
  • Déployer le modèle et enregistrer les résultats de prédiction

Important

Microsoft Fabric est en préversion.

Prérequis

  • Accédez à l’expérience Science des données dans Microsoft Fabric.
  • Ouvrez l’exemple de notebook ou créez-en un.
    • Créez un bloc-notes si vous souhaitez copier/coller du code dans des cellules.
    • Vous pouvez également sélectionner Utiliser un exemple de>détection des fraudes pour ouvrir l’exemple de notebook.
  • Ajoutez un Lakehouse à votre bloc-notes.

Étape 1 : Charger les données

Le jeu de données contient les transactions de crédit carte effectuées par les titulaires de carte européens en septembre 2013 pendant deux jours. Sur 284 807 transactions, 492 sont frauduleuses. La classe positive (fraude) ne représente que 0,172 % des données, ce qui rend le jeu de données très déséquilibré.

Variables d’entrée et de réponse

Le jeu de données contient uniquement des variables d’entrée numériques, qui sont le résultat d’une transformation d’analyse des composants principaux (PCA) sur les fonctionnalités d’origine. Pour protéger la confidentialité, nous ne pouvons pas fournir les fonctionnalités d’origine ou plus d’informations générales sur les données. Les seules fonctionnalités qui n’ont pas été transformées avec PCA sont « Time » et « Amount ».

  • Fonctionnalités « V1, V2, ... V28 » sont les principaux composants obtenus avec PCA.
  • « Time » contient les secondes écoulées entre chaque transaction et la première transaction du jeu de données.
  • « Amount » est le montant de la transaction. Cette fonctionnalité peut être utilisée pour l’apprentissage dépendant des coûts, par exemple.
  • « Class » est la variable de réponse, qui prend la valeur 1 pour fraude et 0 sinon.

Étant donné le ratio de déséquilibre de classe, nous vous recommandons de mesurer la précision à l’aide de la zone sous la courbe de Precision-Recall (AUPRC). L’utilisation d’une matrice de confusion pour évaluer la précision n’est pas significative pour une classification déséquilibré.

L’extrait de code suivant montre une partie des donnéescreditcard.csv .

« Time » « V1 » « V2 » « V3 » « V4 » « V5 » « V6 » « V7 » « V8 » « V9 » « V10 » « V11 » « V12 » « V13 » « V14 » « V15 » « V16 » « V17 » « V18 » « V19 » « V20 » « V21 » « V22 » « V23 » « V24 » « V25 » « V26 » « V27 » « V28 » « Amount » « Classe »
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Installation des bibliothèques

Pour ce tutoriel, nous devons installer la imblearn bibliothèque. Le noyau PySpark est redémarré après l’exécution %pip installde . Nous devons donc installer la bibliothèque avant d’exécuter d’autres cellules.

# install imblearn for SMOTE
%pip install imblearn

En définissant les paramètres suivants, nous pouvons facilement appliquer le notebook à différents jeux de données.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Télécharger le jeu de données et le charger dans un Lakehouse

Avant d’exécuter le notebook, vous devez y ajouter un Lakehouse. Le Lakehouse est utilisé pour stocker les données de cet exemple. Pour ajouter un Lakehouse, consultez Ajouter un Lakehouse à votre bloc-notes.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Lire les données du Lakehouse

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

Étape 2. Effectuer une analyse exploratoire des données

Dans cette section, nous allons explorer les données, case activée leur schéma, réorganiser leurs colonnes et convertir les colonnes dans les types de données appropriés.

Afficher les données brutes

Nous pouvons utiliser display pour explorer les données brutes, calculer des statistiques de base ou même afficher des vues de graphique.

display(df)

Imprimez des informations sur les données, telles que le schéma.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Caster les colonnes dans les types appropriés

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

Étape 3. Développer et déployer un modèle

Dans cette section, nous allons entraîner un modèle LightGBM pour classifier les transactions frauduleuses.

Préparer les données d’entraînement et de test

Commencez par fractionner les données en jeux d’entraînement et de test.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Vérifiez le volume de données et le déséquilibre dans le jeu d’entraînement.

display(train_data.groupBy(TARGET_COL).count())

Traiter les données déséquilibrées

Comme souvent avec les données réelles, ces données ont un problème de déséquilibre de classe, car la classe positive (transactions frauduleuses) ne représente que 0,172 % de toutes les transactions. Nous allons appliquer SMOTE (Synthetic Minority Over-sampling Technique) pour gérer automatiquement le déséquilibre de classe dans les données. La méthode SMOTE suréchantillonne la classe minoritaire et sous-échantillonne la classe majoritaire pour améliorer les performances du classifieur.

Appliquons SMOTE aux données d’apprentissage :

Notes

imblearn Fonctionne uniquement pour les DataFrames pandas, et non pour les DataFrames PySpark.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

Définir le modèle

Une fois nos données en place, nous pouvons maintenant définir le modèle. Nous allons utiliser un classifieur LightGBM et utiliser SynapseML pour implémenter le modèle avec quelques lignes de code.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

Entraîner le modèle

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

Expliquer le modèle

Ici, nous pouvons montrer l’importance que le modèle attribue à chaque fonctionnalité dans les données d’entraînement.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

Évaluer le modèle

Générer des prédictions de modèle :

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Afficher les métriques du modèle :

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Créez une matrice de confusion :

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Tracer la matrice de confusion :

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Définissez une fonction pour évaluer le modèle :

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Évaluez le modèle d’origine :

# evaluate the original model
auroc, auprc = evaluate(predictions)

Évaluez le modèle SMOTE :

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

Journaliser et charger le modèle avec MLflow

Maintenant que nous avons un modèle de travail décent, nous pouvons l’enregistrer pour une utilisation ultérieure. Ici, nous utilisons MLflow pour journaliser les métriques et les modèles, et charger les modèles à des fins de prédiction.

Configurer MLflow :

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Modèle de journal, métriques et paramètres :

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Rechargez le modèle :

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

Étape 4. Enregistrer les résultats de prédiction

Dans cette section, nous allons déployer le modèle et enregistrer les résultats de prédiction.

Déploiement et prédiction de modèle

batch_predictions = loaded_model.transform(test_data)

Enregistrez les prédictions dans le Lakehouse :

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Étapes suivantes