Creare, valutare e distribuire un modello di rilevamento delle frodi in Microsoft Fabric

Questa esercitazione illustra i flussi di lavoro di data engineering e data science con un esempio end-to-end che compila un modello per rilevare transazioni con carta di credito fraudolenta. I passaggi da eseguire sono:

  • Caricare i dati in una Lakehouse
  • Eseguire analisi esplorativa dei dati sui dati
  • Preparare i dati gestendo lo squilibrio della classe
  • Eseguire il training di un modello e registrarlo con MLflow
  • Distribuire il modello e salvare i risultati della stima

Importante

Microsoft Fabric è in anteprima.

Prerequisiti

  • Passare all'esperienza di Data Science in Microsoft Fabric.
  • Aprire il notebook di esempio o creare un nuovo notebook.
    • Creare un nuovo notebook se si vuole copiare/incollare codice nelle celle.
    • In alternativa, selezionare Usa unrilevamento di frodi di esempio > per aprire il notebook di esempio.
  • Aggiungere un lakehouse al notebook.

Passaggio 1: Caricare i dati

Il set di dati contiene transazioni con carta di credito effettuate dai titolari di carte europee nel settembre 2013 nel corso di due giorni. Oltre 284.807 transazioni, 492 sono fraudolente. La classe positiva (frode) rappresenta un semplice 0,172% dei dati, rendendo così il set di dati altamente sbilanciato.

Variabili di input e risposta

Il set di dati contiene solo variabili di input numeriche, che sono il risultato di una trasformazione PCA (Principal Component Analysis) sulle funzionalità originali. Per proteggere la riservatezza, non è possibile fornire le funzionalità originali o altre informazioni in background sui dati. Le uniche funzionalità che non sono state trasformate con PCA sono "Time" e "Amount".

  • Funzionalità "V1, V2, ... V28" sono i componenti principali ottenuti con PCA.
  • "Time" contiene i secondi trascorsi tra ogni transazione e la prima transazione nel set di dati.
  • "Importo" è l'importo della transazione. Questa funzionalità può essere usata per l'apprendimento dipendente dai costi dipendenti.
  • "Class" è la variabile di risposta e accetta il valore 1 per le frodi e 0 in caso contrario.

Dato il rapporto di squilibrio della classe, è consigliabile misurare l'accuratezza usando l'area sotto la curva Precision-Recall (AUPRC). L'uso di una matrice di confusione per valutare l'accuratezza non è significativa per la classificazione non bilanciata.

Il frammento di codice seguente mostra una parte dei dati creditcard.csv .

"Time" "V1" "V2" "V3" "V4" "V5" "V6" "V7" "V8" "V9" "V10" "V11" "V12" "V13" "V14" "V15" "V16" "V17" "V18" "V19" "V20" "V21" "V22" "V23" "V24" "V25" "V26" "V27" "V28" "Importo" "Classe"
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Installare le librerie

Per questa esercitazione, è necessario installare la imblearn libreria. Il kernel PySpark verrà riavviato dopo l'esecuzione %pip installdi , quindi è necessario installare la libreria prima di eseguire qualsiasi altra cella.

# install imblearn for SMOTE
%pip install imblearn

Definendo i parametri seguenti, è possibile applicare facilmente il notebook a set di dati diversi.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Scaricare il set di dati e caricarlo in un lakehouse

Prima di eseguire il notebook, è necessario aggiungervi una Lakehouse. Lakehouse viene usato per archiviare i dati per questo esempio. Per aggiungere una Lakehouse, vedere Aggiungere un Lakehouse al notebook.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Leggere i dati da Lakehouse

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

Passaggio 2. Eseguire l'analisi esplorativa dei dati

In questa sezione verranno esaminati i dati, ne verrà controllato lo schema, riordinare le colonne ed eseguire il cast delle colonne nei tipi di dati corretti.

Visualizzare i dati non elaborati

È possibile usare display per esplorare i dati non elaborati, calcolare alcune statistiche di base o persino visualizzare le visualizzazioni del grafico.

display(df)

Stampare alcune informazioni sui dati, ad esempio lo schema.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Eseguire il cast delle colonne nei tipi corretti

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

Passaggio 3. Sviluppare e distribuire un modello

In questa sezione verrà eseguito il training di un modello LightGBM per classificare le transazioni fraudolente.

Preparare i dati di training e test

Iniziare suddividendo i dati in set di training e test.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Controllare il volume dei dati e lo squilibrio nel set di training.

display(train_data.groupBy(TARGET_COL).count())

Gestire lo sbilanciamento dei dati

Come accade spesso con i dati reali, questi dati presentano un problema di squilibrio di classe, poiché la classe positiva (transazioni fraudolente) rappresenta solo lo 0,172% di tutte le transazioni. Si applicherà SMOTE (tecnica di over-campionamento delle minoranze sintetiche) per gestire automaticamente lo squilibrio delle classi nei dati. Il metodo SMOTE sovracampiona la classe di minoranza e sottocampiona la classe di maggioranza per migliorare le prestazioni del classificatore.

Applicare SMOTE ai dati di training:

Nota

imblearn funziona solo per i dataframe pandas, non per i dataframe PySpark.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

Definire il modello

Con i dati disponibili, è ora possibile definire il modello. Si userà un classificatore LightGBM e si userà SynapseML per implementare il modello con poche righe di codice.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

Eseguire il training del modello

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

Spiegare il modello

Qui è possibile mostrare l'importanza che il modello assegna a ogni funzionalità nei dati di training.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

Valutare il modello

Generare stime del modello:

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Visualizzare le metriche del modello:

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Creare una matrice di confusione:

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Tracciare la matrice di confusione:

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Definire una funzione per valutare il modello:

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Valutare il modello originale:

# evaluate the original model
auroc, auprc = evaluate(predictions)

Valutare il modello SMOTE:

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

Registrare e caricare il modello con MLflow

Ora che è disponibile un modello funzionante decente, è possibile salvarlo per usarlo in un secondo momento. In questo caso si usa MLflow per registrare metriche e modelli e caricare nuovamente i modelli per la stima.

Configurare MLflow:

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Modello di log, metriche e parametri:

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Ricaricare il modello:

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

Passaggio 4. Salvare i risultati della stima

In questa sezione si distribuirà il modello e si salveranno i risultati della stima.

Distribuzione e stima del modello

batch_predictions = loaded_model.transform(test_data)

Salvare le stime in Lakehouse:

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Passaggi successivi