Csalásészlelési modell létrehozása, értékelése és üzembe helyezése a Microsoft Fabricben
Ebben az oktatóanyagban az adatfeldolgozási és adatelemzési munkafolyamatokat mutatjuk be egy átfogó példával, amely egy modellt hoz létre a csalárd hitelkártya-tranzakciók észlelésére. A lépések a következők:
- Adatok feltöltése egy Tótárházba
- Feltáró jellegű adatelemzés végrehajtása az adatokon
- Az adatok előkészítése az osztály kiegyensúlyozatlanságának kezelésével
- Modell betanítása és naplózása az MLflow használatával
- A modell üzembe helyezése és az előrejelzési eredmények mentése
Fontos
A Microsoft Fabric jelenleg előzetes verzióban érhető el. Ezek az információk egy előzetes termékre vonatkoznak, amely a kiadás előtt lényegesen módosulhat. A Microsoft nem vállal kifejezett vagy vélelmezett garanciát az itt megadott információkra vonatkozóan.
Előfeltételek
Egy Power BI Premium-előfizetés. Ha még nincs ilyenje, olvassa el A Power BI Premium vásárlása című témakört.
Egy Power BI-munkaterület hozzárendelt Premium-kapacitással. Ha nincs munkaterülete, a Munkaterület létrehozása című cikk lépéseit követve hozzon létre egyet, és rendelje hozzá egy Prémium szintű kapacitáshoz.
Jelentkezzen be a Microsoft Fabricbe.
- Lépjen a Microsoft Fabric Adattudomány felületére.
- Nyissa meg a mintajegyzetfüzetet, vagy hozzon létre egy új jegyzetfüzetet.
- Hozzon létre egy új jegyzetfüzetet , ha kódot szeretne másolni/beilleszteni a cellákba.
- Vagy válassza aCsalásészlelésminta> használata lehetőséget a mintajegyzetfüzet megnyitásához.
- Vegyen fel egy Lakehouse-t a jegyzetfüzetbe.
1. lépés: Az adatok betöltése
Az adathalmaz az európai kártyabirtokosok által 2013 szeptemberében két nap alatt végrehajtott hitelkártyás tranzakciókat tartalmazza. A 284 807 tranzakcióból 492 csalárd. A pozitív osztály (csalás) az adatok 0,172%-át teszi ki, így az adathalmaz rendkívül kiegyensúlyozatlan.
Bemeneti és válaszváltozók
Az adathalmaz csak numerikus bemeneti változókat tartalmaz, amelyek az eredeti funkciók fő összetevő-elemzési (PCA) átalakításának eredményei. A bizalmasság védelme érdekében nem tudjuk biztosítani az eredeti funkciókat vagy az adatokkal kapcsolatos további háttérinformációkat. Az egyetlen olyan funkció, amelyet még nem alakítottak át a PCA-val, az "Idő" és az "Összeg".
- Jellemzők "V1, V2, ... A PCA-val beszerzett fő összetevők a V28".
- Az "Idő" az egyes tranzakciók és az adathalmaz első tranzakciója között eltelt másodperceket tartalmazza.
- Az "Összeg" a tranzakció összege. Ez a funkció használható például a költségérzékeny tanuláshoz.
- A "Class" a válaszváltozó, és a csalások értékét
1
veszi fel, máskülönben0
.
Az osztályegyensúlytalansági arány miatt javasoljuk, hogy a pontosságot a Precision-Recall görbe alatti terület (AUPRC) használatával mérje. A keveredési mátrix használata a pontosság kiértékeléséhez nem jelent értelmet a kiegyensúlyozatlan besoroláshoz.
Az alábbi kódrészlet a creditcard.csv adatok egy részét jeleníti meg.
"Idő" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10" | "V11" | "V12" | "V13" | "V14" | "V15" | "V16" | "V17" | "V18" | "V19" | "V20" | "V21" | "V22" | "V23" | "V24" | "V25" | "V26" | "V27" | "V28" | "Mennyiség" | "Osztály" |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -1.3598071336738 | -0.0727811733098497 | 2.53634673796914 | 1.37815522427443 | -0.338320769942518 | 0.462387777762292 | 0.239598554061257 | 0.0986979012610507 | 0.363786969611213 | 0.0907941719789316 | -0.551599533260813 | -0.617800855762348 | -0.991389847235408 | -0.311169353699879 | 1.46817697209427 | -0.470400525259478 | 0.207971241929242 | 0.0257905801985591 | 0.403992960255733 | 0.251412098239705 | -0.018306777944153 | 0.277837575558899 | -0.110473910188767 | 0.0669280749146731 | 0.128539358273528 | -0.189114843888824 | 0.133558376740387 | -0.0210530534538215 | 149.62 | "0" |
0 | 1.19185711131486 | 0.26615071205963 | 0.16648011335321 | 0.448154078460911 | 0.0600176492822243 | -0.0823608088155687 | -0.0788029833323113 | 0.0851016549148104 | -0.255425128109186 | -0.166974414004614 | 1.61272666105479 | 1.06523531137287 | 0.48909501589608 | -0.143772296441519 | 0.635558093258208 | 0.463917041022171 | -0.114804663102346 | -0.183361270123994 | -0.145783041325259 | -0.0690831352230203 | -0.225775248033138 | -0.638671952771851 | 0.101288021253234 | -0.339846475529127 | 0.167170404418143 | 0.125894532368176 | -0.00898309914322813 | 0.0147241691924927 | 2.69 | "0" |
Kódtárak telepítése
Ebben az oktatóanyagban telepíteni kell a kódtárat imblearn
. A PySpark-kernel a futtatás %pip install
után újraindul, ezért telepítenie kell a kódtárat, mielőtt bármilyen más cellát futtatnánk.
# install imblearn for SMOTE
%pip install imblearn
Az alábbi paraméterek definiálásával egyszerűen alkalmazhatjuk a jegyzetfüzetet különböző adathalmazokra.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually
TARGET_COL = "Class" # target column name
IS_SAMPLE = False # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/fraud-detection/" # folder with data files
DATA_FILE = "creditcard.csv" # data file name
EXPERIMENT_NAME = "aisample-fraud" # mlflow experiment name
Töltse le az adathalmazt, és töltse fel egy Lakehouse-ba
A jegyzetfüzet futtatása előtt hozzá kell adnia egy Lakehouse-t. A Lakehouse a példához tartozó adatok tárolására szolgál. Lakehouse hozzáadásáról a Lakehouse hozzáadása a jegyzetfüzethez című témakörben olvashat.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
fname = "creditcard.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time
ts = time.time()
Adatok olvasása a Lakehouse-ból
df = (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", True)
.load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
.cache()
)
2. lépés Feltáró adatok elemzése
Ebben a szakaszban áttekintjük az adatokat, ellenőrizzük a sémáját, átrendezzük az oszlopokat, és a megfelelő adattípusokba rendezzük az oszlopokat.
Nyers adatok megjelenítése
Használhatjuk display
a nyers adatok feltárására, néhány alapszintű statisztika kiszámítására, vagy akár diagramnézetek megjelenítésére is.
display(df)
Az adatokkal, például a sémával kapcsolatos információk nyomtatása.
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
Oszlopok beszűkülése a megfelelő típusokba
import pyspark.sql.functions as F
df_columns = df.columns
df_columns.remove(TARGET_COL)
# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
TARGET_COL, F.col(TARGET_COL).cast("int")
)
if IS_SAMPLE:
df = df.limit(SAMPLE_ROWS)
3. lépés Modell fejlesztése és üzembe helyezése
Ebben a szakaszban betanítunk egy LightGBM-modellt a csalárd tranzakciók besorolására.
Betanítási és tesztelési adatok előkészítése
Először ossza fel az adatokat betanítási és tesztelési csoportokra.
# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler
feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]
Ellenőrizze az adatmennyiséget és az egyensúlyhiányt a betanítási csoportban.
display(train_data.groupBy(TARGET_COL).count())
Kiegyensúlyozatlan adatok kezelése
A valós adatokhoz hasonlóan ezek az adatok is osztályhiányos problémával járnak, mivel a pozitív osztály (csalárd tranzakciók) az összes tranzakciónak csak 0,172%-át teszi ki. SMOTE-t (szintetikus kisebbségi mintavételezési technikát) alkalmazunk az adatok osztályhiányának automatikus kezelésére. Az SMOTE metódus felülírja a kisebbségi osztályt, és alulamplesíti a többségi osztályt a jobb osztályozói teljesítmény érdekében.
Alkalmazzuk az SMOTE-t a betanítási adatokra:
Megjegyzés
imblearn
csak a pandas DataFrame-ekhez működik, a PySpark DataFrame-ekhez nem.
from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE
train_data_array = train_data.withColumn("features", vector_to_array("features"))
train_data_pd = train_data_array.toPandas()
X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))
X = np.array([np.array(x) for x in X])
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))
new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))
A modell meghatározása
Ha az adataink a helyén lesznek, most már definiálhatjuk a modellt. Egy LightGBM-osztályozót használunk, és a SynapseML használatával implementáljuk a modellt néhány sornyi kóddal.
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)
A modell betanítása
model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)
A modell ismertetése
Itt bemutatjuk, hogy a modell milyen fontosságot rendel a betanítási adatok egyes funkcióihoz.
import pandas as pd
import matplotlib.pyplot as plt
feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values
# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)
# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()
A modell értékelése
Modell-előrejelzések létrehozása:
predictions = model.transform(test_data)
predictions.limit(10).toPandas()
Modellmetrikák megjelenítése:
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
Keveredési mátrix létrehozása:
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
Ábrázolja a keveredési mátrixot:
# plot confusion matrix
import seaborn as sns
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Definiáljon egy függvényt a modell kiértékeléséhez:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def evaluate(predictions):
"""
Evaluate the model by computing AUROC and AUPRC with the predictions.
"""
# initialize the binary evaluator
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol=TARGET_COL
)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
# calculate AUROC, baseline 0.5
auroc = _evaluator("areaUnderROC")
print(f"AUROC: {auroc:.4f}")
# calculate AUPRC, baseline positive rate (0.172% in the demo data)
auprc = _evaluator("areaUnderPR")
print(f"AUPRC: {auprc:.4f}")
return auroc, auprc
Értékelje ki az eredeti modellt:
# evaluate the original model
auroc, auprc = evaluate(predictions)
Értékelje ki az SMOTE-modellt:
# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
# Using model trained on SMOTE data if it has higher AUPRC
model = smote_model
auprc = new_auprc
auroc = new_auroc
A modell naplózása és betöltése az MLflow használatával
Most, hogy van egy tisztességes munkamodellünk, menthetjük későbbi használatra. Itt az MLflow használatával naplózza a metrikákat és modelleket, és betölti a modelleket előrejelzés céljából.
MLflow beállítása:
# setup mlflow
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
Naplómodell, metrikák és paraméterek:
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lightgbm",
registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})
print("log parameters:")
mlflow.log_params({"DATA_FILE": DATA_FILE})
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
Töltse be újra a modellt:
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
4. lépés: Előrejelzési eredmények mentése
Ebben a szakaszban üzembe helyezzük a modellt, és mentjük az előrejelzési eredményeket.
Modell üzembe helyezése és előrejelzése
batch_predictions = loaded_model.transform(test_data)
Előrejelzések mentése a Lakehouse-ba:
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")