Unity Catalog 中的模型示例

该实例演示了如何使用 Unity Catalog 中的模型构建机器学习应用程序来预测风力发电场的每日电力输出。 该示例演示如何:

  • 使用 MLflow 跟踪和记录模型。
  • 将模型注册到 Unity Catalog。
  • 描述模型并使用别名部署模型进行推理。
  • 将注册模型与生产应用程序集成。
  • 在 Unity Catalog 中搜索和发现模型。
  • 删除模型。

本文介绍如何在 Unity Catalog UI 和 API 中使用 MLflow 跟踪和模型来执行这些步骤。

要求

请确保满足要求中的各项要求。 此外,本文中的代码示例假定你具有以下权限:

  • main 目录具有 USE CATALOG 权限。
  • main.default 架构具有 CREATE MODELUSE SCHEMA 权限。

笔记本

本文中的所有代码都在以下笔记本中提供。

Unity Catalog 中的模型示例笔记本

获取笔记本

安装 MLflow Python 客户端

此示例需要 MLflow Python 客户端 2.5.0 或更高版本,和 TensorFlow。 在笔记本顶部添加以下命令以安装这些依赖项。

%pip install --upgrade "mlflow-skinny[databricks]>=2.5.0" tensorflow
dbutils.library.restartPython()

加载数据集、训练模型并注册到 Unity Catalog

本部分介绍如何加载风电场数据集、训练模型并将模型注册到 Unity Catalog。 在试验运行中跟踪模型训练运行和指标。

加载数据集

以下代码加载一个数据集,其中包含美国风电场的天气数据和电力输出信息。 数据集包含 wind directionwind speedair temperature 功能,每六小时采样一次(在 00:0008:0016:00 各采集一次),以及过去几年来的每日总电力输出 (power)。

import pandas as pd
wind_farm_data = pd.read_csv("https://github.com/dbczumar/model-registry-demo-notebook/raw/master/dataset/windfarm_data.csv", index_col=0)

def get_training_data():
  training_data = pd.DataFrame(wind_farm_data["2014-01-01":"2018-01-01"])
  X = training_data.drop(columns="power")
  y = training_data["power"]
  return X, y

def get_validation_data():
  validation_data = pd.DataFrame(wind_farm_data["2018-01-01":"2019-01-01"])
  X = validation_data.drop(columns="power")
  y = validation_data["power"]
  return X, y

def get_weather_and_forecast():
  format_date = lambda pd_date : pd_date.date().strftime("%Y-%m-%d")
  today = pd.Timestamp('today').normalize()
  week_ago = today - pd.Timedelta(days=5)
  week_later = today + pd.Timedelta(days=5)

  past_power_output = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(today)]
  weather_and_forecast = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(week_later)]
  if len(weather_and_forecast) < 10:
    past_power_output = pd.DataFrame(wind_farm_data).iloc[-10:-5]
    weather_and_forecast = pd.DataFrame(wind_farm_data).iloc[-10:]

  return weather_and_forecast.drop(columns="power"), past_power_output["power"]

配置 MLflow 客户端以访问 Unity Catalog 中的模型

默认情况下,MLflow Python 客户端会在 Azure Databricks 上的工作区模型注册表中创建模型。 若要升级到 Unity Catalog 中的模型,请将客户端配置为访问 Unity Catalog 中的模型:

import mlflow
mlflow.set_registry_uri("databricks-uc")

训练并注册模型

以下代码使用 TensorFlow Keras 训练神经网络,以基于数据集中的天气特征预测电力输出,并使用 MLflow API 将拟合模型注册到 Unity Catalog。

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

MODEL_NAME = "main.default.wind_forecasting"

def train_and_register_keras_model(X, y):
  with mlflow.start_run():
    model = Sequential()
    model.add(Dense(100, input_shape=(X.shape[-1],), activation="relu", name="hidden_layer"))
    model.add(Dense(1))
    model.compile(loss="mse", optimizer="adam")

    model.fit(X, y, epochs=100, batch_size=64, validation_split=.2)
    example_input = X[:10].to_numpy()
    mlflow.tensorflow.log_model(
        model,
        artifact_path="model",
        input_example=example_input,
        registered_model_name=MODEL_NAME
    )
  return model

X_train, y_train = get_training_data()
model = train_and_register_keras_model(X_train, y_train)

在 UI 中查看模型

可使用目录资源管理器在 Unity Catalog 中查看和管理已注册的模型和模型版本。 查找刚刚在 main 目录和 default 架构下创建的模型。

已注册模型页

部署用于推理的模型版本

Unity Catalog 中的模型支持模型部署的别名。 别名为已注册模型的特定版本提供可变命名引用(例如,“Champion”或“Challenger”)。 可在下游推理工作流中使用这些别名来引用和面向模型版本。

导航到目录资源管理器中已注册的模型后,在“别名”列下单击以将“Champion”别名分配给最新的模型版本,然后按“继续”以保存更改。

设置已注册模型的别名

使用 API 加载模型版本

MLflow 模型组件定义从多个机器学习框架加载模型的函数。 例如,mlflow.tensorflow.load_model() 用于加载以 MLflow 格式保存的 TensorFlow 模型,mlflow.sklearn.load_model() 用于加载以 MLflow 格式保存的 scikit-learn 模型。

这些函数可以从 Unity Catalog 中的模型加载模型。

import mlflow.pyfunc

model_version_uri = "models:/{model_name}/1".format(model_name=MODEL_NAME)

print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_version_uri))
model_version_1 = mlflow.pyfunc.load_model(model_version_uri)

model_champion_uri = "models:/{model_name}@Champion".format(model_name=MODEL_NAME)

print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_champion_uri))
champion_model = mlflow.pyfunc.load_model(model_champion_uri)

