Unity Catalog 中的模型示例
该实例演示了如何使用 Unity Catalog 中的模型构建机器学习应用程序来预测风力发电场的每日电力输出。 该示例演示如何:
- 使用 MLflow 跟踪和记录模型。
- 将模型注册到 Unity Catalog。
- 描述模型并使用别名部署模型进行推理。
- 将注册模型与生产应用程序集成。
- 在 Unity Catalog 中搜索和发现模型。
- 删除模型。
本文介绍如何在 Unity Catalog UI 和 API 中使用 MLflow 跟踪和模型来执行这些步骤。
要求
请确保满足要求中的各项要求。 此外,本文中的代码示例假定你具有以下权限:
- 对
main
目录具有USE CATALOG
权限。 - 对
main.default
架构具有CREATE MODEL
和USE SCHEMA
权限。
笔记本
本文中的所有代码都在以下笔记本中提供。
Unity Catalog 中的模型示例笔记本
安装 MLflow Python 客户端
此示例需要 MLflow Python 客户端 2.5.0 或更高版本,和 TensorFlow。 在笔记本顶部添加以下命令以安装这些依赖项。
%pip install --upgrade "mlflow-skinny[databricks]>=2.5.0" tensorflow
dbutils.library.restartPython()
加载数据集、训练模型并注册到 Unity Catalog
本部分介绍如何加载风电场数据集、训练模型并将模型注册到 Unity Catalog。 在试验运行中跟踪模型训练运行和指标。
加载数据集
以下代码加载一个数据集,其中包含美国风电场的天气数据和电力输出信息。 数据集包含 wind direction
、wind speed
和 air temperature
功能,每六小时采样一次(在 00:00
、08:00
、16:00
各采集一次),以及过去几年来的每日总电力输出 (power
)。
import pandas as pd
wind_farm_data = pd.read_csv("https://github.com/dbczumar/model-registry-demo-notebook/raw/master/dataset/windfarm_data.csv", index_col=0)
def get_training_data():
training_data = pd.DataFrame(wind_farm_data["2014-01-01":"2018-01-01"])
X = training_data.drop(columns="power")
y = training_data["power"]
return X, y
def get_validation_data():
validation_data = pd.DataFrame(wind_farm_data["2018-01-01":"2019-01-01"])
X = validation_data.drop(columns="power")
y = validation_data["power"]
return X, y
def get_weather_and_forecast():
format_date = lambda pd_date : pd_date.date().strftime("%Y-%m-%d")
today = pd.Timestamp('today').normalize()
week_ago = today - pd.Timedelta(days=5)
week_later = today + pd.Timedelta(days=5)
past_power_output = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(today)]
weather_and_forecast = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(week_later)]
if len(weather_and_forecast) < 10:
past_power_output = pd.DataFrame(wind_farm_data).iloc[-10:-5]
weather_and_forecast = pd.DataFrame(wind_farm_data).iloc[-10:]
return weather_and_forecast.drop(columns="power"), past_power_output["power"]
配置 MLflow 客户端以访问 Unity Catalog 中的模型
默认情况下,MLflow Python 客户端会在 Azure Databricks 上的工作区模型注册表中创建模型。 若要升级到 Unity Catalog 中的模型,请将客户端配置为访问 Unity Catalog 中的模型:
import mlflow
mlflow.set_registry_uri("databricks-uc")
训练并注册模型
以下代码使用 TensorFlow Keras 训练神经网络,以基于数据集中的天气特征预测电力输出,并使用 MLflow API 将拟合模型注册到 Unity Catalog。
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
MODEL_NAME = "main.default.wind_forecasting"
def train_and_register_keras_model(X, y):
with mlflow.start_run():
model = Sequential()
model.add(Dense(100, input_shape=(X.shape[-1],), activation="relu", name="hidden_layer"))
model.add(Dense(1))
model.compile(loss="mse", optimizer="adam")
model.fit(X, y, epochs=100, batch_size=64, validation_split=.2)
example_input = X[:10].to_numpy()
mlflow.tensorflow.log_model(
model,
artifact_path="model",
input_example=example_input,
registered_model_name=MODEL_NAME
)
return model
X_train, y_train = get_training_data()
model = train_and_register_keras_model(X_train, y_train)
在 UI 中查看模型
可使用目录资源管理器在 Unity Catalog 中查看和管理已注册的模型和模型版本。 查找刚刚在 main
目录和 default
架构下创建的模型。
部署用于推理的模型版本
Unity Catalog 中的模型支持模型部署的别名。 别名为已注册模型的特定版本提供可变命名引用(例如,“Champion”或“Challenger”)。 可在下游推理工作流中使用这些别名来引用和面向模型版本。
导航到目录资源管理器中已注册的模型后,在“别名”列下单击以将“Champion”别名分配给最新的模型版本,然后按“继续”以保存更改。
使用 API 加载模型版本
MLflow 模型组件定义从多个机器学习框架加载模型的函数。 例如,mlflow.tensorflow.load_model()
用于加载以 MLflow 格式保存的 TensorFlow 模型,mlflow.sklearn.load_model()
用于加载以 MLflow 格式保存的 scikit-learn 模型。
这些函数可以从 Unity Catalog 中的模型加载模型。
import mlflow.pyfunc
model_version_uri = "models:/{model_name}/1".format(model_name=MODEL_NAME)
print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_version_uri))
model_version_1 = mlflow.pyfunc.load_model(model_version_uri)
model_champion_uri = "models:/{model_name}@Champion".format(model_name=MODEL_NAME)
print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_champion_uri))
champion_model = mlflow.pyfunc.load_model(model_champion_uri)
使用 champion 模型预测电力输出
在本部分中,champion 模型用于评估风电场的天气预报数据。 forecast_power()
应用程序从指定的阶段加载预测模型的最新版本,并使用它来预测未来五天的发电量。
from mlflow.tracking import MlflowClient
def plot(model_name, model_alias, model_version, power_predictions, past_power_output):
import matplotlib.dates as mdates
from matplotlib import pyplot as plt
index = power_predictions.index
fig = plt.figure(figsize=(11, 7))
ax = fig.add_subplot(111)
ax.set_xlabel("Date", size=20, labelpad=20)
ax.set_ylabel("Power\noutput\n(MW)", size=20, labelpad=60, rotation=0)
ax.tick_params(axis='both', which='major', labelsize=17)
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
ax.plot(index[:len(past_power_output)], past_power_output, label="True", color="red", alpha=0.5, linewidth=4)
ax.plot(index, power_predictions.squeeze(), "--", label="Predicted by '%s'\nwith alias '%s' (Version %d)" % (model_name, model_alias, model_version), color="blue", linewidth=3)
ax.set_ylim(ymin=0, ymax=max(3500, int(max(power_predictions.values) * 1.3)))
ax.legend(fontsize=14)
plt.title("Wind farm power output and projections", size=24, pad=20)
plt.tight_layout()
display(plt.show())
def forecast_power(model_name, model_alias):
import pandas as pd
client = MlflowClient()
model_version = client.get_model_version_by_alias(model_name, model_alias).version
model_uri = "models:/{model_name}@{model_alias}".format(model_name=MODEL_NAME, model_alias=model_alias)
model = mlflow.pyfunc.load_model(model_uri)
weather_data, past_power_output = get_weather_and_forecast()
power_predictions = pd.DataFrame(model.predict(weather_data))
power_predictions.index = pd.to_datetime(weather_data.index)
print(power_predictions)
plot(model_name, model_alias, int(model_version), power_predictions, past_power_output)
forecast_power(MODEL_NAME, "Champion")
使用 API 添加模型和模型版本描述
本部分中的代码演示如何使用 MLflow API 添加模型和模型版本说明。
client = MlflowClient()
client.update_registered_model(
name=MODEL_NAME,
description="This model forecasts the power output of a wind farm based on weather data. The weather data consists of three features: wind speed, wind direction, and air temperature."
)
client.update_model_version(
name=MODEL_NAME,
version=1,
description="This model version was built using TensorFlow Keras. It is a feed-forward neural network with one hidden layer."
)
创建新模型版本
经典的机器学习技术对电力预测也是有效的。 下面的代码使用 scikit learn 训练随机林模型,并使用 mlflow.sklearn.log_model()
函数将其注册到 Unity Catalog 中。
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
with mlflow.start_run():
n_estimators = 300
mlflow.log_param("n_estimators", n_estimators)
rand_forest = RandomForestRegressor(n_estimators=n_estimators)
rand_forest.fit(X_train, y_train)
val_x, val_y = get_validation_data()
mse = mean_squared_error(rand_forest.predict(val_x), val_y)
print("Validation MSE: %d" % mse)
mlflow.log_metric("mse", mse)
example_input = val_x.iloc[[0]]
# Specify the `registered_model_name` parameter of the `mlflow.sklearn.log_model()`
# function to register the model to <UC>. This automatically
# creates a new model version
mlflow.sklearn.log_model(
sk_model=rand_forest,
artifact_path="sklearn-model",
input_example=example_input,
registered_model_name=MODEL_NAME
)
提取新模型版本号
以下代码演示如何检索模型名对应的最新模型版本号。
client = MlflowClient()
model_version_infos = client.search_model_versions("name = '%s'" % MODEL_NAME)
new_model_version = max([model_version_info.version for model_version_info in model_version_infos])
向新模型版本添加说明
client.update_model_version(
name=MODEL_NAME,
version=new_model_version,
description="This model version is a random forest containing 100 decision trees that was trained in scikit-learn."
)
将新模型版本标记为 Challenger 并测试模型
在部署模型来为生产流量提供服务之前,最佳做法是使用生产数据示例对它进行测试。 之前,你使用“Champion”别名来表示为大多数生产工作负载提供服务的模型版本。 以下代码将“Challenger”别名分配给新的模型版本,并评估其性能。
client.set_registered_model_alias(
name=MODEL_NAME,
alias="Challenger",
version=new_model_version
)
forecast_power(MODEL_NAME, "Challenger")
将新模型版本部署为 Champion 模型版本
验证新模型版本在测试中表现良好后,下面的代码将“Champion”别名分配给新模型版本,并使用与使用 champion 模型预测电力输出部分完全相同的应用程序代码生成电力预测。
client.set_registered_model_alias(
name=MODEL_NAME,
alias="Champion",
version=new_model_version
)
forecast_power(MODEL_NAME, "Champion")
现在,预测模型有两个模型版本:在 Keras 模型中训练的模型版本和在 scikit-learn 中训练的版本。 请注意,“Challenger”别名仍然分配给新的 scikit-learn 模型版本,因此面向“Challenger”模型版本的下游工作负载都将继续成功运行:
删除模型
当不再使用模型版本时,可将其删除。 还可删除整个已注册模型;这会移除其所有关联的模型版本。 请注意,删除模型版本会清除分配给模型版本的任何别名。
使用 MLflow API 删除 Version 1
client.delete_model_version(
name=MODEL_NAME,
version=1,
)
使用 MLflow API 删除模型
client = MlflowClient()
client.delete_registered_model(name=MODEL_NAME)