Создание, оценка и развертывание модели обнаружения мошенничества в Microsoft Fabric

В этом руководстве мы продемонстрируем рабочие процессы инжиниринга данных и обработки и анализа данных с комплексным примером, который создает модель для обнаружения мошеннических кредитных карта транзакций. Ниже приведены шаги, которые вы предстоит выполнить.

  • Отправка данных в Lakehouse
  • Выполнение разведочного анализа данных с данными
  • Подготовка данных путем обработки дисбаланса классов
  • Обучение модели и ее запись в журнал с помощью MLflow
  • Развертывание модели и сохранение результатов прогнозирования

Важно!

Microsoft Fabric в настоящее время находится на этапе предварительной версии. Эта информация относится к предварительной версии продукта, который может быть существенно изменен перед выпуском. Корпорация Майкрософт не дает никаких гарантий, явных или подразумеваемых, в отношении информации, представленной здесь.

Предварительные требования

  • Перейдите к интерфейсу Обработка и анализ данных в Microsoft Fabric.
  • Откройте пример записной книжки или создайте новую записную книжку.
    • Создайте записную книжку , если вы хотите скопировать или вставить код в ячейки.
    • Или выберите Использовать пример>обнаружения мошенничества , чтобы открыть пример записной книжки.
  • Добавьте Lakehouse в записную книжку.

Шаг 1. Загрузка данных

Набор данных содержит кредитные карта транзакций, выполненных европейскими держателями карт в сентябре 2013 года в течение двух дней. Из 284 807 транзакций 492 являются мошенническими. Положительный класс (мошенничество) составляет всего 0,172 % данных, что делает набор данных очень несбалансированным.

Переменные ввода и ответа

Набор данных содержит только числовые входные переменные, которые являются результатом преобразования анализа основных компонентов (PCA) для исходных признаков. Для защиты конфиденциальности мы не можем предоставить исходные функции или дополнительные справочные сведения о данных. Единственными функциями, которые не были преобразованы с помощью PCA, являются "Время" и "Сумма".

  • Функции "V1, V2, ... V28" — это основные компоненты, полученные с помощью PCA.
  • "Время" содержит секунды, прошедшие между каждой транзакцией и первой транзакцией в наборе данных.
  • "Amount" — это сумма транзакции. Эту функцию можно использовать, например, для обучения с учетом затрат.
  • "Класс" — это переменная ответа, которая принимает значение 1 для мошенничества и 0 в противном случае.

Учитывая коэффициент несбалансированности классов, рекомендуется измерять точность с помощью области под кривой Precision-Recall (AUPRC). Использование матрицы неточностей для оценки точности не имеет смысла для несбалансированной классификации.

В следующем фрагменте кода показана часть данныхcreditcard.csv .

"Время" "V1" "V2" "V3" "V4" "V5" "V6" "V7" "V8" "V9" "V10" "V11" "V12" "V13" "V14" "V15" "V16" "V17" "V18" "V19" "V20" "V21" "V22" "V23" "V24" "V25" "V26" "V27" "V28" "Сумма" "Класс"
0 -1.3598071336738 -0.0727811733098497 2.53634673796914 1.37815522427443 -0.338320769942518 0.462387777762292 0.239598554061257 0.0986979012610507 0.363786969611213 0.0907941719789316 -0.551599533260813 -0.617800855762348 -0.991389847235408 -0.311169353699879 1.46817697209427 -0.470400525259478 0.207971241929242 0.0257905801985591 0.403992960255733 0.251412098239705 -0.018306777944153 0.277837575558899 -0.110473910188767 0.0669280749146731 0.128539358273528 -0.189114843888824 0.133558376740387 -0.0210530534538215 149.62 "0"
0 1.19185711131486 0.26615071205963 0.16648011335321 0.448154078460911 0.0600176492822243 -0.0823608088155687 -0.0788029833323113 0.0851016549148104 -0.255425128109186 -0.166974414004614 1.61272666105479 1.06523531137287 0.48909501589608 -0.143772296441519 0.635558093258208 0.463917041022171 -0.114804663102346 -0.183361270123994 -0.145783041325259 -0.0690831352230203 -0.225775248033138 -0.638671952771851 0.101288021253234 -0.339846475529127 0.167170404418143 0.125894532368176 -0.00898309914322813 0.0147241691924927 2.69 "0"

