Opprett, evaluer og distribuer en svindelregistreringsmodell i Microsoft Fabric

I denne opplæringen demonstrerer vi arbeidsflyter for datateknikk og datavitenskap med et ende-til-ende-eksempel som bygger en modell for å oppdage falske kredittkorttransaksjoner. Trinnene du skal utføre er:

  • Laste opp dataene til et Lakehouse
  • Utfør utforskende dataanalyse på dataene
  • Klargjøre dataene ved å håndtere ubalanse i klassen
  • Kalibrer en modell og logg den med MLflow
  • Distribuer modellen og lagre prognoseresultater

Viktig

Microsoft Fabric er for øyeblikket i FORHÅNDSVERSJON. Denne informasjonen er knyttet til et forhåndsutgitt produkt som kan endres vesentlig før det utgis. Microsoft gir ingen garantier, uttrykt eller underforstått, med hensyn til informasjonen som er oppgitt her.

Forutsetninger

  • Gå til Data Science-opplevelsen i Microsoft Fabric.
  • Åpne eksempelnotatblokken eller opprett en ny notatblokk.
    • Opprett en ny notatblokk hvis du vil kopiere/lime inn kode i celler.
    • Eller velg Bruk en eksempelregistrering> avsvindel for å åpne eksempelnotatblokken.
  • Legg til et Lakehouse i notatblokken.

Trinn 1: Laste inn dataene

Datasettet inneholder kredittkorttransaksjoner gjort av europeiske kortholdere i september 2013 i løpet av to dager. Av 284 807 transaksjoner er 492 falske. Den positive klassen (svindel) utgjør bare 0,172 % av dataene, og gjør dermed datasettet svært ubalansert.

Inndata- og svarvariabler

Datasettet inneholder bare numeriske inndatavariabler, som er resultatet av en PCA-transformasjon (Principal Component Analysis) på de opprinnelige funksjonene. For å beskytte konfidensialiteten kan vi ikke oppgi de opprinnelige funksjonene eller mer bakgrunnsinformasjon om dataene. De eneste funksjonene som ikke har blitt transformert med PCA, er «Tid» og «Beløp».

  • Funksjoner "V1, V2, ... V28" er hovedkomponentene som hentes med PCA.
  • «Klokkeslett» inneholder sekundene som er brukt mellom hver transaksjon og den første transaksjonen i datasettet.
  • "Beløp" er transaksjonsbeløpet. Denne funksjonen kan brukes for eksempel avhengig kostnadssensitiv læring.
  • «Klasse» er svarvariabelen, og den tar verdien 1 for svindel og 0 ellers.

Gitt forholdet mellom ubalanser i klassen, anbefaler vi at du måler nøyaktigheten ved hjelp av området under Precision-Recall kurve (AUPRC). Å bruke en forvirringsmatrise til å evaluere nøyaktighet er ikke meningsfylt for ubalansert klassifisering.

Følgende kodesnutt viser en del av creditcard.csv data.

"Tid" "V1" "V2" "V3" "V4" "V5" "V6" "V7" "V8" "V9" "V10" "V11" "V12" "V13" "V14" "V15" "V16" "V17" "V18" "V19" "V20" "V21" "V22" "V23" "V24" "V25" "V26" "V27" "V28" "Beløp" "Klasse"
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Installer biblioteker

I denne opplæringen må vi installere imblearn biblioteket. PySpark-kjernen startes på nytt etter kjøring %pip install, og derfor må vi installere biblioteket før vi kjører andre celler.

# install imblearn for SMOTE
%pip install imblearn

Ved å definere følgende parametere kan vi enkelt bruke notatblokken på forskjellige datasett.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Laste ned datasettet og laste opp til et Lakehouse

Før du kjører notatblokken, må du legge til en Lakehouse i den. Lakehouse brukes til å lagre dataene for dette eksemplet. Hvis du vil legge til et Lakehouse, kan du se Legge til en Lakehouse i notatblokken.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Les data fra Lakehouse

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

Trinn 2. Utfør utforskende dataanalyse

I denne delen skal vi utforske dataene, kontrollere skjemaet, endre rekkefølgen på kolonnene og sende kolonnene til de riktige datatypene.

Vis rådata

Vi kan bruke display til å utforske rådata, beregne grunnleggende statistikk eller til og med vise diagramvisninger.

display(df)

Skriv ut litt informasjon om dataene, for eksempel skjemaet.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Kast kolonner inn i de riktige typene

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

Trinn 3. Utvikle og distribuere en modell

I denne delen skal vi lære opp en LightGBM-modell for å klassifisere falske transaksjoner.

Klargjøre opplærings- og testdata

Begynn med å dele dataene inn i opplærings- og testsett.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Kontroller datavolumet og ubalansen i opplæringssettet.

display(train_data.groupBy(TARGET_COL).count())

Håndtere ubalanserte data

Som ofte skjer med virkelige data, har disse dataene et problem med ubalanse i klassen, siden den positive klassen (falske transaksjoner) bare utgjør 0,172 % av alle transaksjoner. Vi bruker SMOTE (Synthetic Minority Over-sampling Technique) til automatisk å håndtere klasseubalanse i dataene. SMOTE-metoden overstyrer minoritetsklassen og undersampler flertallsklassen for forbedret klassifierytelse.

La oss bruke SMOTE på opplæringsdataene:

Obs!

imblearn fungerer bare for pandas DataFrames, ikke PySpark DataFrames.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

Definer modellen

Med dataene våre på plass kan vi nå definere modellen. Vi bruker en LightGBM-klassifier og bruker SynapseML til å implementere modellen med noen få linjer med kode.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

Lær opp modellen

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

Forklar modellen

Her kan vi vise viktigheten som modellen tilordner til hver funksjon i opplæringsdataene.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

Evaluer modellen

Generer modellprognoser:

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Måledata for visningsmodell:

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Opprett en forvirringsmatrise:

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Tegn inn forvirringsmatrisen:

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Definer en funksjon for å evaluere modellen:

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Evaluer den opprinnelige modellen:

# evaluate the original model
auroc, auprc = evaluate(predictions)

Evaluer SMOTE-modellen:

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

Logg og last inn modellen med MLflow

Nå som vi har en anstendig arbeidsmodell, kan vi spare den til senere bruk. Her bruker vi MLflow til å logge måledata og modeller, og laster inn modellene tilbake for prognose.

Konfigurer MLflow:

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Loggmodell, måledata og parametere:

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Last inn modellen på nytt:

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

Trinn 4. Lagre prognoseresultater

I denne delen skal vi distribuere modellen og lagre prognoseresultatene.

Modelldistribusjon og prognose

batch_predictions = loaded_model.transform(test_data)

Lagre prognoser i Lakehouse:

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Neste trinn