Vytvoření, vyhodnocení a nasazení modelu detekce podvodů v Microsoft Fabric
V tomto kurzu předvedeme datové inženýrství a pracovní postupy datových věd pomocí kompletního příkladu, který sestaví model pro detekci podvodných transakcí platebními kartami. Kroky, které provedete, jsou následující:
- Nahrání dat do Lakehouse
- Provádění průzkumné analýzy dat na datech
- Příprava dat zpracováním nerovnováhy tříd
- Trénování modelu a jeho protokolování pomocí MLflow
- Nasazení modelu a uložení výsledků predikce
Důležité
Microsoft Fabric je v současné době ve verzi PREVIEW. Tyto informace se týkají předběžného vydání produktu, který může být před vydáním podstatně změněn. Společnost Microsoft neposkytuje na zde uvedené informace žádné záruky, ať už vyjádřené nebo předpokládané.
Požadavky
Předplatné Power BI Premium. Pokud ho nemáte, podívejte se na článek Jak koupit Power BI Premium.
Pracovní prostor Power BI s přiřazenou kapacitou Premium Pokud pracovní prostor nemáte, vytvořte ho podle kroků v tématu Vytvoření pracovního prostoru a přiřaďte ho ke kapacitě Premium.
Přihlaste se k Microsoft Fabric.
- Přejděte na Datová Věda prostředí v Microsoft Fabric.
- Otevřete ukázkový poznámkový blok nebo vytvořte nový poznámkový blok.
- Pokud chcete zkopírovat nebo vložit kód do buněk, vytvořte nový poznámkový blok .
- Nebo vyberte Použít ukázkovou>detekci podvodů a otevřete ukázkový poznámkový blok.
- Přidejte do poznámkového bloku Lakehouse.
Krok 1: Načtení dat
Datová sada obsahuje transakce kreditních karet provedené evropskými držiteli karet v září 2013 během dvou dnů. Z 284 807 transakcí je 492 podvodných. Pozitivní třída (podvod) představuje pouhých 0,172 % dat, což způsobuje, že datová sada je vysoce nevyvážená.
Vstupní proměnné a proměnné odpovědí
Datová sada obsahuje pouze číselné vstupní proměnné, které jsou výsledkem transformace analýzy hlavních komponent (PCA) na původních funkcích. Kvůli ochraně důvěrnosti nemůžeme poskytnout původní funkce ani další základní informace o datech. Jediné funkce, které nebyly transformovány pomocí PCA, jsou "Time" a "Amount".
- Funkce "V1, V2, ... V28" jsou hlavní komponenty získané pomocí PCA.
- "Time" (Čas) obsahuje sekundy, které uplynuly mezi jednotlivými transakcemi a první transakcí v datové sadě.
- "Amount" je částka transakce. Tuto funkci je možné použít například pro učení závislé na nákladech.
- "Třída" je proměnná odpovědi a přebírá hodnotu
1
pro podvod a0
další.
Vzhledem k nevyváženému poměru tříd doporučujeme změřovat přesnost pomocí oblasti pod křivkou Precision-Recall (AUPRC). Použití konfuzní matice k vyhodnocení přesnosti není pro nevyváženou klasifikaci smysluplné.
Následující fragment kódu ukazuje část datcreditcard.csv .
"Čas" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10" | "V11" | "V12" | "V13" | "V14" | "V15" | "V16" | "V17" | "V18" | "V19" | "V20" | "V21" | "V22" | "V23" | "V24" | "V25" | "V26" | "V27" | "V28" | "Částka" | "Třída" |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -1.3598071336738 | -0.0727811733098497 | 2.53634673796914 | 1.37815522427443 | -0.338320769942518 | 0.462387777762292 | 0.239598554061257 | 0.0986979012610507 | 0.363786969611213 | 0.0907941719789316 | -0.551599533260813 | -0.617800855762348 | -0.991389847235408 | -0.311169353699879 | 1.46817697209427 | -0.470400525259478 | 0.207971241929242 | 0.0257905801985591 | 0.403992960255733 | 0.251412098239705 | -0.018306777944153 | 0.277837575558899 | -0.110473910188767 | 0.0669280749146731 | 0.128539358273528 | -0.189114843888824 | 0.133558376740387 | -0.0210530534538215 | 149.62 | "0" |
0 | 1.19185711131486 | 0.26615071205963 | 0.16648011335321 | 0.448154078460911 | 0.0600176492822243 | -0.0823608088155687 | -0.0788029833323113 | 0.0851016549148104 | -0.255425128109186 | -0.166974414004614 | 1.61272666105479 | 1.06523531137287 | 0.48909501589608 | -0.143772296441519 | 0.635558093258208 | 0.463917041022171 | -0.114804663102346 | -0.183361270123994 | -0.145783041325259 | -0.0690831352230203 | -0.225775248033138 | -0.638671952771851 | 0.101288021253234 | -0.339846475529127 | 0.167170404418143 | 0.125894532368176 | -0.00898309914322813 | 0.0147241691924927 | 2.69 | "0" |
Instalace knihoven
Pro účely tohoto kurzu musíme knihovnu imblearn
nainstalovat. Jádro PySpark se po spuštění %pip install
restartuje a proto musíme knihovnu nainstalovat před spuštěním dalších buněk.
# install imblearn for SMOTE
%pip install imblearn
Definováním následujících parametrů můžeme poznámkový blok snadno použít u různých datových sad.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually
TARGET_COL = "Class" # target column name
IS_SAMPLE = False # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/fraud-detection/" # folder with data files
DATA_FILE = "creditcard.csv" # data file name
EXPERIMENT_NAME = "aisample-fraud" # mlflow experiment name
Stažení datové sady a nahrání do Lakehouse
Před spuštěním poznámkového bloku do něj musíte přidat Lakehouse. Lakehouse slouží k ukládání dat pro tento příklad. Pokud chcete přidat Lakehouse, přečtěte si téma Přidání Lakehouse do poznámkového bloku.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
fname = "creditcard.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time
ts = time.time()
Čtení dat z Lakehouse
df = (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", True)
.load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
.cache()
)
Krok 2. Provádění průzkumných analýz dat
V této části prozkoumáme data, zkontrolujeme jejich schéma, změníme pořadí sloupců a přetypujeme sloupce do správných datových typů.
Zobrazení nezpracovaných dat
Můžeme použít display
k prozkoumání nezpracovaných dat, výpočtu některých základních statistik nebo dokonce k zobrazení zobrazení grafu.
display(df)
Vytiskněte některé informace o datech, například schéma.
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
Přetypování sloupců do správných typů
import pyspark.sql.functions as F
df_columns = df.columns
df_columns.remove(TARGET_COL)
# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
TARGET_COL, F.col(TARGET_COL).cast("int")
)
if IS_SAMPLE:
df = df.limit(SAMPLE_ROWS)
Krok 3. Vývoj a nasazení modelu
V této části vytrénujeme model LightGBM pro klasifikaci podvodných transakcí.
Příprava trénovacích a testovacích dat
Začněte rozdělením dat do trénovacích a testovacích sad.
# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler
feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]
Zkontrolujte objem dat a nerovnováhu v trénovací sadě.
display(train_data.groupBy(TARGET_COL).count())
Zpracování nevyvážených dat
Jak se často stává s reálnými daty, mají tato data problém s nerovnováhou tříd, protože pozitivní třída (podvodné transakce) představuje pouze 0,172 % všech transakcí. Použijeme metodu SMOTE (Synthetic Minority Over-sampling Technique) k automatickému zpracování nerovnováhy tříd v datech. Metoda SMOTE převzorkuje minoritní třídu a podvzorkuje třídu majority pro zlepšení výkonu klasifikátoru.
Pojďme použít SMOTE na trénovací data:
Poznámka
imblearn
Funguje pouze pro datové rámce pandas, nikoli pro datové rámce PySpark.
from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE
train_data_array = train_data.withColumn("features", vector_to_array("features"))
train_data_pd = train_data_array.toPandas()
X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))
X = np.array([np.array(x) for x in X])
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))
new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))
Definování modelu
Když máme data na místě, můžeme model definovat. Použijeme klasifikátor LightGBM a použijeme SynapseML k implementaci modelu s několika řádky kódu.
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)
Trénování modelu
model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)
Vysvětlení modelu
Tady můžeme ukázat důležitost, kterou model přiřazuje jednotlivým funkcím v trénovacích datech.
import pandas as pd
import matplotlib.pyplot as plt
feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values
# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)
# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()
Vyhodnocení modelu
Generování predikcí modelu:
predictions = model.transform(test_data)
predictions.limit(10).toPandas()
Zobrazení metrik modelu:
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
Vytvořte konfuzní matici:
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
Vykreslí konfuzní matici:
# plot confusion matrix
import seaborn as sns
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Definujte funkci pro vyhodnocení modelu:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def evaluate(predictions):
"""
Evaluate the model by computing AUROC and AUPRC with the predictions.
"""
# initialize the binary evaluator
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol=TARGET_COL
)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
# calculate AUROC, baseline 0.5
auroc = _evaluator("areaUnderROC")
print(f"AUROC: {auroc:.4f}")
# calculate AUPRC, baseline positive rate (0.172% in the demo data)
auprc = _evaluator("areaUnderPR")
print(f"AUPRC: {auprc:.4f}")
return auroc, auprc
Vyhodnocení původního modelu:
# evaluate the original model
auroc, auprc = evaluate(predictions)
Vyhodnocení modelu SMOTE:
# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
# Using model trained on SMOTE data if it has higher AUPRC
model = smote_model
auprc = new_auprc
auroc = new_auroc
Protokolování a načtení modelu pomocí MLflow
Teď, když máme slušný funkční model, můžeme ho uložit pro pozdější použití. Tady používáme MLflow k protokolování metrik a modelů a načítání modelů zpět pro predikci.
Nastavení MLflow:
# setup mlflow
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
Model protokolu, metriky a parametry:
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lightgbm",
registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})
print("log parameters:")
mlflow.log_params({"DATA_FILE": DATA_FILE})
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
Znovu načtěte model:
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
Krok 4: Uložení výsledků predikce
V této části nasadíme model a uložíme výsledky předpovědi.
Nasazení a predikce modelu
batch_predictions = loaded_model.transform(test_data)
Ukládání předpovědí do Lakehouse:
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")