Установка библиотек

Для работы с этим руководством необходимо установить библиотеку imblearn . Ядро PySpark будет перезапущено после выполнения %pip install, поэтому перед запуском других ячеек необходимо установить библиотеку.

# install imblearn for SMOTE
%pip install imblearn

Определив следующие параметры, можно легко применить записную книжку к разным наборам данных.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually

TARGET_COL = "Class"  # target column name
IS_SAMPLE = False  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/fraud-detection/"  # folder with data files
DATA_FILE = "creditcard.csv"  # data file name

EXPERIMENT_NAME = "aisample-fraud"  # mlflow experiment name

Скачивание набора данных и отправка в Lakehouse

Перед запуском записной книжки необходимо добавить в нее Lakehouse. Lakehouse используется для хранения данных в этом примере. Сведения о добавлении Lakehouse см. в статье Добавление Lakehouse в записную книжку.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection"
    fname = "creditcard.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")
# to record the notebook running time
import time

ts = time.time()

Чтение данных из Lakehouse

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(f"{DATA_FOLDER}/raw/{DATA_FILE}")
    .cache()
)

Шаг 2. Выполнение разведочного анализа данных

В этом разделе мы рассмотрим данные, проверка схему, переупорядочение столбцов и приведем столбцы к правильным типам данных.

Отображение необработанных данных

Мы можем использовать для display просмотра необработанных данных, вычисления некоторых базовых статистических данных или даже отображения представлений диаграмм.

display(df)

Выведите некоторые сведения о данных, например схему.

# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

Приведение столбцов к правильным типам

import pyspark.sql.functions as F

df_columns = df.columns
df_columns.remove(TARGET_COL)

# to make sure the TARGET_COL is the last column
df = df.select(df_columns + [TARGET_COL]).withColumn(
    TARGET_COL, F.col(TARGET_COL).cast("int")
)

if IS_SAMPLE:
    df = df.limit(SAMPLE_ROWS)

Шаг 3. Разработка и развертывание модели

В этом разделе мы обучим модель LightGBM для классификации мошеннических транзакций.

Подготовка данных для обучения и тестирования

Начните с разделения данных на наборы для обучения и тестирования.

# Split the dataset into train and test
train, test = df.randomSplit([0.85, 0.15], seed=42)
# Merge Columns
from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[TARGET_COL, "features"]
test_data = featurizer.transform(test)[TARGET_COL, "features"]

Проверьте объем данных и дисбаланс в обучаемом наборе.

display(train_data.groupBy(TARGET_COL).count())

Обработка несбалансированных данных

Как это часто бывает с реальными данными, эти данные имеют проблему дисбаланса классов, так как на положительный класс (мошеннические транзакции) приходится только 0,172 % всех транзакций. Мы применим SMOTE (метод избыточной выборки синтетического меньшинства) для автоматической обработки дисбаланса классов в данных. Метод SMOTE переимпливает класс меньшинства и недооценивает класс большинства для повышения производительности классификатора.

Давайте применим SMOTE к данным для обучения:

Примечание

imblearn Работает только для кадров данных Pandas, но не для Кадров данных PySpark.

from pyspark.ml.functions import vector_to_array, array_to_vector
import numpy as np
from collections import Counter
from imblearn.over_sampling import SMOTE

train_data_array = train_data.withColumn("features", vector_to_array("features"))

train_data_pd = train_data_array.toPandas()

