Csalásészlelési modell létrehozása, értékelése és üzembe helyezése a Microsoft Fabricben

Ebben az oktatóanyagban az adatfeldolgozási és adatelemzési munkafolyamatokat mutatjuk be egy átfogó példával, amely egy modellt hoz létre a csalárd hitelkártya-tranzakciók észlelésére. A lépések a következők:

  • Adatok feltöltése egy Tótárházba
  • Feltáró jellegű adatelemzés végrehajtása az adatokon
  • Az adatok előkészítése az osztály kiegyensúlyozatlanságának kezelésével
  • Modell betanítása és naplózása az MLflow használatával
  • A modell üzembe helyezése és az előrejelzési eredmények mentése

Fontos

A Microsoft Fabric jelenleg előzetes verzióban érhető el. Ezek az információk egy előzetes termékre vonatkoznak, amely a kiadás előtt lényegesen módosulhat. A Microsoft nem vállal kifejezett vagy vélelmezett garanciát az itt megadott információkra vonatkozóan.

Előfeltételek

  • Egy Power BI Premium-előfizetés. Ha még nincs ilyenje, olvassa el A Power BI Premium vásárlása című témakört.

  • Egy Power BI-munkaterület hozzárendelt Premium-kapacitással. Ha nincs munkaterülete, a Munkaterület létrehozása című cikk lépéseit követve hozzon létre egyet, és rendelje hozzá egy Prémium szintű kapacitáshoz.

  • Jelentkezzen be a Microsoft Fabricbe.

  • Lépjen a Microsoft Fabric Adattudomány felületére.
  • Nyissa meg a mintajegyzetfüzetet, vagy hozzon létre egy új jegyzetfüzetet.
    • Hozzon létre egy új jegyzetfüzetet , ha kódot szeretne másolni/beilleszteni a cellákba.
    • Vagy válassza aCsalásészlelésminta> használata lehetőséget a mintajegyzetfüzet megnyitásához.
  • Vegyen fel egy Lakehouse-t a jegyzetfüzetbe.

1. lépés: Az adatok betöltése

Az adathalmaz az európai kártyabirtokosok által 2013 szeptemberében két nap alatt végrehajtott hitelkártyás tranzakciókat tartalmazza. A 284 807 tranzakcióból 492 csalárd. A pozitív osztály (csalás) az adatok 0,172%-át teszi ki, így az adathalmaz rendkívül kiegyensúlyozatlan.

Bemeneti és válaszváltozók

Az adathalmaz csak numerikus bemeneti változókat tartalmaz, amelyek az eredeti funkciók fő összetevő-elemzési (PCA) átalakításának eredményei. A bizalmasság védelme érdekében nem tudjuk biztosítani az eredeti funkciókat vagy az adatokkal kapcsolatos további háttérinformációkat. Az egyetlen olyan funkció, amelyet még nem alakítottak át a PCA-val, az "Idő" és az "Összeg".

  • Jellemzők "V1, V2, ... A PCA-val beszerzett fő összetevők a V28".
  • Az "Idő" az egyes tranzakciók és az adathalmaz első tranzakciója között eltelt másodperceket tartalmazza.
  • Az "Összeg" a tranzakció összege. Ez a funkció használható például a költségérzékeny tanuláshoz.
  • A "Class" a válaszváltozó, és a csalások értékét 1 veszi fel, máskülönben 0 .

Az osztályegyensúlytalansági arány miatt javasoljuk, hogy a pontosságot a Precision-Recall görbe alatti terület (AUPRC) használatával mérje. A keveredési mátrix használata a pontosság kiértékeléséhez nem jelent értelmet a kiegyensúlyozatlan besoroláshoz.

Az alábbi kódrészlet a creditcard.csv adatok egy részét jeleníti meg.

"Idő" "V1" "V2" "V3" "V4" "V5" "V6" "V7" "V8" "V9" "V10" "V11" "V12" "V13" "V14" "V15" "V16" "V17" "V18" "V19" "V20" "V21" "V22" "V23" "V24" "V25" "V26" "V27" "V28" "Mennyiség" "Osztály"
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Kódtárak telepítése