使用 champion 模型预测电力输出

在本部分中,champion 模型用于评估风电场的天气预报数据。 forecast_power() 应用程序从指定的阶段加载预测模型的最新版本,并使用它来预测未来五天的发电量。

from mlflow.tracking import MlflowClient

def plot(model_name, model_alias, model_version, power_predictions, past_power_output):
  import matplotlib.dates as mdates
  from matplotlib import pyplot as plt
  index = power_predictions.index
  fig = plt.figure(figsize=(11, 7))
  ax = fig.add_subplot(111)
  ax.set_xlabel("Date", size=20, labelpad=20)
  ax.set_ylabel("Power\noutput\n(MW)", size=20, labelpad=60, rotation=0)
  ax.tick_params(axis='both', which='major', labelsize=17)
  ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
  ax.plot(index[:len(past_power_output)], past_power_output, label="True", color="red", alpha=0.5, linewidth=4)
  ax.plot(index, power_predictions.squeeze(), "--", label="Predicted by '%s'\nwith alias '%s' (Version %d)" % (model_name, model_alias, model_version), color="blue", linewidth=3)
  ax.set_ylim(ymin=0, ymax=max(3500, int(max(power_predictions.values) * 1.3)))
  ax.legend(fontsize=14)
  plt.title("Wind farm power output and projections", size=24, pad=20)
  plt.tight_layout()
  display(plt.show())

def forecast_power(model_name, model_alias):
  import pandas as pd
  client = MlflowClient()
  model_version = client.get_model_version_by_alias(model_name, model_alias).version
  model_uri = "models:/{model_name}@{model_alias}".format(model_name=MODEL_NAME, model_alias=model_alias)
  model = mlflow.pyfunc.load_model(model_uri)
  weather_data, past_power_output = get_weather_and_forecast()
  power_predictions = pd.DataFrame(model.predict(weather_data))
  power_predictions.index = pd.to_datetime(weather_data.index)
  print(power_predictions)
  plot(model_name, model_alias, int(model_version), power_predictions, past_power_output)

forecast_power(MODEL_NAME, "Champion")

使用 API 添加模型和模型版本描述

本部分中的代码演示如何使用 MLflow API 添加模型和模型版本说明。

client = MlflowClient()
client.update_registered_model(
  name=MODEL_NAME,
  description="This model forecasts the power output of a wind farm based on weather data. The weather data consists of three features: wind speed, wind direction, and air temperature."
)

client.update_model_version(
  name=MODEL_NAME,
  version=1,
  description="This model version was built using TensorFlow Keras. It is a feed-forward neural network with one hidden layer."
)

创建新模型版本

经典的机器学习技术对电力预测也是有效的。 下面的代码使用 scikit learn 训练随机林模型,并使用 mlflow.sklearn.log_model() 函数将其注册到 Unity Catalog 中。

import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

with mlflow.start_run():
  n_estimators = 300
  mlflow.log_param("n_estimators", n_estimators)

  rand_forest = RandomForestRegressor(n_estimators=n_estimators)
  rand_forest.fit(X_train, y_train)

  val_x, val_y = get_validation_data()
  mse = mean_squared_error(rand_forest.predict(val_x), val_y)
  print("Validation MSE: %d" % mse)
  mlflow.log_metric("mse", mse)

  example_input = val_x.iloc[[0]]

  # Specify the `registered_model_name` parameter of the `mlflow.sklearn.log_model()`
  # function to register the model to <UC>. This automatically
  # creates a new model version
  mlflow.sklearn.log_model(
    sk_model=rand_forest,
    artifact_path="sklearn-model",
    input_example=example_input,
    registered_model_name=MODEL_NAME
  )

提取新模型版本号

以下代码演示如何检索模型名对应的最新模型版本号。

client = MlflowClient()
model_version_infos = client.search_model_versions("name = '%s'" % MODEL_NAME)
new_model_version = max([model_version_info.version for model_version_info in model_version_infos])

向新模型版本添加说明

client.update_model_version(
  name=MODEL_NAME,
  version=new_model_version,
  description="This model version is a random forest containing 100 decision trees that was trained in scikit-learn."
)

将新模型版本标记为 Challenger 并测试模型

在部署模型来为生产流量提供服务之前,最佳做法是使用生产数据示例对它进行测试。 之前,你使用“Champion”别名来表示为大多数生产工作负载提供服务的模型版本。 以下代码将“Challenger”别名分配给新的模型版本,并评估其性能。

client.set_registered_model_alias(
  name=MODEL_NAME,
  alias="Challenger",
  version=new_model_version
)

forecast_power(MODEL_NAME, "Challenger")

将新模型版本部署为 Champion 模型版本

验证新模型版本在测试中表现良好后,下面的代码将“Champion”别名分配给新模型版本,并使用与使用 champion 模型预测电力输出部分完全相同的应用程序代码生成电力预测。

client.set_registered_model_alias(
  name=MODEL_NAME,
  alias="Champion",
  version=new_model_version
)

forecast_power(MODEL_NAME, "Champion")

现在,预测模型有两个模型版本:在 Keras 模型中训练的模型版本和在 scikit-learn 中训练的版本。 请注意,“Challenger”别名仍然分配给新的 scikit-learn 模型版本,因此面向“Challenger”模型版本的下游工作负载都将继续成功运行:

产品模型版本

删除模型

当不再使用模型版本时,可将其删除。 还可删除整个已注册模型;这会移除其所有关联的模型版本。 请注意,删除模型版本会清除分配给模型版本的任何别名。

使用 MLflow API 删除 Version 1

client.delete_model_version(
   name=MODEL_NAME,
   version=1,
)

使用 MLflow API 删除模型

client = MlflowClient()
client.delete_registered_model(name=MODEL_NAME)