Creare, valutare e distribuire un modello di rilevamento delle frodi in Microsoft Fabric
Questa esercitazione illustra i flussi di lavoro di data engineering e data science con un esempio end-to-end che compila un modello per rilevare transazioni con carta di credito fraudolenta. I passaggi da eseguire sono:
- Caricare i dati in una Lakehouse
- Eseguire analisi esplorativa dei dati sui dati
- Preparare i dati gestendo lo squilibrio della classe
- Eseguire il training di un modello e registrarlo con MLflow
- Distribuire il modello e salvare i risultati della stima
Importante
Microsoft Fabric è in anteprima.
Prerequisiti
Sottoscrizione Power BI Premium. Se non ne hai uno, vedi Come acquistare Power BI Premium.
Un'area di lavoro di Power BI con capacità Premium assegnata. Se non si dispone di un'area di lavoro, usare la procedura descritta in Creare un'area di lavoro per crearne una e assegnarla a una capacità Premium.
Accedere a Microsoft Fabric.
- Passare all'esperienza di Data Science in Microsoft Fabric.
- Aprire il notebook di esempio o creare un nuovo notebook.
- Creare un nuovo notebook se si vuole copiare/incollare codice nelle celle.
- In alternativa, selezionare Usa unrilevamento di frodi di esempio > per aprire il notebook di esempio.
- Aggiungere un lakehouse al notebook.
Passaggio 1: Caricare i dati
Il set di dati contiene transazioni con carta di credito effettuate dai titolari di carte europee nel settembre 2013 nel corso di due giorni. Oltre 284.807 transazioni, 492 sono fraudolente. La classe positiva (frode) rappresenta un semplice 0,172% dei dati, rendendo così il set di dati altamente sbilanciato.
Variabili di input e risposta
Il set di dati contiene solo variabili di input numeriche, che sono il risultato di una trasformazione PCA (Principal Component Analysis) sulle funzionalità originali. Per proteggere la riservatezza, non è possibile fornire le funzionalità originali o altre informazioni in background sui dati. Le uniche funzionalità che non sono state trasformate con PCA sono "Time" e "Amount".
- Funzionalità "V1, V2, ... V28" sono i componenti principali ottenuti con PCA.
- "Time" contiene i secondi trascorsi tra ogni transazione e la prima transazione nel set di dati.
- "Importo" è l'importo della transazione. Questa funzionalità può essere usata per l'apprendimento dipendente dai costi dipendenti.
- "Class" è la variabile di risposta e accetta il valore
1
per le frodi e0
in caso contrario.
Dato il rapporto di squilibrio della classe, è consigliabile misurare l'accuratezza usando l'area sotto la curva Precision-Recall (AUPRC). L'uso di una matrice di confusione per valutare l'accuratezza non è significativa per la classificazione non bilanciata.
Il frammento di codice seguente mostra una parte dei dati creditcard.csv .
"Time" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10" | "V11" | "V12" | "V13" | "V14" | "V15" | "V16" | "V17" | "V18" | "V19" | "V20" | "V21" | "V22" | "V23" | "V24" | "V25" | "V26" | "V27" | "V28" | "Importo" | "Classe" |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -1.3598071336738 | -0.0727811733098497 | 2.53634673796914 | 1.37815522427443 | -0.338320769942518 | 0.462387777762292 | 0.239598554061257 | 0.0986979012610507 | 0.363786969611213 | 0.0907941719789316 | -0.551599533260813 | -0.617800855762348 | -0.991389847235408 | -0.311169353699879 | 1.46817697209427 | -0.470400525259478 | 0.207971241929242 | 0.0257905801985591 | 0.403992960255733 | 0.251412098239705 | -0.018306777944153 | 0.277837575558899 | -0.110473910188767 | 0.0669280749146731 | 0.128539358273528 | -0.189114843888824 | 0.133558376740387 | -0.0210530534538215 | 149.62 | "0" |
0 | 1.19185711131486 | 0.26615071205963 | 0.16648011335321 | 0.448154078460911 | 0.0600176492822243 | -0.0823608088155687 | -0.0788029833323113 | 0.0851016549148104 | -0.255425128109186 | -0.166974414004614 | 1.61272666105479 | 1.06523531137287 | 0.48909501589608 | -0.143772296441519 | 0.635558093258208 | 0.463917041022171 | -0.114804663102346 | -0.183361270123994 | -0.145783041325259 | -0.0690831352230203 | -0.225775248033138 | -0.638671952771851 | 0.101288021253234 | -0.339846475529127 | 0.167170404418143 | 0.125894532368176 | -0.00898309914322813 | 0.0147241691924927 | 2.69 | "0" |
Installare le librerie
Per questa esercitazione, è necessario installare la imblearn
libreria. Il kernel PySpark verrà riavviato dopo l'esecuzione %pip install
di , quindi è necessario installare la libreria prima di eseguire qualsiasi altra cella.
# install imblearn for SMOTE
%pip install imblearn
Definendo i parametri seguenti, è possibile applicare facilmente il notebook a set di dati diversi.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually
TARGET_COL = "Class" # target column name
IS_SAMPLE = False # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/fraud-detection/" # folder with data files
DATA_FILE = "creditcard.csv" # data file name
EXPERIMENT_NAME = "aisample-fraud" # mlflow experiment name
Scaricare il set di dati e caricarlo in un lakehouse
Prima di eseguire il notebook, è necessario aggiungervi una Lakehouse. Lakehouse viene usato per archiviare i dati per questo esempio. Per aggiungere una Lakehouse, vedere Aggiungere un Lakehouse al notebook.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
fname = "creditcard.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time
ts = time.time()
Leggere i dati da Lakehouse
df = (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", True)
.load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
.cache()
)
Passaggio 2. Eseguire l'analisi esplorativa dei dati
In questa sezione verranno esaminati i dati, ne verrà controllato lo schema, riordinare le colonne ed eseguire il cast delle colonne nei tipi di dati corretti.
Visualizzare i dati non elaborati
È possibile usare display
per esplorare i dati non elaborati, calcolare alcune statistiche di base o persino visualizzare le visualizzazioni del grafico.
display(df)
Stampare alcune informazioni sui dati, ad esempio lo schema.
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
Eseguire il cast delle colonne nei tipi corretti
import pyspark.sql.functions as F
df_columns = df.columns
df_columns.remove(TARGET_COL)
# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
TARGET_COL, F.col(TARGET_COL).cast("int")
)
if IS_SAMPLE:
df = df.limit(SAMPLE_ROWS)
Passaggio 3. Sviluppare e distribuire un modello
In questa sezione verrà eseguito il training di un modello LightGBM per classificare le transazioni fraudolente.
Preparare i dati di training e test
Iniziare suddividendo i dati in set di training e test.
# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler
feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]
Controllare il volume dei dati e lo squilibrio nel set di training.
display(train_data.groupBy(TARGET_COL).count())
Gestire lo sbilanciamento dei dati
Come accade spesso con i dati reali, questi dati presentano un problema di squilibrio di classe, poiché la classe positiva (transazioni fraudolente) rappresenta solo lo 0,172% di tutte le transazioni. Si applicherà SMOTE (tecnica di over-campionamento delle minoranze sintetiche) per gestire automaticamente lo squilibrio delle classi nei dati. Il metodo SMOTE sovracampiona la classe di minoranza e sottocampiona la classe di maggioranza per migliorare le prestazioni del classificatore.
Applicare SMOTE ai dati di training:
Nota
imblearn
funziona solo per i dataframe pandas, non per i dataframe PySpark.
from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE
train_data_array = train_data.withColumn("features", vector_to_array("features"))
train_data_pd = train_data_array.toPandas()
X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))
X = np.array([np.array(x) for x in X])
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))
new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))
Definire il modello
Con i dati disponibili, è ora possibile definire il modello. Si userà un classificatore LightGBM e si userà SynapseML per implementare il modello con poche righe di codice.
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)
Eseguire il training del modello
model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)
Spiegare il modello
Qui è possibile mostrare l'importanza che il modello assegna a ogni funzionalità nei dati di training.
import pandas as pd
import matplotlib.pyplot as plt
feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values
# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)
# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()
Valutare il modello
Generare stime del modello:
predictions = model.transform(test_data)
predictions.limit(10).toPandas()
Visualizzare le metriche del modello:
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
Creare una matrice di confusione:
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
Tracciare la matrice di confusione:
# plot confusion matrix
import seaborn as sns
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Definire una funzione per valutare il modello:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def evaluate(predictions):
"""
Evaluate the model by computing AUROC and AUPRC with the predictions.
"""
# initialize the binary evaluator
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol=TARGET_COL
)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
# calculate AUROC, baseline 0.5
auroc = _evaluator("areaUnderROC")
print(f"AUROC: {auroc:.4f}")
# calculate AUPRC, baseline positive rate (0.172% in the demo data)
auprc = _evaluator("areaUnderPR")
print(f"AUPRC: {auprc:.4f}")
return auroc, auprc
Valutare il modello originale:
# evaluate the original model
auroc, auprc = evaluate(predictions)
Valutare il modello SMOTE:
# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
# Using model trained on SMOTE data if it has higher AUPRC
model = smote_model
auprc = new_auprc
auroc = new_auroc
Registrare e caricare il modello con MLflow
Ora che è disponibile un modello funzionante decente, è possibile salvarlo per usarlo in un secondo momento. In questo caso si usa MLflow per registrare metriche e modelli e caricare nuovamente i modelli per la stima.
Configurare MLflow:
# setup mlflow
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
Modello di log, metriche e parametri:
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lightgbm",
registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})
print("log parameters:")
mlflow.log_params({"DATA_FILE": DATA_FILE})
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
Ricaricare il modello:
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
Passaggio 4. Salvare i risultati della stima
In questa sezione si distribuirà il modello e si salveranno i risultati della stima.
Distribuzione e stima del modello
batch_predictions = loaded_model.transform(test_data)
Salvare le stime in Lakehouse:
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")