教程:创建、评估流失率预测模型并对其进行评分
本教程介绍了 Microsoft Fabric 中 Synapse 数据科学工作流的端到端示例。 该方案构建了一个模型来预测银行客户是否会流失。 流失率,也称为耗损率,指停止与银行开展业务的客户比率。
本教程涵盖以下步骤:
- 安装自定义库
- 加载数据
- 通过探索性数据分析了解和处理数据,并演示如何使用 Fabric 数据整理器功能
- 使用 Scikit-Learn、LightGBM、MLflow 训练机器学习模型,并使用 MLFlow 和 Fabric 自动记录功能跟踪实验
- 评估和保存最终的机器学习模型
- 使用 Power BI 可视化效果显示模型性能
先决条件
获取 Microsoft Fabric 订阅。 或者注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左侧的体验切换器切换到 Synapse 数据科学体验。
- 如有必要,请如在 Microsoft Fabric 中创建湖屋中所述创建 Microsoft Fabric 湖屋。
请按照笔记本进行操作
可以选择下面其中一个选项,以在笔记本中进行操作:
- 在数据科学体验中打开并运行内置笔记本
- 将笔记本从 GitHub 上传到数据科学体验
打开内置笔记本
“客户流失”是本教程随附的示例笔记本。
要在 Synapse 数据科学体验中打开教程的内置示例笔记本,请执行以下操作:
转到 Synapse 数据科学主页。
选择“使用示例”。
选择相应的示例:
- 来自默认的“端到端工作流 (Python)”选项卡(如果示例适用于 Python 教程)。
- 来自“端到端工作流 (R)“选项卡(如果示例适用于 R 教程)。
- 从“快速教程”选项卡选择(如果示例适用于快速教程)。
从 GitHub 导入笔记本
本教程附带 AIsample - Bank Customer Churn.ipynb 笔记本。
若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。
或者,如果要从此页面复制并粘贴代码,则可以创建新的笔记本。
在开始运行代码之前,请务必将湖屋连接到笔记本。
步骤 1:安装自定义库
对于机器学习模型开发或临时数据分析,可能需要为 Apache Spark 会话快速安装自定义库。 有两个选项可用于安装库。
- 使用笔记本的内联安装功能(
%pip
或%conda
),仅在当前笔记本中安装库。 - 也可以创建 Fabric 环境,安装来自公共来源的安装库或将自定义库上传到该环境,然后工作区管理员可以将环境附加为工作区的默认值。 然后,环境中的所有库都将可用于工作区中的任何笔记本和 Spark 作业定义。 有关环境的详细信息,请参阅在 Microsoft Fabric 中创建、配置和使用环境。
在本教程中,使用 %pip install
在笔记本中安装 imblearn
库。
注意
PySpark 内核将在 %pip install
之后重启。 在运行任何其他单元之前安装所需的库。
# Use pip to install libraries
%pip install imblearn
步骤 2:加载数据
churn.csv 中的数据集包含 1 万个客户的流失状态以及 14 个属性,其中包括:
- 信用评分
- 地理位置(德国、法国、西班牙)
- 性别(男性、女性)
- 帐龄
- 保有期(成为银行客户的年数)
- 帐户余额
- 估计工资
- 客户通过银行购买的产品数
- 信用卡状态(客户是否持有信用卡)
- 活跃成员状态(是否为银行的活跃客户)
该数据集还包括行号、客户 ID 和客户姓氏列。 这些列中的值不应影响客户离开银行的决定。
客户银行账户销户事件定义客户流失。 数据集中的列 Exited
是指客户的放弃。 由于对属性的上下文知之甚少,因此不需要数据集的背景信息。 我们希望了解这些属性对 Exited
状态的影响。
在 10,000 个客户中,只有 2037 个客户(约 20%)停止与银行业务往来。 由于类不平衡率,建议生成合成数据。 混淆矩阵准确性可能与不平衡分类无关。 可能需要也使用“精准率-召回率曲线下面积”(AUPRC) 指标来测量准确度。
- 下表显示了
churn.csv
数据的预览:
CustomerID | Surname | CreditScore | 地理位置 | 性别 | 年龄 | 保有时间 | 余额 | NumOfProducts | HasCrCard | IsActiveMember | EstimatedSalary | 已终止业务往来 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
15634602 | Hargrave | 619 | France | Female | 42 | 2 | 0.00 | 1 | 1 | 1 | 101348.88 | 1 |
15647311 | Hill | 608 | 西班牙 | Female | 41 | 1 | 83807.86 | 1 | 0 | 1 | 112542.58 | 0 |
下载数据集并上传到湖屋
定义这些参数,以便可以将此笔记本与不同的数据集一起使用:
IS_CUSTOM_DATA = False # If TRUE, the dataset has to be uploaded manually
IS_SAMPLE = False # If TRUE, use only SAMPLE_ROWS of data for training; otherwise, use all data
SAMPLE_ROWS = 5000 # If IS_SAMPLE is True, use only this number of rows for training
DATA_ROOT = "/lakehouse/default"
DATA_FOLDER = "Files/churn" # Folder with data files
DATA_FILE = "churn.csv" # Data file name
以下代码将下载数据集的公开可用版本,然后将该数据集存储在 Fabric 湖屋中:
重要
在运行笔记本之前,请将湖屋添加到笔记本。 否则可能会导致出错。
import os, requests
if not IS_CUSTOM_DATA:
# With an Azure Synapse Analytics blob, this can be done in one line
# Download demo data files into the lakehouse if they don't exist
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
file_list = ["churn.csv"]
download_path = "/lakehouse/default/Files/churn/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
开始记录运行此笔记本所需的时间:
# Record the notebook running time
import time
ts = time.time()
从湖屋中读取原始数据
此代码从湖屋的“文件”部分读取原始数据,并为不同日期部分添加更多列。 创建分区增量表会用到此信息。
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
基于数据集创建 pandas 数据帧
此代码会将 Spark DataFrame 转换为 Pandas DataFrame,以便更轻松地完成处理和可视化:
df = df.toPandas()
步骤 3:执行探索性数据分析
显示原始数据
使用 display
浏览原始数据,计算一些基本统计信息并显示图表视图。 须先导入数据可视化所需的库,例如 seaborn。 Seaborn 是一个 Python 数据可视化库,提供用于在数据帧和数组上生成视觉对象的高级界面。
import seaborn as sns
sns.set_theme(style="whitegrid", palette="tab10", rc = {'figure.figsize':(9,6)})
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from matplotlib import rc, rcParams
import numpy as np
import pandas as pd
import itertools
display(df, summary=True)
使用数据整理器执行初始数据清理
直接从笔记本启动数据整理器,以浏览和转换任何 Pandas DataFrame。 在笔记本功能区“数据”选项卡下,使用“数据整理器”下拉列表提示浏览可供编辑的活动 Pandas DataFrame。 选择要在数据整理器中打开的 DataFrame。
注意
当笔记本内核繁忙时,无法打开数据整理器。 在启动数据整理器之前,必须完成单元格执行。 详细了解数据整理器。
启动数据整理器后,将生成数据面板的描述性概述,如下图所示。 此概述包括有关 DataFrame 的维度、缺失值等的信息。可使用数据整理器生成脚本,以便删除缺少值的行、重复行和具有特定名称的列。 然后,可将脚本复制到单元格中。 下一个单元格将显示复制的脚本。
def clean_data(df):
# Drop rows with missing data across all columns
df.dropna(inplace=True)
# Drop duplicate rows in columns: 'RowNumber', 'CustomerId'
df.drop_duplicates(subset=['RowNumber', 'CustomerId'], inplace=True)
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df.drop(columns=['RowNumber', 'CustomerId', 'Surname'], inplace=True)
return df
df_clean = clean_data(df.copy())
确定属性
此代码可以确定分类、数值和目标属性:
# Determine the dependent (target) attribute
dependent_variable_name = "Exited"
print(dependent_variable_name)
# Determine the categorical attributes
categorical_variables = [col for col in df_clean.columns if col in "O"
or df_clean[col].nunique() <=5
and col not in "Exited"]
print(categorical_variables)
# Determine the numerical attributes
numeric_variables = [col for col in df_clean.columns if df_clean[col].dtype != "object"
and df_clean[col].nunique() >5]
print(numeric_variables)
显示五数概括
使用箱形图显示五数字摘要
- 最低分数
- 第一四分位数
- 中值 (median)
- 第三四分位数
- 最高分数
的数值属性。
df_num_cols = df_clean[numeric_variables]
sns.set(font_scale = 0.7)
fig, axes = plt.subplots(nrows = 2, ncols = 3, gridspec_kw = dict(hspace=0.3), figsize = (17,8))
fig.tight_layout()
for ax,col in zip(axes.flatten(), df_num_cols.columns):
sns.boxplot(x = df_num_cols[col], color='green', ax = ax)
# fig.suptitle('visualize and compare the distribution and central tendency of numerical attributes', color = 'k', fontsize = 12)
fig.delaxes(axes[1,2])
显示已终止和未终止业务往来客户的分布
显示已退出客户与未退出客户在各分类属性中的分布情况:
attr_list = ['Geography', 'Gender', 'HasCrCard', 'IsActiveMember', 'NumOfProducts', 'Tenure']
fig, axarr = plt.subplots(2, 3, figsize=(15, 4))
for ind, item in enumerate (attr_list):
sns.countplot(x = item, hue = 'Exited', data = df_clean, ax = axarr[ind%2][ind//2])
fig.subplots_adjust(hspace=0.7)
显示数值属性的分布
使用直方图显示数值属性的频率分布:
columns = df_num_cols.columns[: len(df_num_cols.columns)]
fig = plt.figure()
fig.set_size_inches(18, 8)
length = len(columns)
for i,j in itertools.zip_longest(columns, range(length)):
plt.subplot((length // 2), 3, j+1)
plt.subplots_adjust(wspace = 0.2, hspace = 0.5)
df_num_cols[i].hist(bins = 20, edgecolor = 'black')
plt.title(i)
# fig = fig.suptitle('distribution of numerical attributes', color = 'r' ,fontsize = 14)
plt.show()
执行特征工程
此特征工程将基于当前属性生成新属性:
df_clean["NewTenure"] = df_clean["Tenure"]/df_clean["Age"]
df_clean["NewCreditsScore"] = pd.qcut(df_clean['CreditScore'], 6, labels = [1, 2, 3, 4, 5, 6])
df_clean["NewAgeScore"] = pd.qcut(df_clean['Age'], 8, labels = [1, 2, 3, 4, 5, 6, 7, 8])
df_clean["NewBalanceScore"] = pd.qcut(df_clean['Balance'].rank(method="first"), 5, labels = [1, 2, 3, 4, 5])
df_clean["NewEstSalaryScore"] = pd.qcut(df_clean['EstimatedSalary'], 10, labels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
使用数据整理器执行独热编码
按照前面讨论的相同步骤启动数据整理器,使用数据整理器执行独热编码。 此单元格将显示复制生成的用于独热编码的脚本:
df_clean = pd.get_dummies(df_clean, columns=['Geography', 'Gender'])
创建增量表以生成 Power BI 报表
table_name = "df_clean"
# Create a PySpark DataFrame from pandas
sparkDF=spark.createDataFrame(df_clean)
sparkDF.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
探索性数据分析中的观察结果摘要
- 大多数客户来自法国。 与法国和德国相比,西班牙的流失率最低。
- 大多数客户都有信用卡
- 有些客户的年龄和信用评分分别在 60 岁以上和 400 分以下, 但他们不能被视为离群值。
- 很少有客户拥有两个以上银行产品。
- 非活动客户的流失率较高
- 性别和保有年限几乎不会影响客户注销银行账户的决定
步骤 4:执行模型训练和跟踪
准备好数据后,便可以定义模型。 在此笔记本中应用随机林和 LightGBM 模型。
使用 scikit-learn 和 LightGBM 库通过短短几行代码实现模型。 另外,请使用 MLfLow 和 Fabric 自动记录功能来跟踪试验。
此代码示例从湖屋加载增量表。 可以使用其他将湖屋视为源的增量表。
SEED = 12345
df_clean = spark.read.format("delta").load("Tables/df_clean").toPandas()
使用 MLflow 生成用于跟踪和记录模型的试验
本部分演示如何生成试验,指定模型和训练参数以及评分指标。 此外,还演示如何训练模型、记录模型以及保存训练的模型以供以后使用。
import mlflow
# Set up the experiment name
EXPERIMENT_NAME = "sample-bank-churn-experiment" # MLflow experiment name
在训练机器学习模型时,自动日志记录会自动捕获该模型的输入参数值和输出度量。 然后,该信息会记录到工作区。在那里,MLflow API 或工作区中的相应试验可以对其进行访问并可视化。
完成后,试验如下图所示:
所有具有各自名称的试验都会被记录下来,并且能够跟踪其参数和性能指标。 要了解自动记录的更多信息,请参阅 Microsoft Fabric 中的自动记录。
设置试验和自动记录规范
mlflow.set_experiment(EXPERIMENT_NAME) # Use a date stamp to append to the experiment
mlflow.autolog(exclusive=False)
导入 scikit-learn 和 LightGBM
# Import the required libraries for model training
from sklearn.model_selection import train_test_split
from lightgbm import LGBMClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, confusion_matrix, recall_score, roc_auc_score, classification_report
准备训练和测试数据集
y = df_clean["Exited"]
X = df_clean.drop("Exited",axis=1)
# Train/test separation
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=SEED)
将 SMOTE 应用于训练数据
分类不平衡成为问题在于,少数类的示例太少,模型无法有效了解决策边界。 为解决此问题,合成少数过采样技术 (SMOTE) 是为少数类合成新样本的最广泛使用的技术。 使用步骤 1 中安装的 imblearn
库来访问 SMOTE。
仅将 SMOTE 仅应用于训练数据集 必须保留测试数据集的原始不平衡分布,以便获取有关模型在处理原始数据时表现的有效近似值。 该试验展示了生产环境中的情况。
from collections import Counter
from imblearn.over_sampling import SMOTE
sm = SMOTE(random_state=SEED)
X_res, y_res = sm.fit_resample(X_train, y_train)
new_train = pd.concat([X_res, y_res], axis=1)
有关详细信息,请参阅 SMOTE 和从随机过度采样到 SMOTE 和 ADASYN。 不平衡学习网站提供了这些资源。
训练模型
使用随机林训练模型,最大深度为 4,具有 4 个特征:
mlflow.sklearn.autolog(registered_model_name='rfc1_sm') # Register the trained model with autologging
rfc1_sm = RandomForestClassifier(max_depth=4, max_features=4, min_samples_split=3, random_state=1) # Pass hyperparameters
with mlflow.start_run(run_name="rfc1_sm") as run:
rfc1_sm_run_id = run.info.run_id # Capture run_id for model prediction later
print("run_id: {}; status: {}".format(rfc1_sm_run_id, run.info.status))
# rfc1.fit(X_train,y_train) # Imbalanced training data
rfc1_sm.fit(X_res, y_res.ravel()) # Balanced training data
rfc1_sm.score(X_test, y_test)
y_pred = rfc1_sm.predict(X_test)
cr_rfc1_sm = classification_report(y_test, y_pred)
cm_rfc1_sm = confusion_matrix(y_test, y_pred)
roc_auc_rfc1_sm = roc_auc_score(y_res, rfc1_sm.predict_proba(X_res)[:, 1])
使用随机林训练模型,最大深度为 8,具有 6 个特征:
mlflow.sklearn.autolog(registered_model_name='rfc2_sm') # Register the trained model with autologging
rfc2_sm = RandomForestClassifier(max_depth=8, max_features=6, min_samples_split=3, random_state=1) # Pass hyperparameters
with mlflow.start_run(run_name="rfc2_sm") as run:
rfc2_sm_run_id = run.info.run_id # Capture run_id for model prediction later
print("run_id: {}; status: {}".format(rfc2_sm_run_id, run.info.status))
# rfc2.fit(X_train,y_train) # Imbalanced training data
rfc2_sm.fit(X_res, y_res.ravel()) # Balanced training data
rfc2_sm.score(X_test, y_test)
y_pred = rfc2_sm.predict(X_test)
cr_rfc2_sm = classification_report(y_test, y_pred)
cm_rfc2_sm = confusion_matrix(y_test, y_pred)
roc_auc_rfc2_sm = roc_auc_score(y_res, rfc2_sm.predict_proba(X_res)[:, 1])
使用 LightGBM 训练模型:
# lgbm_model
mlflow.lightgbm.autolog(registered_model_name='lgbm_sm') # Register the trained model with autologging
lgbm_sm_model = LGBMClassifier(learning_rate = 0.07,
max_delta_step = 2,
n_estimators = 100,
max_depth = 10,
eval_metric = "logloss",
objective='binary',
random_state=42)
with mlflow.start_run(run_name="lgbm_sm") as run:
lgbm1_sm_run_id = run.info.run_id # Capture run_id for model prediction later
# lgbm_sm_model.fit(X_train,y_train) # Imbalanced training data
lgbm_sm_model.fit(X_res, y_res.ravel()) # Balanced training data
y_pred = lgbm_sm_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
cr_lgbm_sm = classification_report(y_test, y_pred)
cm_lgbm_sm = confusion_matrix(y_test, y_pred)
roc_auc_lgbm_sm = roc_auc_score(y_res, lgbm_sm_model.predict_proba(X_res)[:, 1])
查看用于跟踪模型性能的试验项目
试验运行会自动保存在试验项目中。 可在工作区中找到该项目。 项目名称根据用于设置试验的名称进行命名。 所有经过训练的模型及其运行、性能指标和模型参数都会记录在实验页面上。
查看试验:
- 在左侧面板中,选择工作区。
- 查找并选择相应的试验名称,在本例中为“sample-bank-churn-experiment”。
步骤 5:评估并保存最终的机器学习模型
从工作区中打开保存的试验以选择并保存最佳模型:
# Define run_uri to fetch the model
# MLflow client: mlflow.model.url, list model
load_model_rfc1_sm = mlflow.sklearn.load_model(f"runs:/{rfc1_sm_run_id}/model")
load_model_rfc2_sm = mlflow.sklearn.load_model(f"runs:/{rfc2_sm_run_id}/model")
load_model_lgbm1_sm = mlflow.lightgbm.load_model(f"runs:/{lgbm1_sm_run_id}/model")
评估测试数据集上保存的模型的性能
ypred_rfc1_sm = load_model_rfc1_sm.predict(X_test) # Random forest with maximum depth of 4 and 4 features
ypred_rfc2_sm = load_model_rfc2_sm.predict(X_test) # Random forest with maximum depth of 8 and 6 features
ypred_lgbm1_sm = load_model_lgbm1_sm.predict(X_test) # LightGBM
使用混淆矩阵显示真正/误报/漏报例
要评估分类的准确性,请构建一个绘制混淆矩阵的脚本。 此外,还可以使用 SynapseML 工具绘制混淆矩阵,如欺诈检测示例中所示。
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
print(cm)
plt.figure(figsize=(4,4))
plt.rcParams.update({'font.size': 10})
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45, color="blue")
plt.yticks(tick_marks, classes, color="blue")
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="red" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
为最大深度为 4 且具有 4 个特征的随机林分类器创建混淆矩阵:
cfm = confusion_matrix(y_test, y_pred=ypred_rfc1_sm)
plot_confusion_matrix(cfm, classes=['Non Churn','Churn'],
title='Random Forest with max depth of 4')
tn, fp, fn, tp = cfm.ravel()
为最大深度为 8 且具有 6 个特征的随机林分类器创建混淆矩阵:
cfm = confusion_matrix(y_test, y_pred=ypred_rfc2_sm)
plot_confusion_matrix(cfm, classes=['Non Churn','Churn'],
title='Random Forest with max depth of 8')
tn, fp, fn, tp = cfm.ravel()
为 LightGBM 创建混淆矩阵:
cfm = confusion_matrix(y_test, y_pred=ypred_lgbm1_sm)
plot_confusion_matrix(cfm, classes=['Non Churn','Churn'],
title='LightGBM')
tn, fp, fn, tp = cfm.ravel()
保存 Power BI 的结果
将增量帧保存到湖屋,可将模型预测结果移动到 Power BI 可视化对象。
df_pred = X_test.copy()
df_pred['y_test'] = y_test
df_pred['ypred_rfc1_sm'] = ypred_rfc1_sm
df_pred['ypred_rfc2_sm'] =ypred_rfc2_sm
df_pred['ypred_lgbm1_sm'] = ypred_lgbm1_sm
table_name = "df_pred_results"
sparkDF=spark.createDataFrame(df_pred)
sparkDF.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
步骤 6:在 Power BI 中访问可视化效果
访问保存在 Power BI 中的表:
- 选择左侧的“OneLake 数据中心”
- 选择已添加到此笔记本的湖屋
- 在“打开此湖屋”部分中,选择“打开”
- 选择功能区上的“新建语义模型”。 选择
df_pred_results
,然后选择“继续”以创建可链接到预测的新 Power BI 语义模型 - 在语义模型页面顶部的工具处,选择“新建报表”以打开 Power BI 报表创作页面。
以下屏幕截图显示了一些示例可视化效果。 数据面板显示了增量表和要从表中选择的列。 选择适当的类别 (X) 和值 (Y) 轴后,可以选择筛选器和函数,例如表列的总和或平均值。
注意
在此屏幕截图中,图示示例描述了对 Power BI 中保存的预测结果的分析:
但是,对于真实的客户流失用例,用户可能需要根据主题专业知识以及其公司与业务分析团队已标准化为指标的内容,用户可能需要创建一套更全面的可视化需求。
Power BI 报表显示,使用两个以上的银行产品的客户的流失率较高。 但是,很少有客户拥有两个以上的产品。 (请参阅左下方面板中的绘图。)银行应该收集更多的数据,但也应调查与更多产品相关的其他特征。
德国银行客户流失率高于法国和西班牙的客户流失率。 (请参阅右下方面板中的绘图)。 根据报告结果,对促使客户离开的因素进行调查可能会有所帮助。
中年客户(25 至 45 岁)较多。 较多 45 到 60 岁之间的客户倾向于终止业务往来。
最后,信用分数较低的客户很可能停止与该银行业务往来,而改投其他金融机构。 该银行应该研究如何鼓励信用评分和账户余额较低的客户留在银行。
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")