このチュートリアルでは、ディープ ラーニング モデリング プロジェクトの実験、トレーニング、チューニング、登録、評価、デプロイのライフサイクル全体について説明します。 MLflow を使用して、モデルの開発およびデプロイ プロセスのあらゆる側面を追跡する方法を示します。
ノートブックではPyTorch を使用します。これは、GPU で高速化されたテンソル計算とディープ ラーニング ネットワークを構築するための高度な機能を提供するPython パッケージです。 準備ができたら、 モザイク AI モデル サービスを使用してモデルをデプロイできます。
このステップ バイ ステップ チュートリアルでは、次の方法について説明します。
- データを生成して視覚化する: 実際のシナリオをシミュレートし、特徴のリレーションシップを視覚化する合成データを作成します。
- PyTorch ニューラル ネットワークの設計とトレーニング: 回帰タスク用に調整された柔軟なディープ ラーニング モデルを構築します。
- MLflow を使用して実験を追跡する: メトリック、パラメーター、成果物をログに記録して、完全な再現性を実現します。
- ハイパーパラメーターのチューニングを自動化する: Optuna を使用して、最適なモデル構成を効率的に検索します。
- モデルの登録と管理: セキュリティで保護された組織化されたモデル ガバナンスのために、Unity カタログと統合された MLflow モデル レジストリを使用します。
- デプロイと予測: Spark UDF を使用して、登録されたモデルを読み込み、ローカルまたは大規模に予測を実行します。
%pip install -Uqqq mlflow pytorch-lightning optuna skorch uv optuna-integration[pytorch_lightning]
%restart_python
from typing import Tuple, Optional, Dict, List, Any
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
import mlflow
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
from mlflow.entities import Metric, Param
import optuna
from optuna.integration import PyTorchLightningPruningCallback
import time
0. Unity カタログを使用してモデル レジストリを構成する
Databricks で MLflow を使用する主な利点の 1 つは、 Unity Catalog とのシームレスな統合です。 この統合により、モデルの管理とガバナンスが簡素化され、開発するすべてのモデルが追跡され、バージョン管理され、セキュリティで保護されます。 Unity カタログの詳細については、(AWS | Azure | GCP) を参照してください。
レジストリ URI を設定する
次のセルは、モデル登録に Unity カタログを使用するように MLflow を構成します。
mlflow.set_registry_uri("databricks-uc")
1. 合成回帰データセットを作成する
次のセルは、 create_regression_data 関数を定義します。 この関数は、回帰の合成データを生成します。 結果のデータセットには、特徴と、重要度が異なるターゲット、ノイズ、特徴の間の線形リレーションシップと非線形リレーションシップが含まれます。 これらの機能は、実際のデータ シナリオを模倣するように設計されています。
def create_regression_data(
n_samples: int,
n_features: int,
seed: int = 1994,
noise_level: float = 0.3,
nonlinear: bool = True
) -> Tuple[pd.DataFrame, pd.Series]:
"""Generates synthetic regression data with interesting correlations for MLflow and PyTorch demonstrations.
This function creates a DataFrame of continuous features and computes a target variable with nonlinear
relationships and interactions between features. The data is designed to be complex enough to demonstrate
the capabilities of deep learning, but not so complex that a reasonable model can't be learned.
Args:
n_samples (int): Number of samples (rows) to generate.
n_features (int): Number of feature columns.
seed (int, optional): Random seed for reproducibility. Defaults to 1994.
noise_level (float, optional): Level of Gaussian noise to add to the target. Defaults to 0.3.
nonlinear (bool, optional): Whether to add nonlinear feature transformations. Defaults to True.
Returns:
Tuple[pd.DataFrame, pd.Series]:
- pd.DataFrame: DataFrame containing the synthetic features.
- pd.Series: Series containing the target labels.
Example:
>>> df, target = create_regression_data(n_samples=1000, n_features=10)
"""
rng = np.random.RandomState(seed)
# Generate random continuous features
X = rng.uniform(-5, 5, size=(n_samples, n_features))
# Create feature DataFrame with meaningful names
columns = [f"feature_{i}" for i in range(n_features)]
df = pd.DataFrame(X, columns=columns)
# Generate base target variable with linear relationship to a subset of features
# Use only the first n_features//2 features to create some irrelevant features
weights = rng.uniform(-2, 2, size=n_features//2)
target = np.dot(X[:, :n_features//2], weights)
# Add some nonlinear transformations if requested
if nonlinear:
# Add square term for first feature
target += 0.5 * X[:, 0]**2
# Add interaction between the second and third features
if n_features >= 3:
target += 1.5 * X[:, 1] * X[:, 2]
# Add sine transformation of fourth feature
if n_features >= 4:
target += 2 * np.sin(X[:, 3])
# Add exponential of fifth feature, scaled down
if n_features >= 5:
target += 0.1 * np.exp(X[:, 4] / 2)
# Add threshold effect for sixth feature
if n_features >= 6:
target += 3 * (X[:, 5] > 1.5).astype(float)
# Add Gaussian noise
noise = rng.normal(0, noise_level * target.std(), size=n_samples)
target += noise
# Add a few more interesting features to the DataFrame
# Add a correlated feature (but not used in target calculation)
if n_features >= 7:
df['feature_correlated'] = df['feature_0'] * 0.8 + rng.normal(0, 0.2, size=n_samples)
# Add a cyclical feature
df['feature_cyclical'] = np.sin(np.linspace(0, 4*np.pi, n_samples))
# Add a feature with outliers
df['feature_with_outliers'] = rng.normal(0, 1, size=n_samples)
# Add outliers to ~1% of samples
outlier_idx = rng.choice(n_samples, size=n_samples//100, replace=False)
df.loc[outlier_idx, 'feature_with_outliers'] = rng.uniform(10, 15, size=len(outlier_idx))
return df, pd.Series(target, name='target')
2. 探索的データ分析 (EDA) の視覚化
視覚化は、データを理解するのに役立ちます。 次のセルのコードは 6 つの関数を作成し、それぞれが異なるプロットを生成して、データセットを視覚的に検査するのに役立ちます。
MLflow を使用して視覚化をアーティファクトとしてログに記録し、実験を完全に再現できます。
def plot_feature_distributions(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of histograms for each feature in the dataset.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the distribution plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
sns.histplot(X[feature], ax=ax, kde=True, color='skyblue')
ax.set_title(f'Distribution of {feature}')
# Hide any unused subplots
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Feature Distributions', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_correlation_heatmap(X: pd.DataFrame, y: pd.Series) -> plt.Figure:
"""
Creates a correlation heatmap of all features and the target variable.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
Returns:
plt.Figure: The matplotlib Figure object containing the heatmap.
"""
# Combine features and target into one DataFrame
data = X.copy()
data['target'] = y
# Calculate correlation matrix
corr_matrix = data.corr()
# Set up the figure
fig, ax = plt.subplots(figsize=(12, 10))
# Draw the heatmap with a color bar
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap=cmap,
center=0, square=True, linewidths=0.5, ax=ax)
ax.set_title('Feature Correlation Heatmap', fontsize=16)
plt.close(fig)
return fig
def plot_feature_target_relationships(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of scatter plots showing the relationship between each feature and the target.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the relationship plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
# Scatter plot with regression line
sns.regplot(x=X[feature], y=y, ax=ax,
scatter_kws={'alpha': 0.5, 'color': 'blue'},
line_kws={'color': 'red'})
ax.set_title(f'{feature} vs Target')
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Feature vs Target Relationships', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_pairwise_relationships(X: pd.DataFrame, y: pd.Series, features: list[str]) -> plt.Figure:
"""
Creates a pairplot showing relationships between selected features and the target.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
features (List[str]): List of feature names to include in the plot.
Returns:
plt.Figure: The matplotlib Figure object containing the pairplot.
"""
# Ensure features exist in the DataFrame
valid_features = [f for f in features if f in X.columns]
if not valid_features:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "No valid features provided", ha='center', va='center')
return fig
# Combine selected features and target
data = X[valid_features].copy()
data['target'] = y
# Create pairplot
pairgrid = sns.pairplot(data, diag_kind="kde",
plot_kws={"alpha": 0.6, "s": 50},
corner=True)
pairgrid.fig.suptitle("Pairwise Feature Relationships", y=1.02, fontsize=16)
plt.close(pairgrid.fig)
return pairgrid.fig
def plot_outliers(X: pd.DataFrame, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of box plots to detect outliers in each feature.
Args:
X (pd.DataFrame): DataFrame containing features.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the outlier plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
# Box plot to detect outliers
sns.boxplot(x=X[feature], ax=ax, color='skyblue')
ax.set_title(f'Outlier Detection for {feature}')
ax.set_xlabel(feature)
# Hide any unused subplots
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Outlier Detection for Features', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_residuals(y_true: pd.Series, y_pred: np.ndarray) -> plt.Figure:
"""
Creates a residual plot to analyze model prediction errors.
Args:
y_true (pd.Series): True target values.
y_pred (np.ndarray): Predicted target values.
Returns:
plt.Figure: The matplotlib Figure object containing the residual plot.
"""
residuals = y_true - y_pred
fig, ax = plt.subplots(figsize=(10, 6))
# Scatter plot of predicted values vs residuals
ax.scatter(y_pred, residuals, alpha=0.5)
ax.axhline(y=0, color='r', linestyle='-')
ax.set_xlabel('Predicted Values')
ax.set_ylabel('Residuals')
ax.set_title('Residual Plot')
plt.tight_layout()
plt.close(fig)
return fig
3. 回帰用の PyTorch ニューラル ネットワークを設計する
次のセルのコードは、PyTorch モデルアーキテクチャを定義します。 次の特性を持つ柔軟なニューラル ネットワークが作成されます。
- 構成可能なアーキテクチャ: 隠し層の調整可能な数とサイズ。
- 活性化関数: 隠れ層にはReLU、出力には線形。
- 正則化: オーバーフィットを防ぐための省略可能なドロップアウト。
- レイヤーの正規化: トレーニングを安定させ、収束を加速します。
さまざまな方法を示すために、次のセルでは、最初に標準の PyTorch モジュールを使用し、次に PyTorch Lightning モジュールを使用してニューラル ネットワークを作成する方法を示します。
class RegressionNN(nn.Module):
"""
A flexible feedforward neural network for regression tasks.
Attributes:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
"""
def __init__(
self,
input_dim: int,
hidden_dims: List[int] = [64, 32],
dropout_rate: float = 0.1,
use_layer_norm: bool = True
):
"""
Initialize the neural network.
Args:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
"""
super().__init__()
self.input_dim = input_dim
self.hidden_dims = hidden_dims
self.dropout_rate = dropout_rate
self.use_layer_norm = use_layer_norm
# Build layers dynamically based on hidden_dims
layers = []
# Input layer
prev_dim = input_dim
# Hidden layers
for dim in hidden_dims:
layers.append(nn.Linear(prev_dim, dim))
if use_layer_norm:
layers.append(nn.LayerNorm(dim))
layers.append(nn.ReLU())
if dropout_rate > 0:
layers.append(nn.Dropout(dropout_rate))
prev_dim = dim
# Output layer (single output for regression)
layers.append(nn.Linear(prev_dim, 1))
# Combine all layers
self.model = nn.Sequential(*layers)
def forward(self, x):
"""Forward pass through the network."""
return self.model(x).squeeze()
def get_params(self) -> Dict[str, Any]:
"""Return model parameters as a dictionary for MLflow logging."""
return {
"input_dim": self.input_dim,
"hidden_dims": self.hidden_dims,
"dropout_rate": self.dropout_rate,
"use_layer_norm": self.use_layer_norm
}
class RegressionLightningModule(pl.LightningModule):
"""
PyTorch Lightning module for regression tasks.
This class wraps the RegressionNN model and adds training, validation,
and testing logic using the PyTorch Lightning framework.
"""
def __init__(
self,
input_dim: int,
hidden_dims: List[int] = [64, 32],
dropout_rate: float = 0.1,
use_layer_norm: bool = True,
learning_rate: float = 1e-3,
weight_decay: float = 1e-5
):
"""
Initialize the Lightning module.
Args:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
learning_rate (float): Learning rate for the optimizer.
weight_decay (float): Weight decay for L2 regularization.
"""
super().__init__()
# Save hyperparameters
self.save_hyperparameters()
# Create the model
self.model = RegressionNN(
input_dim=input_dim,
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm
)
# Loss function
self.loss_fn = nn.MSELoss()
def forward(self, x):
"""Forward pass through the network."""
return self.model(x)
def configure_optimizers(self):
"""Configure the optimizer for training."""
optimizer = torch.optim.Adam(
self.parameters(),
lr=self.hparams.learning_rate,
weight_decay=self.hparams.weight_decay
)
return optimizer
def training_step(self, batch, batch_idx):
"""Perform a training step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
self.log('train_loss', loss, prog_bar=True)
return loss
def validation_step(self, batch, batch_idx):
"""Perform a validation step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
self.log('val_loss', loss, prog_bar=True)
# Calculate additional metrics
rmse = torch.sqrt(loss)
mae = torch.mean(torch.abs(y_pred - y))
self.log('val_rmse', rmse, prog_bar=True)
self.log('val_mae', mae, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
"""Perform a test step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
# Calculate metrics for test set
rmse = torch.sqrt(loss)
mae = torch.mean(torch.abs(y_pred - y))
self.log('test_loss', loss)
self.log('test_rmse', rmse)
self.log('test_mae', mae)
return loss
def get_params(self) -> Dict[str, Any]:
"""Return model parameters as a dictionary for MLflow logging."""
return {
"input_dim": self.hparams.input_dim,
"hidden_dims": self.hparams.hidden_dims,
"dropout_rate": self.hparams.dropout_rate,
"use_layer_norm": self.hparams.use_layer_norm,
"learning_rate": self.hparams.learning_rate,
"weight_decay": self.hparams.weight_decay
}
def prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size: int = 32
):
"""
Create PyTorch DataLoaders for training, validation, and testing.
Args:
X_train, y_train: Training data and labels.
X_val, y_val: Validation data and labels.
X_test, y_test: Test data and labels.
batch_size (int): Batch size for the DataLoaders.
Returns:
Tuple of (train_loader, val_loader, test_loader, scaler)
"""
# Initialize a scaler
scaler = StandardScaler()
# Fit and transform the training data
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)
# Convert to PyTorch tensors - explicitly set dtype to float32
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32)
# Create TensorDatasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
return train_loader, val_loader, test_loader, scaler
4. 標準モデリングワークフロー
次のセルのコードでは、次の手順を使用して、MLflow 統合を使用して標準の PyTorch モデリング ワークフローを実装します。
- 合成データを生成して探索します。
- データをトレーニング セット、検証セット、テスト セットに分割します。
- データをスケーリングし、PyTorch DataLoader を作成します。
- ニューラル ネットワーク モデルを定義してトレーニングします。
- モデルのパフォーマンスを評価します。
- メトリック、パラメーター、成果物を MLflow に記録します。
この標準ワークフローは、ベースライン モデルを提供します。 その後、ハイパーパラメーター調整を使用してモデルを改善できます。
# Create the regression dataset
n_samples = 1000
n_features = 10
X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)
# Create EDA plots
dist_plot = plot_feature_distributions(X, y)
corr_plot = plot_correlation_heatmap(X, y)
scatter_plot = plot_feature_target_relationships(X, y)
corr_with_target = X.corrwith(y).abs().sort_values(ascending=False)
top_features = corr_with_target.head(4).index.tolist()
pairwise_plot = plot_pairwise_relationships(X, y, top_features)
outlier_plot = plot_outliers(X)
# Split the data into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Prepare DataLoaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)
# Define model parameters
input_dim = X_train.shape[1]
hidden_dims = [64, 32]
dropout_rate = 0.1
use_layer_norm = True
learning_rate = 1e-3
weight_decay = 1e-5
# Create the PyTorch Lightning model
model = RegressionLightningModule(
input_dim=input_dim,
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm,
learning_rate=learning_rate,
weight_decay=weight_decay
)
# Define early stopping and model checkpoint callbacks
early_stopping = EarlyStopping(
monitor='val_loss',
patience=10,
mode='min'
)
checkpoint_callback = ModelCheckpoint(
monitor='val_loss',
dirpath='./checkpoints',
filename='pytorch-regression-{epoch:02d}-{val_loss:.4f}',
save_top_k=1,
mode='min'
)
# Define trainer
trainer = pl.Trainer(
max_epochs=100,
callbacks=[early_stopping, checkpoint_callback],
enable_progress_bar=True,
log_every_n_steps=5
)
# Train the model
trainer.fit(model, train_loader, val_loader)
# Test the model
test_results = trainer.test(model, test_loader)
# Make predictions on the test set for evaluation
model.eval()
test_preds = []
true_values = []
with torch.no_grad():
for batch in test_loader:
x, y = batch
y_pred = model(x)
test_preds.extend(y_pred.numpy())
true_values.extend(y.numpy())
test_preds = np.array(test_preds)
true_values = np.array(true_values)
# Calculate metrics
rmse = np.sqrt(mean_squared_error(true_values, test_preds))
mae = mean_absolute_error(true_values, test_preds)
r2 = r2_score(true_values, test_preds)
# Create residual plot
residual_plot = plot_residuals(pd.Series(true_values), test_preds)
5. MLflow を使用してモデルをログに記録する
Databricks で MLflow を使用してモデルをログに記録すると、重要な成果物とメタデータがキャプチャされます。 これにより、モデルが再現可能であるだけでなく、必要なすべての依存関係と明確な API コントラクトを使用してデプロイする準備が整います。 ログに記録される内容の詳細については、 MLflow のドキュメントを参照してください。
次のセルのコードは、 with mlflow.start_run():を使用して MLflow の実行を開始します。 これにより、実行の MLflow コンテキスト マネージャーが初期化され、コード ブロックで実行が囲まれます。 コード ブロックが終了すると、ログに記録されたすべてのメトリック、パラメーター、成果物が保存され、MLflow の実行が自動的に終了します。
# Log the model and training results with MLflow
with mlflow.start_run() as run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = run.info.run_id
# Extract metrics
final_train_loss = trainer.callback_metrics.get("train_loss").item() if "train_loss" in trainer.callback_metrics else None
final_val_loss = trainer.callback_metrics.get("val_loss").item() if "val_loss" in trainer.callback_metrics else None
# Extract parameters for logging
model_params = model.get_params()
# Create a list to store all metrics for batch logging
all_metrics = []
# Add each metric to the list
if final_train_loss is not None:
all_metrics.append(Metric(key="train_loss", value=final_train_loss, timestamp=0, step=0))
if final_val_loss is not None:
all_metrics.append(Metric(key="val_loss", value=final_val_loss, timestamp=0, step=0))
# Add test metrics
all_metrics.append(Metric(key="test_rmse", value=rmse, timestamp=0, step=0))
all_metrics.append(Metric(key="test_mae", value=mae, timestamp=0, step=0))
all_metrics.append(Metric(key="test_r2", value=r2, timestamp=0, step=0))
# Collect all parameters to log
# Note: The code uses log_params for model_params since there could be many parameters,
# but converts the individual param calls to batch
from mlflow.entities import Param
all_params = [
Param(key="batch_size", value=str(batch_size)),
Param(key="early_stopping_patience", value=str(early_stopping.patience)),
Param(key="max_epochs", value=str(trainer.max_epochs)),
Param(key="actual_epochs", value=str(trainer.current_epoch))
]
# Generate a model signature using the infer signature utility in MLflow
input_example = X_train.iloc[[0]].values.astype(np.float32) # Ensure float32 type
input_example_scaled = scaler.transform(input_example).astype(np.float32)
model.eval()
with torch.no_grad():
# Ensure tensor is float32
tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
signature_preds = model(tensor_input)
# Ensure prediction is also float32
signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))
# Log model parameters first (since these could be numerous)
mlflow.log_params(model_params)
# Log all metrics and remaining parameters in a single batch operation
mlflow_client.log_batch(
run_id=run_id,
metrics=all_metrics,
params=all_params
)
# Log the model to MLflow and register the model to Unity Catalog
model_info = mlflow.pytorch.log_model(
model,
artifact_path="model",
input_example=input_example,
signature=signature,
registered_model_name="demo.pytorch_regression_model",
)
# Log feature analysis plots
mlflow.log_figure(dist_plot, "feature_distributions.png")
mlflow.log_figure(corr_plot, "correlation_heatmap.png")
mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
mlflow.log_figure(outlier_plot, "outlier_detection.png")
mlflow.log_figure(residual_plot, "residual_plot.png")
# Run MLflow evaluation to generate additional metrics without having to implement them
evaluation_data = X_test.copy()
evaluation_data["label"] = y_test
# Skip mlflow.evaluate for now to avoid type mismatch issues
# Instead, log the metrics directly
print(f"Model logged: {model_info.model_uri}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R²: {r2:.4f}")
6. ハイパーパラメーターの調整
このセクションでは、 MLflow で Optuna と 入れ子になった実行を使用してハイパーパラメーターのチューニングを自動化する方法について説明します。 この方法では、さまざまなパラメーター構成を調べ、すべての実験の詳細をキャプチャできます。
次のセルのコードでは、次の処理が行われます。
前に定義した
create_regression_data関数を使用して、合成回帰データセットを生成します。データセットを個別のトレーニング データセットとテスト データセットに分割し、評価用にテスト データセットのコピーを保存します。
ハイパーパラメーター 調整プロセスの目的関数を作成します。 目的関数は、レイヤーの数、非表示のディメンション、ドロップアウト率、学習率、正則化パラメーターなど、PyTorch モデルのハイパーパラメーターの検索空間を定義します。 Optuna は、これらの値を動的にサンプリングし、各試用版で異なるパラメーターの組み合わせをテストすることを確認します。
目的関数内で入れ子になった MLflow 実行を開始します。 この入れ子になった実行では、現在のハイパーパラメーター評価版に固有のすべての詳細が自動的にキャプチャされ、ログに記録されます。 それぞれの評価版を独自の入れ子になった実行で分離することで、各構成とそれに対応するパフォーマンス メトリックを適切に整理した記録を保持できます。 入れ子になった実行では、次のログが記録されます。
- その試用版に使用される特定のハイパーパラメーター。
- テスト セットで計算されたパフォーマンス メトリック (この場合は検証損失)。
- トレーニング済みのモデル インスタンスも、試用版のメタデータの一部として格納されます。 これにより、後で最もパフォーマンスの高いモデルを簡単に取得できます。
このコードでは、各モデルが MLflow に記録されることはありません。 ハイパーパラメーターの調整を行っている間、各イテレーションが特に適切であるとは限らないため、各イテレーションのモデル成果物を記録する理由はありません。
親 MLflow 実行を作成します。 この実行により、最適なハイパーパラメーターのセット (検証損失が最も低いセット) を識別するように設計された Optuna スタディが開始されます。 Optuna は一連の試用版を実行します。各試用版では、ハイパーパラメーターの一意の組み合わせを使用します。 各評価版では、入れ子になった MLflow の実行によってすべての実験の詳細がキャプチャされるため、後で各モデル構成のパフォーマンスを追跡して比較できます。
この調査では、最も低い検証損失に基づいて、最適な試用版を特定します。 このコードでは、最適なモデルと最適なパラメーター値が抽出されます。 このコードでは、
infer_signatureを使用してモデル署名を保存します。これは、予想される入力スキーマと出力スキーマを指定し、Unity Catalog などのシステムとの一貫性のあるデプロイと統合のために重要です。 最後に、最適なモデルがログに記録され、Unity カタログに登録されます。 EDA プロットや特徴の重要度グラフなどの追加の成果物も記録されます。
# Create a custom pruning callback as a fallback
class PyTorchLightningPruningCallback(pl.Callback):
"""PyTorch Lightning callback to prune unpromising trials.
This is a simplified version for use when the optuna-integration package isn't available.
"""
def __init__(self, trial, monitor):
super().__init__()
self._trial = trial
self.monitor = monitor
def on_validation_end(self, trainer, pl_module):
# Report the validation metric to Optuna
metrics = trainer.callback_metrics
current_score = metrics.get(self.monitor)
if current_score is not None:
self._trial.report(current_score.item(), trainer.current_epoch)
# If trial should be pruned based on current value,
# stop the training
if self._trial.should_prune():
message = "Trial was pruned at epoch {}.".format(trainer.current_epoch)
raise optuna.TrialPruned(message)
# Generate a larger dataset for hyperparameter tuning
n_samples = 2000
n_features = 10
X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)
# Split the data
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Prepare the evaluation data
evaluation_data = X_test.copy()
evaluation_data["label"] = y_test
# Create the data loaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)
def objective(trial):
"""Optuna objective function to minimize validation loss."""
# Define the hyperparameter search space
n_layers = trial.suggest_int("n_layers", 1, 3)
# Create hidden dimensions based on number of layers
hidden_dims = []
for i in range(n_layers):
hidden_dims.append(trial.suggest_int(f"hidden_dim_{i}", 16, 128))
# Other hyperparameters
dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
use_layer_norm = trial.suggest_categorical("use_layer_norm", [True, False])
# Start a nested MLflow run for this trial
with mlflow.start_run(nested=True) as child_run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = child_run.info.run_id
# Prepare parameters for batch logging
params_list = []
param_dict = {
"n_layers": n_layers,
"hidden_dims": str(hidden_dims), # Convert list to string
"dropout_rate": dropout_rate,
"learning_rate": learning_rate,
"weight_decay": weight_decay,
"use_layer_norm": use_layer_norm,
"batch_size": batch_size
}
# Convert parameters to Param objects
for key, value in param_dict.items():
params_list.append(Param(key, str(value)))
# Create the model with these hyperparameters
model = RegressionLightningModule(
input_dim=X_train.shape[1],
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm,
learning_rate=learning_rate,
weight_decay=weight_decay
)
# Callbacks
early_stopping = EarlyStopping(
monitor='val_loss',
patience=5,
mode='min'
)
pruning_callback = PyTorchLightningPruningCallback(
trial, monitor="val_loss"
)
# Define trainer with early stopping and pruning
trainer = pl.Trainer(
max_epochs=50,
callbacks=[early_stopping, pruning_callback],
enable_progress_bar=False,
log_every_n_steps=10
)
# Train and validate the model
trainer.fit(model, train_loader, val_loader)
# Get the best validation loss
best_val_loss = trainer.callback_metrics.get("val_loss").item()
val_rmse = np.sqrt(best_val_loss)
# Prepare metrics for batch logging
current_time = int(time.time() * 1000) # Current time in milliseconds
metrics_list = [
Metric("val_loss", best_val_loss, current_time, 0),
Metric("val_rmse", val_rmse, current_time, 0)
]
# Use log_batch through the client for efficient logging
mlflow_client.log_batch(run_id, metrics=metrics_list, params=params_list)
# Store the model in the trial's user attributes
trial.set_user_attr("model", model)
# Return the value to minimize (validation loss)
return best_val_loss
best_model_version = None
# The parent run stores the best iteration from the hyperparameter tuning execution
with mlflow.start_run() as run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = run.info.run_id
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=20)
best_trial = study.best_trial
best_model = best_trial.user_attrs["model"]
# Test the best model
trainer = pl.Trainer(
enable_progress_bar=True,
log_every_n_steps=5
)
test_results = trainer.test(best_model, test_loader)
# Make predictions on the test set for evaluation
best_model.eval()
test_preds = []
true_values = []
with torch.no_grad():
for batch in test_loader:
x, y = batch
y_pred = best_model(x)
test_preds.extend(y_pred.numpy())
true_values.extend(y.numpy())
test_preds = np.array(test_preds)
true_values = np.array(true_values)
# Calculate metrics
rmse = np.sqrt(mean_squared_error(true_values, test_preds))
mae = mean_absolute_error(true_values, test_preds)
r2 = r2_score(true_values, test_preds)
# Prepare parameters for batch logging
best_params_list = []
for key, value in best_trial.params.items():
best_params_list.append(Param(f"best_{key}", str(value)))
# Prepare metrics for batch logging
current_time = int(time.time() * 1000) # Current time in milliseconds
metrics_list = [
Metric("best_val_loss", best_trial.value, current_time, 0),
Metric("test_rmse", rmse, current_time, 0),
Metric("test_mae", mae, current_time, 0),
Metric("test_r2", r2, current_time, 0)
]
# Log metrics and parameters in a single batch call
mlflow_client.log_batch(run_id, metrics=metrics_list, params=best_params_list)
# Generate model signature - ensure consistent float32 types
input_example = X_train.iloc[[0]].values.astype(np.float32)
input_example_scaled = scaler.transform(input_example).astype(np.float32)
best_model.eval()
with torch.no_grad():
tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
signature_preds = best_model(tensor_input)
signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))
# Log and register the PyTorch model
model_info = mlflow.pytorch.log_model(
best_model,
artifact_path="model",
input_example=input_example,
signature=signature,
registered_model_name="demo.pytorch_regression_optimized",
)
# Create residual plot
residual_plot = plot_residuals(pd.Series(true_values), test_preds)
# Log figures (no batch equivalent for figures)
mlflow.log_figure(dist_plot, "feature_distributions.png")
mlflow.log_figure(corr_plot, "correlation_heatmap.png")
mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
mlflow.log_figure(outlier_plot, "outlier_detection.png")
mlflow.log_figure(residual_plot, "residual_plot.png")
# Skip mlflow.evaluate for now to avoid type mismatch issues
# Instead, log the metrics directly
print(f"Best model logged: {model_info.model_uri}")
print(f"Best parameters: {best_trial.params}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R²: {r2:.4f}")
best_model_version = model_info.registered_model_version
from mlflow import MlflowClient
# Initialize MLflow client
client = MlflowClient()
# Set a human-readable alias for the best model version
# This makes it easier to reference specific model versions programmatically
client.set_registered_model_alias("demo.pytorch_regression_optimized", "best", int(best_model_version))
7. デプロイ前の検証
MLflow には、運用環境のような環境をシミュレートし、モデルが正しく構成されていることを検証する mlflow.models.predict ユーティリティが用意されています。
# Reference the model by its alias
model_uri = "models:/demo.pytorch_regression_optimized@best"
# Validate the model's deployment readiness
mlflow.models.predict(model_uri=model_uri, input_data=X_test, env_manager="local")
8. 登録済みモデルを読み込んで予測を行う
このセクションのコードでは、MLflow から登録済みモデルを読み込み、それを使用してローカルで予測を行う方法を示します。 これは、テストやバッチ推論のシナリオに役立ちます。
# Convert the data type of X_test to float32
X_test = X_test.astype('float32')
# Load the model using the pyfunc interface (recommended for deployment)
loaded_model = mlflow.pyfunc.load_model(model_uri=model_uri)
# Make predictions with the loaded model
predictions = loaded_model.predict(X_test)
print(f"Shape of predictions: {predictions.shape}")
print(f"First 5 predictions: {predictions[:5]}")
print(f"First 5 actual values: {y_test.values[:5]}")
9. MLflow での Spark UDF を使用したバッチ予測
大規模な予測の場合は、モデルを Spark UDF に変換し、それを Spark DataFrame に適用して、分散推論を有効にすることができます。
from pyspark.sql.functions import array, col
# Convert the test data to a Spark DataFrame
X_spark = spark.createDataFrame(X_test)
# Create an array of all feature columns
# This step is necessary because:
# 1. The PyTorch model expects an input tensor with shape [-1, 13]
# 2. The model_udf needs to receive each row as a single array of 13 values
# 3. Without this array transformation, 13 separate columns would be passed to the model
# which wouldn't match the expected tensor structure
X_spark_with_array = X_spark.withColumn(
"features_array",
array(*[col(c) for c in X_spark.columns])
)
# Create a Spark UDF from the registered model
model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
# Apply MLflow UDF to the array column
# Pass the single array column to the model, which matches the expected tensor format
X_spark_with_predictions = X_spark_with_array.withColumn(
"prediction",
model_udf("features_array")
)
display(X_spark_with_predictions.limit(5))