X = train_data_pd["features"].to_numpy()
y = train_data_pd[TARGET_COL].to_numpy()
print("Original dataset shape %s" % Counter(y))

X = np.array([np.array(x) for x in X])

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X, y)
print("Resampled dataset shape %s" % Counter(y_res))

new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))
dataColumns = ["features", TARGET_COL]
new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)
new_train_data = new_train_data.withColumn("features", array_to_vector("features"))

Определение модели

Теперь мы можем определить модель с помощью наших данных. Мы будем использовать классификатор LightGBM и SynapseML для реализации модели с несколькими строками кода.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=True
)
smote_model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol=TARGET_COL, isUnbalance=False
)

Обучение модели

model = model.fit(train_data)
smote_model = smote_model.fit(new_train_data)

Объяснение модели

Здесь можно показать важность, которую модель присваивает каждому признаку в обучающих данных.

import pandas as pd
import matplotlib.pyplot as plt

feature_importances = model.getFeatureImportances()
fi = pd.Series(feature_importances, index=feature_cols)
fi = fi.sort_values(ascending=True)
f_index = fi.index
f_values = fi.values

# print feature importances
print("f_index:", f_index)
print("f_values:", f_values)

# plot
x_index = list(range(len(fi)))
x_index = [x / len(fi) for x in x_index]
plt.rcParams["figure.figsize"] = (20, 20)
plt.barh(
    x_index, f_values, height=0.028, align="center", color="tan", tick_label=f_index
)
plt.xlabel("importances")
plt.ylabel("features")
plt.show()

Оценка модели

Создание прогнозов модели:

predictions = model.transform(test_data)
predictions.limit(10).toPandas()

Отображение метрик модели:

from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol=TARGET_COL, scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)

Создайте матрицу неточностей:

# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

Построим матрицу неточностей:

# plot confusion matrix
import seaborn as sns

sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Определите функцию для оценки модели:

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def evaluate(predictions):
    """
    Evaluate the model by computing AUROC and AUPRC with the predictions.
    """

    # initialize the binary evaluator
    evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=TARGET_COL
    )

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)

    # calculate AUROC, baseline 0.5
    auroc = _evaluator("areaUnderROC")
    print(f"AUROC: {auroc:.4f}")

    # calculate AUPRC, baseline positive rate (0.172% in the demo data)
    auprc = _evaluator("areaUnderPR")
    print(f"AUPRC: {auprc:.4f}")

    return auroc, auprc

Оцените исходную модель:

# evaluate the original model
auroc, auprc = evaluate(predictions)

Оцените модель SMOTE:

# evaluate the SMOTE model
new_predictions = smote_model.transform(test_data)
new_auroc, new_auprc = evaluate(new_predictions)
if new_auprc > auprc:
    # Using model trained on SMOTE data if it has higher AUPRC
    model = smote_model
    auprc = new_auprc
    auroc = new_auroc

Регистрация и загрузка модели с помощью MLflow

Теперь, когда у нас есть достойная рабочая модель, мы можем сохранить ее для последующего использования. Здесь мы используем MLflow для ведения журнала метрик и моделей и обратной загрузки моделей для прогнозирования.

Настройка MLflow:

# setup mlflow
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)

Модель журнала, метрики и параметры:

# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lightgbm",
        registered_model_name=f"{EXPERIMENT_NAME}-lightgbm",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics({"AUPRC": auprc, "AUROC": auroc})

    print("log parameters:")
    mlflow.log_params({"DATA_FILE": DATA_FILE})

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")

Перезагрузите модель:

# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

Шаг 4. Сохранение результатов прогнозирования

В этом разделе мы развернем модель и сохраним результаты прогнозирования.

Развертывание и прогнозирование модели

batch_predictions = loaded_model.transform(test_data)

Сохраните прогнозы в Lakehouse:

# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
print(f"Full run cost {int(time.time() - ts)} seconds.")

Next Steps