Создание, оценка и развертывание модели обнаружения мошенничества в Microsoft Fabric
В этом руководстве мы продемонстрируем рабочие процессы инжиниринга данных и обработки и анализа данных с комплексным примером, который создает модель для обнаружения мошеннических кредитных карта транзакций. Ниже приведены шаги, которые вы предстоит выполнить.
- Отправка данных в Lakehouse
- Выполнение разведочного анализа данных с данными
- Подготовка данных путем обработки дисбаланса классов
- Обучение модели и ее запись в журнал с помощью MLflow
- Развертывание модели и сохранение результатов прогнозирования
Важно!
Microsoft Fabric в настоящее время находится на этапе предварительной версии. Эта информация относится к предварительной версии продукта, который может быть существенно изменен перед выпуском. Корпорация Майкрософт не дает никаких гарантий, явных или подразумеваемых, в отношении информации, представленной здесь.
Предварительные требования
Подписка на Power BI Premium. Если у вас ее нет, см. статью Как приобрести Power BI Premium.
Рабочая область Power BI с назначенной емкостью Premium. Если у вас нет рабочей области, выполните действия, описанные в разделе Создание рабочей области , чтобы создать ее и назначить ее емкости Premium.
Войдите в Microsoft Fabric.
- Перейдите к интерфейсу Обработка и анализ данных в Microsoft Fabric.
- Откройте пример записной книжки или создайте новую записную книжку.
- Создайте записную книжку , если вы хотите скопировать или вставить код в ячейки.
- Или выберите Использовать пример>обнаружения мошенничества , чтобы открыть пример записной книжки.
- Добавьте Lakehouse в записную книжку.
Шаг 1. Загрузка данных
Набор данных содержит кредитные карта транзакций, выполненных европейскими держателями карт в сентябре 2013 года в течение двух дней. Из 284 807 транзакций 492 являются мошенническими. Положительный класс (мошенничество) составляет всего 0,172 % данных, что делает набор данных очень несбалансированным.
Переменные ввода и ответа
Набор данных содержит только числовые входные переменные, которые являются результатом преобразования анализа основных компонентов (PCA) для исходных признаков. Для защиты конфиденциальности мы не можем предоставить исходные функции или дополнительные справочные сведения о данных. Единственными функциями, которые не были преобразованы с помощью PCA, являются "Время" и "Сумма".
- Функции "V1, V2, ... V28" — это основные компоненты, полученные с помощью PCA.
- "Время" содержит секунды, прошедшие между каждой транзакцией и первой транзакцией в наборе данных.
- "Amount" — это сумма транзакции. Эту функцию можно использовать, например, для обучения с учетом затрат.
- "Класс" — это переменная ответа, которая принимает значение
1
для мошенничества и0
в противном случае.
Учитывая коэффициент несбалансированности классов, рекомендуется измерять точность с помощью области под кривой Precision-Recall (AUPRC). Использование матрицы неточностей для оценки точности не имеет смысла для несбалансированной классификации.
В следующем фрагменте кода показана часть данныхcreditcard.csv .
"Время" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10" | "V11" | "V12" | "V13" | "V14" | "V15" | "V16" | "V17" | "V18" | "V19" | "V20" | "V21" | "V22" | "V23" | "V24" | "V25" | "V26" | "V27" | "V28" | "Сумма" | "Класс" |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -1.3598071336738 | -0.0727811733098497 | 2.53634673796914 | 1.37815522427443 | -0.338320769942518 | 0.462387777762292 | 0.239598554061257 | 0.0986979012610507 | 0.363786969611213 | 0.0907941719789316 | -0.551599533260813 | -0.617800855762348 | -0.991389847235408 | -0.311169353699879 | 1.46817697209427 | -0.470400525259478 | 0.207971241929242 | 0.0257905801985591 | 0.403992960255733 | 0.251412098239705 | -0.018306777944153 | 0.277837575558899 | -0.110473910188767 | 0.0669280749146731 | 0.128539358273528 | -0.189114843888824 | 0.133558376740387 | -0.0210530534538215 | 149.62 | "0" |
0 | 1.19185711131486 | 0.26615071205963 | 0.16648011335321 | 0.448154078460911 | 0.0600176492822243 | -0.0823608088155687 | -0.0788029833323113 | 0.0851016549148104 | -0.255425128109186 | -0.166974414004614 | 1.61272666105479 | 1.06523531137287 | 0.48909501589608 | -0.143772296441519 | 0.635558093258208 | 0.463917041022171 | -0.114804663102346 | -0.183361270123994 | -0.145783041325259 | -0.0690831352230203 | -0.225775248033138 | -0.638671952771851 | 0.101288021253234 | -0.339846475529127 | 0.167170404418143 | 0.125894532368176 | -0.00898309914322813 | 0.0147241691924927 | 2.69 | "0" |
Установка библиотек
Для работы с этим руководством необходимо установить библиотеку imblearn
. Ядро PySpark будет перезапущено после выполнения %pip install
, поэтому перед запуском других ячеек необходимо установить библиотеку.
# install imblearn for SMOTE
%pip install imblearn
Определив следующие параметры, можно легко применить записную книжку к разным наборам данных.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually
TARGET_COL = "Class" # target column name
IS_SAMPLE = False # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/fraud-detection/" # folder with data files
DATA_FILE = "creditcard.csv" # data file name
EXPERIMENT_NAME = "aisample-fraud" # mlflow experiment name
Скачивание набора данных и отправка в Lakehouse
Перед запуском записной книжки необходимо добавить в нее Lakehouse. Lakehouse используется для хранения данных в этом примере. Сведения о добавлении Lakehouse см. в статье Добавление Lakehouse в записную книжку.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
fname = "creditcard.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time
ts = time.time()
Чтение данных из Lakehouse
df = (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", True)
.load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
.cache()
)
Шаг 2. Выполнение разведочного анализа данных
В этом разделе мы рассмотрим данные, проверка схему, переупорядочение столбцов и приведем столбцы к правильным типам данных.
Отображение необработанных данных
Мы можем использовать для display
просмотра необработанных данных, вычисления некоторых базовых статистических данных или даже отображения представлений диаграмм.
display(df)
Выведите некоторые сведения о данных, например схему.
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
Приведение столбцов к правильным типам
import pyspark.sql.functions as F
df_columns = df.columns
df_columns.remove(TARGET_COL)
# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
TARGET_COL, F.col(TARGET_COL).cast("int")
)
if IS_SAMPLE:
df = df.limit(SAMPLE_ROWS)
Шаг 3. Разработка и развертывание модели
В этом разделе мы обучим модель LightGBM для классификации мошеннических транзакций.
Подготовка данных для обучения и тестирования
Начните с разделения данных на наборы для обучения и тестирования.
# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler
feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]
Проверьте объем данных и дисбаланс в обучаемом наборе.
display(train_data.groupBy(TARGET_COL).count())
Обработка несбалансированных данных
Как это часто бывает с реальными данными, эти данные имеют проблему дисбаланса классов, так как на положительный класс (мошеннические транзакции) приходится только 0,172 % всех транзакций. Мы применим SMOTE (метод избыточной выборки синтетического меньшинства) для автоматической обработки дисбаланса классов в данных. Метод SMOTE переимпливает класс меньшинства и недооценивает класс большинства для повышения производительности классификатора.
Давайте применим SMOTE к данным для обучения:
Примечание
imblearn
Работает только для кадров данных Pandas, но не для Кадров данных PySpark.
from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE
train_data_array = train_data.withColumn("features", vector_to_array("features"))
train_data_pd = train_data_array.toPandas()
X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))
X = np.array([np.array(x) for x in X])
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))
new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))
Определение модели
Теперь мы можем определить модель с помощью наших данных. Мы будем использовать классификатор LightGBM и SynapseML для реализации модели с несколькими строками кода.
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)
Обучение модели
model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)
Объяснение модели
Здесь можно показать важность, которую модель присваивает каждому признаку в обучающих данных.
import pandas as pd
import matplotlib.pyplot as plt
feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values
# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)
# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()
Оценка модели
Создание прогнозов модели:
predictions = model.transform(test_data)
predictions.limit(10).toPandas()
Отображение метрик модели:
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
Создайте матрицу неточностей:
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
Построим матрицу неточностей:
# plot confusion matrix
import seaborn as sns
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Определите функцию для оценки модели:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def evaluate(predictions):
"""
Evaluate the model by computing AUROC and AUPRC with the predictions.
"""
# initialize the binary evaluator
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol=TARGET_COL
)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
# calculate AUROC, baseline 0.5
auroc = _evaluator("areaUnderROC")
print(f"AUROC: {auroc:.4f}")
# calculate AUPRC, baseline positive rate (0.172% in the demo data)
auprc = _evaluator("areaUnderPR")
print(f"AUPRC: {auprc:.4f}")
return auroc, auprc
Оцените исходную модель:
# evaluate the original model
auroc, auprc = evaluate(predictions)
Оцените модель SMOTE:
# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
# Using model trained on SMOTE data if it has higher AUPRC
model = smote_model
auprc = new_auprc
auroc = new_auroc
Регистрация и загрузка модели с помощью MLflow
Теперь, когда у нас есть достойная рабочая модель, мы можем сохранить ее для последующего использования. Здесь мы используем MLflow для ведения журнала метрик и моделей и обратной загрузки моделей для прогнозирования.
Настройка MLflow:
# setup mlflow
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
Модель журнала, метрики и параметры:
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lightgbm",
registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})
print("log parameters:")
mlflow.log_params({"DATA_FILE": DATA_FILE})
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
Перезагрузите модель:
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
Шаг 4. Сохранение результатов прогнозирования
В этом разделе мы развернем модель и сохраним результаты прогнозирования.
Развертывание и прогнозирование модели
batch_predictions = loaded_model.transform(test_data)
Сохраните прогнозы в Lakehouse:
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")