Ebben az oktatóanyagban telepíteni kell a kódtárat imblearn . A PySpark-kernel a futtatás %pip installután újraindul, ezért telepítenie kell a kódtárat, mielőtt bármilyen más cellát futtatnánk.

# install imblearn for SMOTE
%pip install imblearn

Az alábbi paraméterek definiálásával egyszerűen alkalmazhatjuk a jegyzetfüzetet különböző adathalmazokra.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Töltse le az adathalmazt, és töltse fel egy Lakehouse-ba

A jegyzetfüzet futtatása előtt hozzá kell adnia egy Lakehouse-t. A Lakehouse a példához tartozó adatok tárolására szolgál. Lakehouse hozzáadásáról a Lakehouse hozzáadása a jegyzetfüzethez című témakörben olvashat.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Adatok olvasása a Lakehouse-ból

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

2. lépés Feltáró adatok elemzése

Ebben a szakaszban áttekintjük az adatokat, ellenőrizzük a sémáját, átrendezzük az oszlopokat, és a megfelelő adattípusokba rendezzük az oszlopokat.

Nyers adatok megjelenítése

Használhatjuk display a nyers adatok feltárására, néhány alapszintű statisztika kiszámítására, vagy akár diagramnézetek megjelenítésére is.

display(df)

Az adatokkal, például a sémával kapcsolatos információk nyomtatása.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Oszlopok beszűkülése a megfelelő típusokba

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

3. lépés Modell fejlesztése és üzembe helyezése

Ebben a szakaszban betanítunk egy LightGBM-modellt a csalárd tranzakciók besorolására.

Betanítási és tesztelési adatok előkészítése

Először ossza fel az adatokat betanítási és tesztelési csoportokra.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Ellenőrizze az adatmennyiséget és az egyensúlyhiányt a betanítási csoportban.

display(train_data.groupBy(TARGET_COL).count())

Kiegyensúlyozatlan adatok kezelése

A valós adatokhoz hasonlóan ezek az adatok is osztályhiányos problémával járnak, mivel a pozitív osztály (csalárd tranzakciók) az összes tranzakciónak csak 0,172%-át teszi ki. SMOTE-t (szintetikus kisebbségi mintavételezési technikát) alkalmazunk az adatok osztályhiányának automatikus kezelésére. Az SMOTE metódus felülírja a kisebbségi osztályt, és alulamplesíti a többségi osztályt a jobb osztályozói teljesítmény érdekében.

Alkalmazzuk az SMOTE-t a betanítási adatokra:

Megjegyzés

imblearn csak a pandas DataFrame-ekhez működik, a PySpark DataFrame-ekhez nem.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

A modell meghatározása

Ha az adataink a helyén lesznek, most már definiálhatjuk a modellt. Egy LightGBM-osztályozót használunk, és a SynapseML használatával implementáljuk a modellt néhány sornyi kóddal.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

A modell betanítása

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

A modell ismertetése

Itt bemutatjuk, hogy a modell milyen fontosságot rendel a betanítási adatok egyes funkcióihoz.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

A modell értékelése

Modell-előrejelzések létrehozása:

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Modellmetrikák megjelenítése:

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Keveredési mátrix létrehozása:

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Ábrázolja a keveredési mátrixot:

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Definiáljon egy függvényt a modell kiértékeléséhez:

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Értékelje ki az eredeti modellt:

# evaluate the original model
auroc, auprc = evaluate(predictions)

Értékelje ki az SMOTE-modellt:

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

A modell naplózása és betöltése az MLflow használatával

Most, hogy van egy tisztességes munkamodellünk, menthetjük későbbi használatra. Itt az MLflow használatával naplózza a metrikákat és modelleket, és betölti a modelleket előrejelzés céljából.

MLflow beállítása:

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Naplómodell, metrikák és paraméterek:

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Töltse be újra a modellt:

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

4. lépés: Előrejelzési eredmények mentése

Ebben a szakaszban üzembe helyezzük a modellt, és mentjük az előrejelzési eredményeket.

Modell üzembe helyezése és előrejelzése

batch_predictions = loaded_model.transform(test_data)

Előrejelzések mentése a Lakehouse-ba:

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Következő lépések