Vytvoření, vyhodnocení a nasazení modelu detekce podvodů v Microsoft Fabric

V tomto kurzu předvedeme datové inženýrství a pracovní postupy datových věd pomocí kompletního příkladu, který sestaví model pro detekci podvodných transakcí platebními kartami. Kroky, které provedete, jsou následující:

  • Nahrání dat do Lakehouse
  • Provádění průzkumné analýzy dat na datech
  • Příprava dat zpracováním nerovnováhy tříd
  • Trénování modelu a jeho protokolování pomocí MLflow
  • Nasazení modelu a uložení výsledků predikce

Důležité

Microsoft Fabric je v současné době ve verzi PREVIEW. Tyto informace se týkají předběžného vydání produktu, který může být před vydáním podstatně změněn. Společnost Microsoft neposkytuje na zde uvedené informace žádné záruky, ať už vyjádřené nebo předpokládané.

Požadavky

  • Přejděte na Datová Věda prostředí v Microsoft Fabric.
  • Otevřete ukázkový poznámkový blok nebo vytvořte nový poznámkový blok.
    • Pokud chcete zkopírovat nebo vložit kód do buněk, vytvořte nový poznámkový blok .
    • Nebo vyberte Použít ukázkovou>detekci podvodů a otevřete ukázkový poznámkový blok.
  • Přidejte do poznámkového bloku Lakehouse.

Krok 1: Načtení dat

Datová sada obsahuje transakce kreditních karet provedené evropskými držiteli karet v září 2013 během dvou dnů. Z 284 807 transakcí je 492 podvodných. Pozitivní třída (podvod) představuje pouhých 0,172 % dat, což způsobuje, že datová sada je vysoce nevyvážená.

Vstupní proměnné a proměnné odpovědí

Datová sada obsahuje pouze číselné vstupní proměnné, které jsou výsledkem transformace analýzy hlavních komponent (PCA) na původních funkcích. Kvůli ochraně důvěrnosti nemůžeme poskytnout původní funkce ani další základní informace o datech. Jediné funkce, které nebyly transformovány pomocí PCA, jsou "Time" a "Amount".

  • Funkce "V1, V2, ... V28" jsou hlavní komponenty získané pomocí PCA.
  • "Time" (Čas) obsahuje sekundy, které uplynuly mezi jednotlivými transakcemi a první transakcí v datové sadě.
  • "Amount" je částka transakce. Tuto funkci je možné použít například pro učení závislé na nákladech.
  • "Třída" je proměnná odpovědi a přebírá hodnotu 1 pro podvod a 0 další.

Vzhledem k nevyváženému poměru tříd doporučujeme změřovat přesnost pomocí oblasti pod křivkou Precision-Recall (AUPRC). Použití konfuzní matice k vyhodnocení přesnosti není pro nevyváženou klasifikaci smysluplné.

Následující fragment kódu ukazuje část datcreditcard.csv .

"Čas" "V1" "V2" "V3" "V4" "V5" "V6" "V7" "V8" "V9" "V10" "V11" "V12" "V13" "V14" "V15" "V16" "V17" "V18" "V19" "V20" "V21" "V22" "V23" "V24" "V25" "V26" "V27" "V28" "Částka" "Třída"
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Instalace knihoven

Pro účely tohoto kurzu musíme knihovnu imblearn nainstalovat. Jádro PySpark se po spuštění %pip installrestartuje a proto musíme knihovnu nainstalovat před spuštěním dalších buněk.

# install imblearn for SMOTE
%pip install imblearn

Definováním následujících parametrů můžeme poznámkový blok snadno použít u různých datových sad.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Stažení datové sady a nahrání do Lakehouse

Před spuštěním poznámkového bloku do něj musíte přidat Lakehouse. Lakehouse slouží k ukládání dat pro tento příklad. Pokud chcete přidat Lakehouse, přečtěte si téma Přidání Lakehouse do poznámkového bloku.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Čtení dat z Lakehouse

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

Krok 2. Provádění průzkumných analýz dat

V této části prozkoumáme data, zkontrolujeme jejich schéma, změníme pořadí sloupců a přetypujeme sloupce do správných datových typů.

Zobrazení nezpracovaných dat

Můžeme použít display k prozkoumání nezpracovaných dat, výpočtu některých základních statistik nebo dokonce k zobrazení zobrazení grafu.

display(df)

Vytiskněte některé informace o datech, například schéma.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Přetypování sloupců do správných typů

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

Krok 3. Vývoj a nasazení modelu

V této části vytrénujeme model LightGBM pro klasifikaci podvodných transakcí.

Příprava trénovacích a testovacích dat

Začněte rozdělením dat do trénovacích a testovacích sad.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Zkontrolujte objem dat a nerovnováhu v trénovací sadě.

display(train_data.groupBy(TARGET_COL).count())

Zpracování nevyvážených dat

Jak se často stává s reálnými daty, mají tato data problém s nerovnováhou tříd, protože pozitivní třída (podvodné transakce) představuje pouze 0,172 % všech transakcí. Použijeme metodu SMOTE (Synthetic Minority Over-sampling Technique) k automatickému zpracování nerovnováhy tříd v datech. Metoda SMOTE převzorkuje minoritní třídu a podvzorkuje třídu majority pro zlepšení výkonu klasifikátoru.

Pojďme použít SMOTE na trénovací data:

Poznámka

imblearn Funguje pouze pro datové rámce pandas, nikoli pro datové rámce PySpark.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

Definování modelu

Když máme data na místě, můžeme model definovat. Použijeme klasifikátor LightGBM a použijeme SynapseML k implementaci modelu s několika řádky kódu.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

Trénování modelu

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

Vysvětlení modelu

Tady můžeme ukázat důležitost, kterou model přiřazuje jednotlivým funkcím v trénovacích datech.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

Vyhodnocení modelu

Generování predikcí modelu:

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Zobrazení metrik modelu:

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Vytvořte konfuzní matici:

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Vykreslí konfuzní matici:

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Definujte funkci pro vyhodnocení modelu:

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Vyhodnocení původního modelu:

# evaluate the original model
auroc, auprc = evaluate(predictions)

Vyhodnocení modelu SMOTE:

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

Protokolování a načtení modelu pomocí MLflow

Teď, když máme slušný funkční model, můžeme ho uložit pro pozdější použití. Tady používáme MLflow k protokolování metrik a modelů a načítání modelů zpět pro predikci.

Nastavení MLflow:

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Model protokolu, metriky a parametry:

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Znovu načtěte model:

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

Krok 4: Uložení výsledků predikce

V této části nasadíme model a uložíme výsledky předpovědi.

Nasazení a predikce modelu

batch_predictions = loaded_model.transform(test_data)

Ukládání předpovědí do Lakehouse:

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Další kroky