Créer, évaluer et déployer un modèle de détection des fraudes dans Microsoft Fabric
Dans ce tutoriel, nous allons présenter des workflows d’ingénierie des données et de science des données avec un exemple de bout en bout qui génère un modèle pour détecter les transactions frauduleuses de crédit carte. Les étapes à suivre sont les suivantes :
- Charger les données dans un Lakehouse
- Effectuer une analyse exploratoire des données sur les données
- Préparer les données en gérant le déséquilibre de classe
- Entraîner un modèle et le journaliser avec MLflow
- Déployer le modèle et enregistrer les résultats de prédiction
Important
Microsoft Fabric est en préversion.
Prérequis
Un abonnement Power BI Premium. Si vous n’en avez pas, consultez Comment acheter des Power BI Premium.
Un espace de travail Power BI avec une capacité Premium affectée. Si vous n’avez pas d’espace de travail, suivez les étapes décrites dans Créer un espace de travail pour en créer un et l’affecter à une capacité Premium.
Connectez-vous à Microsoft Fabric.
- Accédez à l’expérience Science des données dans Microsoft Fabric.
- Ouvrez l’exemple de notebook ou créez-en un.
- Créez un bloc-notes si vous souhaitez copier/coller du code dans des cellules.
- Vous pouvez également sélectionner Utiliser un exemple de>détection des fraudes pour ouvrir l’exemple de notebook.
- Ajoutez un Lakehouse à votre bloc-notes.
Étape 1 : Charger les données
Le jeu de données contient les transactions de crédit carte effectuées par les titulaires de carte européens en septembre 2013 pendant deux jours. Sur 284 807 transactions, 492 sont frauduleuses. La classe positive (fraude) ne représente que 0,172 % des données, ce qui rend le jeu de données très déséquilibré.
Variables d’entrée et de réponse
Le jeu de données contient uniquement des variables d’entrée numériques, qui sont le résultat d’une transformation d’analyse des composants principaux (PCA) sur les fonctionnalités d’origine. Pour protéger la confidentialité, nous ne pouvons pas fournir les fonctionnalités d’origine ou plus d’informations générales sur les données. Les seules fonctionnalités qui n’ont pas été transformées avec PCA sont « Time » et « Amount ».
- Fonctionnalités « V1, V2, ... V28 » sont les principaux composants obtenus avec PCA.
- « Time » contient les secondes écoulées entre chaque transaction et la première transaction du jeu de données.
- « Amount » est le montant de la transaction. Cette fonctionnalité peut être utilisée pour l’apprentissage dépendant des coûts, par exemple.
- « Class » est la variable de réponse, qui prend la valeur
1
pour fraude et0
sinon.
Étant donné le ratio de déséquilibre de classe, nous vous recommandons de mesurer la précision à l’aide de la zone sous la courbe de Precision-Recall (AUPRC). L’utilisation d’une matrice de confusion pour évaluer la précision n’est pas significative pour une classification déséquilibré.
L’extrait de code suivant montre une partie des donnéescreditcard.csv .
« Time » | « V1 » | « V2 » | « V3 » | « V4 » | « V5 » | « V6 » | « V7 » | « V8 » | « V9 » | « V10 » | « V11 » | « V12 » | « V13 » | « V14 » | « V15 » | « V16 » | « V17 » | « V18 » | « V19 » | « V20 » | « V21 » | « V22 » | « V23 » | « V24 » | « V25 » | « V26 » | « V27 » | « V28 » | « Amount » | « Classe » |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -1.3598071336738 | -0.0727811733098497 | 2.53634673796914 | 1.37815522427443 | -0.338320769942518 | 0.462387777762292 | 0.239598554061257 | 0.0986979012610507 | 0.363786969611213 | 0.0907941719789316 | -0.551599533260813 | -0.617800855762348 | -0.991389847235408 | -0.311169353699879 | 1.46817697209427 | -0.470400525259478 | 0.207971241929242 | 0.0257905801985591 | 0.403992960255733 | 0.251412098239705 | -0.018306777944153 | 0.277837575558899 | -0.110473910188767 | 0.0669280749146731 | 0.128539358273528 | -0.189114843888824 | 0.133558376740387 | -0.0210530534538215 | 149.62 | "0" |
0 | 1.19185711131486 | 0.26615071205963 | 0.16648011335321 | 0.448154078460911 | 0.0600176492822243 | -0.0823608088155687 | -0.0788029833323113 | 0.0851016549148104 | -0.255425128109186 | -0.166974414004614 | 1.61272666105479 | 1.06523531137287 | 0.48909501589608 | -0.143772296441519 | 0.635558093258208 | 0.463917041022171 | -0.114804663102346 | -0.183361270123994 | -0.145783041325259 | -0.0690831352230203 | -0.225775248033138 | -0.638671952771851 | 0.101288021253234 | -0.339846475529127 | 0.167170404418143 | 0.125894532368176 | -0.00898309914322813 | 0.0147241691924927 | 2.69 | "0" |
Installation des bibliothèques
Pour ce tutoriel, nous devons installer la imblearn
bibliothèque. Le noyau PySpark est redémarré après l’exécution %pip install
de . Nous devons donc installer la bibliothèque avant d’exécuter d’autres cellules.
# install imblearn for SMOTE
%pip install imblearn
En définissant les paramètres suivants, nous pouvons facilement appliquer le notebook à différents jeux de données.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually
TARGET_COL = "Class" # target column name
IS_SAMPLE = False # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/fraud-detection/" # folder with data files
DATA_FILE = "creditcard.csv" # data file name
EXPERIMENT_NAME = "aisample-fraud" # mlflow experiment name
Télécharger le jeu de données et le charger dans un Lakehouse
Avant d’exécuter le notebook, vous devez y ajouter un Lakehouse. Le Lakehouse est utilisé pour stocker les données de cet exemple. Pour ajouter un Lakehouse, consultez Ajouter un Lakehouse à votre bloc-notes.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
fname = "creditcard.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time
ts = time.time()
Lire les données du Lakehouse
df = (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", True)
.load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
.cache()
)
Étape 2. Effectuer une analyse exploratoire des données
Dans cette section, nous allons explorer les données, case activée leur schéma, réorganiser leurs colonnes et convertir les colonnes dans les types de données appropriés.
Afficher les données brutes
Nous pouvons utiliser display
pour explorer les données brutes, calculer des statistiques de base ou même afficher des vues de graphique.
display(df)
Imprimez des informations sur les données, telles que le schéma.
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
Caster les colonnes dans les types appropriés
import pyspark.sql.functions as F
df_columns = df.columns
df_columns.remove(TARGET_COL)
# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
TARGET_COL, F.col(TARGET_COL).cast("int")
)
if IS_SAMPLE:
df = df.limit(SAMPLE_ROWS)
Étape 3. Développer et déployer un modèle
Dans cette section, nous allons entraîner un modèle LightGBM pour classifier les transactions frauduleuses.
Préparer les données d’entraînement et de test
Commencez par fractionner les données en jeux d’entraînement et de test.
# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler
feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]
Vérifiez le volume de données et le déséquilibre dans le jeu d’entraînement.
display(train_data.groupBy(TARGET_COL).count())
Traiter les données déséquilibrées
Comme souvent avec les données réelles, ces données ont un problème de déséquilibre de classe, car la classe positive (transactions frauduleuses) ne représente que 0,172 % de toutes les transactions. Nous allons appliquer SMOTE (Synthetic Minority Over-sampling Technique) pour gérer automatiquement le déséquilibre de classe dans les données. La méthode SMOTE suréchantillonne la classe minoritaire et sous-échantillonne la classe majoritaire pour améliorer les performances du classifieur.
Appliquons SMOTE aux données d’apprentissage :
Notes
imblearn
Fonctionne uniquement pour les DataFrames pandas, et non pour les DataFrames PySpark.
from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE
train_data_array = train_data.withColumn("features", vector_to_array("features"))
train_data_pd = train_data_array.toPandas()
X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))
X = np.array([np.array(x) for x in X])
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))
new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))
Définir le modèle
Une fois nos données en place, nous pouvons maintenant définir le modèle. Nous allons utiliser un classifieur LightGBM et utiliser SynapseML pour implémenter le modèle avec quelques lignes de code.
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)
Entraîner le modèle
model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)
Expliquer le modèle
Ici, nous pouvons montrer l’importance que le modèle attribue à chaque fonctionnalité dans les données d’entraînement.
import pandas as pd
import matplotlib.pyplot as plt
feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values
# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)
# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()
Évaluer le modèle
Générer des prédictions de modèle :
predictions = model.transform(test_data)
predictions.limit(10).toPandas()
Afficher les métriques du modèle :
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
Créez une matrice de confusion :
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
Tracer la matrice de confusion :
# plot confusion matrix
import seaborn as sns
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Définissez une fonction pour évaluer le modèle :
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def evaluate(predictions):
"""
Evaluate the model by computing AUROC and AUPRC with the predictions.
"""
# initialize the binary evaluator
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol=TARGET_COL
)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
# calculate AUROC, baseline 0.5
auroc = _evaluator("areaUnderROC")
print(f"AUROC: {auroc:.4f}")
# calculate AUPRC, baseline positive rate (0.172% in the demo data)
auprc = _evaluator("areaUnderPR")
print(f"AUPRC: {auprc:.4f}")
return auroc, auprc
Évaluez le modèle d’origine :
# evaluate the original model
auroc, auprc = evaluate(predictions)
Évaluez le modèle SMOTE :
# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
# Using model trained on SMOTE data if it has higher AUPRC
model = smote_model
auprc = new_auprc
auroc = new_auroc
Journaliser et charger le modèle avec MLflow
Maintenant que nous avons un modèle de travail décent, nous pouvons l’enregistrer pour une utilisation ultérieure. Ici, nous utilisons MLflow pour journaliser les métriques et les modèles, et charger les modèles à des fins de prédiction.
Configurer MLflow :
# setup mlflow
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
Modèle de journal, métriques et paramètres :
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lightgbm",
registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})
print("log parameters:")
mlflow.log_params({"DATA_FILE": DATA_FILE})
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
Rechargez le modèle :
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
Étape 4. Enregistrer les résultats de prédiction
Dans cette section, nous allons déployer le modèle et enregistrer les résultats de prédiction.
Déploiement et prédiction de modèle
batch_predictions = loaded_model.transform(test_data)
Enregistrez les prédictions dans le Lakehouse :
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")