PyTorch を使用したエンド ツー エンドのディープ ラーニング モデル

このチュートリアルでは、ディープ ラーニング モデリング プロジェクトの実験、トレーニング、チューニング、登録、評価、デプロイのライフサイクル全体について説明します。 MLflow を使用して、モデルの開発およびデプロイ プロセスのあらゆる側面を追跡する方法を示します。

ノートブックではPyTorch を使用します。これは、GPU で高速化されたテンソル計算とディープ ラーニング ネットワークを構築するための高度な機能を提供するPython パッケージです。 準備ができたら、 モザイク AI モデル サービスを使用してモデルをデプロイできます。

このステップ バイ ステップ チュートリアルでは、次の方法について説明します。

  • データを生成して視覚化する: 実際のシナリオをシミュレートし、特徴のリレーションシップを視覚化する合成データを作成します。
  • PyTorch ニューラル ネットワークの設計とトレーニング: 回帰タスク用に調整された柔軟なディープ ラーニング モデルを構築します。
  • MLflow を使用して実験を追跡する: メトリック、パラメーター、成果物をログに記録して、完全な再現性を実現します。
  • ハイパーパラメーターのチューニングを自動化する: Optuna を使用して、最適なモデル構成を効率的に検索します。
  • モデルの登録と管理: セキュリティで保護された組織化されたモデル ガバナンスのために、Unity カタログと統合された MLflow モデル レジストリを使用します。
  • デプロイと予測: Spark UDF を使用して、登録されたモデルを読み込み、ローカルまたは大規模に予測を実行します。
%pip install -Uqqq mlflow pytorch-lightning optuna skorch uv optuna-integration[pytorch_lightning]
%restart_python
from typing import Tuple, Optional, Dict, List, Any

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

import mlflow
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
from mlflow.entities import Metric, Param

import optuna
from optuna.integration import PyTorchLightningPruningCallback

import time

0. Unity カタログを使用してモデル レジストリを構成する

Databricks で MLflow を使用する主な利点の 1 つは、 Unity Catalog とのシームレスな統合です。 この統合により、モデルの管理とガバナンスが簡素化され、開発するすべてのモデルが追跡され、バージョン管理され、セキュリティで保護されます。 Unity カタログの詳細については、(AWS | Azure | GCP) を参照してください。

レジストリ URI を設定する

次のセルは、モデル登録に Unity カタログを使用するように MLflow を構成します。

mlflow.set_registry_uri("databricks-uc")

1. 合成回帰データセットを作成する

次のセルは、 create_regression_data 関数を定義します。 この関数は、回帰の合成データを生成します。 結果のデータセットには、特徴と、重要度が異なるターゲット、ノイズ、特徴の間の線形リレーションシップと非線形リレーションシップが含まれます。 これらの機能は、実際のデータ シナリオを模倣するように設計されています。

def create_regression_data(
    n_samples: int,
    n_features: int,
    seed: int = 1994,
    noise_level: float = 0.3,
    nonlinear: bool = True
) -> Tuple[pd.DataFrame, pd.Series]:
    """Generates synthetic regression data with interesting correlations for MLflow and PyTorch demonstrations.

    This function creates a DataFrame of continuous features and computes a target variable with nonlinear
    relationships and interactions between features. The data is designed to be complex enough to demonstrate
    the capabilities of deep learning, but not so complex that a reasonable model can't be learned.

    Args:
        n_samples (int): Number of samples (rows) to generate.
        n_features (int): Number of feature columns.
        seed (int, optional): Random seed for reproducibility. Defaults to 1994.
        noise_level (float, optional): Level of Gaussian noise to add to the target. Defaults to 0.3.
        nonlinear (bool, optional): Whether to add nonlinear feature transformations. Defaults to True.

    Returns:
        Tuple[pd.DataFrame, pd.Series]:
            - pd.DataFrame: DataFrame containing the synthetic features.
            - pd.Series: Series containing the target labels.

    Example:
        >>> df, target = create_regression_data(n_samples=1000, n_features=10)
    """
    rng = np.random.RandomState(seed)

    # Generate random continuous features
    X = rng.uniform(-5, 5, size=(n_samples, n_features))

    # Create feature DataFrame with meaningful names
    columns = [f"feature_{i}" for i in range(n_features)]
    df = pd.DataFrame(X, columns=columns)

    # Generate base target variable with linear relationship to a subset of features
    # Use only the first n_features//2 features to create some irrelevant features
    weights = rng.uniform(-2, 2, size=n_features//2)
    target = np.dot(X[:, :n_features//2], weights)

    # Add some nonlinear transformations if requested
    if nonlinear:
        # Add square term for first feature
        target += 0.5 * X[:, 0]**2

        # Add interaction between the second and third features
        if n_features >= 3:
            target += 1.5 * X[:, 1] * X[:, 2]

        # Add sine transformation of fourth feature
        if n_features >= 4:
            target += 2 * np.sin(X[:, 3])

        # Add exponential of fifth feature, scaled down
        if n_features >= 5:
            target += 0.1 * np.exp(X[:, 4] / 2)

        # Add threshold effect for sixth feature
        if n_features >= 6:
            target += 3 * (X[:, 5] > 1.5).astype(float)

    # Add Gaussian noise
    noise = rng.normal(0, noise_level * target.std(), size=n_samples)
    target += noise

    # Add a few more interesting features to the DataFrame

    # Add a correlated feature (but not used in target calculation)
    if n_features >= 7:
        df['feature_correlated'] = df['feature_0'] * 0.8 + rng.normal(0, 0.2, size=n_samples)

    # Add a cyclical feature
    df['feature_cyclical'] = np.sin(np.linspace(0, 4*np.pi, n_samples))

    # Add a feature with outliers
    df['feature_with_outliers'] = rng.normal(0, 1, size=n_samples)
    # Add outliers to ~1% of samples
    outlier_idx = rng.choice(n_samples, size=n_samples//100, replace=False)
    df.loc[outlier_idx, 'feature_with_outliers'] = rng.uniform(10, 15, size=len(outlier_idx))

    return df, pd.Series(target, name='target')

2. 探索的データ分析 (EDA) の視覚化

視覚化は、データを理解するのに役立ちます。 次のセルのコードは 6 つの関数を作成し、それぞれが異なるプロットを生成して、データセットを視覚的に検査するのに役立ちます。

MLflow を使用して視覚化をアーティファクトとしてログに記録し、実験を完全に再現できます。

def plot_feature_distributions(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
    """
    Creates a grid of histograms for each feature in the dataset.

    Args:
        X (pd.DataFrame): DataFrame containing features.
        y (pd.Series): Series containing the target variable.
        n_cols (int): Number of columns in the grid layout.

    Returns:
        plt.Figure: The matplotlib Figure object containing the distribution plots.
    """
    features = X.columns
    n_features = len(features)
    n_rows = (n_features + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
    axes = axes.flatten() if n_rows * n_cols > 1 else [axes]

    for i, feature in enumerate(features):
        if i < len(axes):
            ax = axes[i]
            sns.histplot(X[feature], ax=ax, kde=True, color='skyblue')
            ax.set_title(f'Distribution of {feature}')

    # Hide any unused subplots
    for i in range(n_features, len(axes)):
        axes[i].set_visible(False)

    plt.tight_layout()
    fig.suptitle('Feature Distributions', y=1.02, fontsize=16)
    plt.close(fig)
    return fig

def plot_correlation_heatmap(X: pd.DataFrame, y: pd.Series) -> plt.Figure:
    """
    Creates a correlation heatmap of all features and the target variable.

    Args:
        X (pd.DataFrame): DataFrame containing features.
        y (pd.Series): Series containing the target variable.

    Returns:
        plt.Figure: The matplotlib Figure object containing the heatmap.
    """
    # Combine features and target into one DataFrame
    data = X.copy()
    data['target'] = y

    # Calculate correlation matrix
    corr_matrix = data.corr()

    # Set up the figure
    fig, ax = plt.subplots(figsize=(12, 10))

    # Draw the heatmap with a color bar
    cmap = sns.diverging_palette(220, 10, as_cmap=True)
    sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap=cmap,
                center=0, square=True, linewidths=0.5, ax=ax)

    ax.set_title('Feature Correlation Heatmap', fontsize=16)
    plt.close(fig)
    return fig

def plot_feature_target_relationships(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
    """
    Creates a grid of scatter plots showing the relationship between each feature and the target.

    Args:
        X (pd.DataFrame): DataFrame containing features.
        y (pd.Series): Series containing the target variable.
        n_cols (int): Number of columns in the grid layout.

    Returns:
        plt.Figure: The matplotlib Figure object containing the relationship plots.
    """
    features = X.columns
    n_features = len(features)
    n_rows = (n_features + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
    axes = axes.flatten() if n_rows * n_cols > 1 else [axes]

    for i, feature in enumerate(features):
        if i < len(axes):
            ax = axes[i]
            # Scatter plot with regression line
            sns.regplot(x=X[feature], y=y, ax=ax,
                       scatter_kws={'alpha': 0.5, 'color': 'blue'},
                       line_kws={'color': 'red'})
            ax.set_title(f'{feature} vs Target')

    for i in range(n_features, len(axes)):
        axes[i].set_visible(False)

    plt.tight_layout()
    fig.suptitle('Feature vs Target Relationships', y=1.02, fontsize=16)
    plt.close(fig)
    return fig

def plot_pairwise_relationships(X: pd.DataFrame, y: pd.Series, features: list[str]) -> plt.Figure:
    """
    Creates a pairplot showing relationships between selected features and the target.

    Args:
        X (pd.DataFrame): DataFrame containing features.
        y (pd.Series): Series containing the target variable.
        features (List[str]): List of feature names to include in the plot.

    Returns:
        plt.Figure: The matplotlib Figure object containing the pairplot.
    """
    # Ensure features exist in the DataFrame
    valid_features = [f for f in features if f in X.columns]

    if not valid_features:
        fig, ax = plt.subplots()
        ax.text(0.5, 0.5, "No valid features provided", ha='center', va='center')
        return fig

    # Combine selected features and target
    data = X[valid_features].copy()
    data['target'] = y

    # Create pairplot
    pairgrid = sns.pairplot(data, diag_kind="kde",
                          plot_kws={"alpha": 0.6, "s": 50},
                          corner=True)

    pairgrid.fig.suptitle("Pairwise Feature Relationships", y=1.02, fontsize=16)
    plt.close(pairgrid.fig)
    return pairgrid.fig

def plot_outliers(X: pd.DataFrame, n_cols: int = 3) -> plt.Figure:
    """
    Creates a grid of box plots to detect outliers in each feature.

    Args:
        X (pd.DataFrame): DataFrame containing features.
        n_cols (int): Number of columns in the grid layout.

    Returns:
        plt.Figure: The matplotlib Figure object containing the outlier plots.
    """
    features = X.columns
    n_features = len(features)
    n_rows = (n_features + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
    axes = axes.flatten() if n_rows * n_cols > 1 else [axes]

    for i, feature in enumerate(features):
        if i < len(axes):
            ax = axes[i]
            # Box plot to detect outliers
            sns.boxplot(x=X[feature], ax=ax, color='skyblue')
            ax.set_title(f'Outlier Detection for {feature}')
            ax.set_xlabel(feature)

    # Hide any unused subplots
    for i in range(n_features, len(axes)):
        axes[i].set_visible(False)

    plt.tight_layout()
    fig.suptitle('Outlier Detection for Features', y=1.02, fontsize=16)
    plt.close(fig)
    return fig

def plot_residuals(y_true: pd.Series, y_pred: np.ndarray) -> plt.Figure:
    """
    Creates a residual plot to analyze model prediction errors.

    Args:
        y_true (pd.Series): True target values.
        y_pred (np.ndarray): Predicted target values.

    Returns:
        plt.Figure: The matplotlib Figure object containing the residual plot.
    """
    residuals = y_true - y_pred

    fig, ax = plt.subplots(figsize=(10, 6))

    # Scatter plot of predicted values vs residuals
    ax.scatter(y_pred, residuals, alpha=0.5)
    ax.axhline(y=0, color='r', linestyle='-')

    ax.set_xlabel('Predicted Values')
    ax.set_ylabel('Residuals')
    ax.set_title('Residual Plot')

    plt.tight_layout()
    plt.close(fig)
    return fig

3. 回帰用の PyTorch ニューラル ネットワークを設計する

次のセルのコードは、PyTorch モデルアーキテクチャを定義します。 次の特性を持つ柔軟なニューラル ネットワークが作成されます。

  • 構成可能なアーキテクチャ: 隠し層の調整可能な数とサイズ。
  • 活性化関数: 隠れ層にはReLU、出力には線形。
  • 正則化: オーバーフィットを防ぐための省略可能なドロップアウト。
  • レイヤーの正規化: トレーニングを安定させ、収束を加速します。

さまざまな方法を示すために、次のセルでは、最初に標準の PyTorch モジュールを使用し、次に PyTorch Lightning モジュールを使用してニューラル ネットワークを作成する方法を示します。

class RegressionNN(nn.Module):
    """
    A flexible feedforward neural network for regression tasks.

    Attributes:
        input_dim (int): Number of input features.
        hidden_dims (List[int]): List of hidden layer dimensions.
        dropout_rate (float): Dropout probability for regularization.
        use_layer_norm (bool): Whether to use layer normalization.
    """

    def __init__(
        self,
        input_dim: int,
        hidden_dims: List[int] = [64, 32],
        dropout_rate: float = 0.1,
        use_layer_norm: bool = True
    ):
        """
        Initialize the neural network.

        Args:
            input_dim (int): Number of input features.
            hidden_dims (List[int]): List of hidden layer dimensions.
            dropout_rate (float): Dropout probability for regularization.
            use_layer_norm (bool): Whether to use layer normalization.
        """
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.dropout_rate = dropout_rate
        self.use_layer_norm = use_layer_norm

        # Build layers dynamically based on hidden_dims
        layers = []

        # Input layer
        prev_dim = input_dim

        # Hidden layers
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, dim))

            if use_layer_norm:
                layers.append(nn.LayerNorm(dim))

            layers.append(nn.ReLU())

            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))

            prev_dim = dim

        # Output layer (single output for regression)
        layers.append(nn.Linear(prev_dim, 1))

        # Combine all layers
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Forward pass through the network."""
        return self.model(x).squeeze()

    def get_params(self) -> Dict[str, Any]:
        """Return model parameters as a dictionary for MLflow logging."""
        return {
            "input_dim": self.input_dim,
            "hidden_dims": self.hidden_dims,
            "dropout_rate": self.dropout_rate,
            "use_layer_norm": self.use_layer_norm
        }
class RegressionLightningModule(pl.LightningModule):
    """
    PyTorch Lightning module for regression tasks.

    This class wraps the RegressionNN model and adds training, validation,
    and testing logic using the PyTorch Lightning framework.
    """

    def __init__(
        self,
        input_dim: int,
        hidden_dims: List[int] = [64, 32],
        dropout_rate: float = 0.1,
        use_layer_norm: bool = True,
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-5
    ):
        """
        Initialize the Lightning module.

        Args:
            input_dim (int): Number of input features.
            hidden_dims (List[int]): List of hidden layer dimensions.
            dropout_rate (float): Dropout probability for regularization.
            use_layer_norm (bool): Whether to use layer normalization.
            learning_rate (float): Learning rate for the optimizer.
            weight_decay (float): Weight decay for L2 regularization.
        """
        super().__init__()

        # Save hyperparameters
        self.save_hyperparameters()

        # Create the model
        self.model = RegressionNN(
            input_dim=input_dim,
            hidden_dims=hidden_dims,
            dropout_rate=dropout_rate,
            use_layer_norm=use_layer_norm
        )

        # Loss function
        self.loss_fn = nn.MSELoss()

    def forward(self, x):
        """Forward pass through the network."""
        return self.model(x)

    def configure_optimizers(self):
        """Configure the optimizer for training."""
        optimizer = torch.optim.Adam(
            self.parameters(),
            lr=self.hparams.learning_rate,
            weight_decay=self.hparams.weight_decay
        )
        return optimizer

    def training_step(self, batch, batch_idx):
        """Perform a training step."""
        x, y = batch
        y_pred = self(x)
        loss = self.loss_fn(y_pred, y)
        self.log('train_loss', loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Perform a validation step."""
        x, y = batch
        y_pred = self(x)
        loss = self.loss_fn(y_pred, y)
        self.log('val_loss', loss, prog_bar=True)

        # Calculate additional metrics
        rmse = torch.sqrt(loss)
        mae = torch.mean(torch.abs(y_pred - y))

        self.log('val_rmse', rmse, prog_bar=True)
        self.log('val_mae', mae, prog_bar=True)

        return loss

    def test_step(self, batch, batch_idx):
        """Perform a test step."""
        x, y = batch
        y_pred = self(x)
        loss = self.loss_fn(y_pred, y)

        # Calculate metrics for test set
        rmse = torch.sqrt(loss)
        mae = torch.mean(torch.abs(y_pred - y))

        self.log('test_loss', loss)
        self.log('test_rmse', rmse)
        self.log('test_mae', mae)

        return loss

    def get_params(self) -> Dict[str, Any]:
        """Return model parameters as a dictionary for MLflow logging."""
        return {
            "input_dim": self.hparams.input_dim,
            "hidden_dims": self.hparams.hidden_dims,
            "dropout_rate": self.hparams.dropout_rate,
            "use_layer_norm": self.hparams.use_layer_norm,
            "learning_rate": self.hparams.learning_rate,
            "weight_decay": self.hparams.weight_decay
        }
def prepare_dataloader(
    X_train, y_train, X_val, y_val, X_test, y_test, batch_size: int = 32
):
    """
    Create PyTorch DataLoaders for training, validation, and testing.

    Args:
        X_train, y_train: Training data and labels.
        X_val, y_val: Validation data and labels.
        X_test, y_test: Test data and labels.
        batch_size (int): Batch size for the DataLoaders.

    Returns:
        Tuple of (train_loader, val_loader, test_loader, scaler)
    """
    # Initialize a scaler
    scaler = StandardScaler()

    # Fit and transform the training data
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # Convert to PyTorch tensors - explicitly set dtype to float32
    X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)

    X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32)

    X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32)

    # Create TensorDatasets
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    return train_loader, val_loader, test_loader, scaler

4. 標準モデリングワークフロー

次のセルのコードでは、次の手順を使用して、MLflow 統合を使用して標準の PyTorch モデリング ワークフローを実装します。

  1. 合成データを生成して探索します。
  2. データをトレーニング セット、検証セット、テスト セットに分割します。
  3. データをスケーリングし、PyTorch DataLoader を作成します。
  4. ニューラル ネットワーク モデルを定義してトレーニングします。
  5. モデルのパフォーマンスを評価します。
  6. メトリック、パラメーター、成果物を MLflow に記録します。

この標準ワークフローは、ベースライン モデルを提供します。 その後、ハイパーパラメーター調整を使用してモデルを改善できます。

# Create the regression dataset
n_samples = 1000
n_features = 10
X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)

# Create EDA plots
dist_plot = plot_feature_distributions(X, y)
corr_plot = plot_correlation_heatmap(X, y)
scatter_plot = plot_feature_target_relationships(X, y)
corr_with_target = X.corrwith(y).abs().sort_values(ascending=False)
top_features = corr_with_target.head(4).index.tolist()
pairwise_plot = plot_pairwise_relationships(X, y, top_features)
outlier_plot = plot_outliers(X)

# Split the data into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Prepare DataLoaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
    X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)

# Define model parameters
input_dim = X_train.shape[1]
hidden_dims = [64, 32]
dropout_rate = 0.1
use_layer_norm = True
learning_rate = 1e-3
weight_decay = 1e-5

# Create the PyTorch Lightning model
model = RegressionLightningModule(
    input_dim=input_dim,
    hidden_dims=hidden_dims,
    dropout_rate=dropout_rate,
    use_layer_norm=use_layer_norm,
    learning_rate=learning_rate,
    weight_decay=weight_decay
)

# Define early stopping and model checkpoint callbacks
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    mode='min'
)

checkpoint_callback = ModelCheckpoint(
    monitor='val_loss',
    dirpath='./checkpoints',
    filename='pytorch-regression-{epoch:02d}-{val_loss:.4f}',
    save_top_k=1,
    mode='min'
)

# Define trainer
trainer = pl.Trainer(
    max_epochs=100,
    callbacks=[early_stopping, checkpoint_callback],
    enable_progress_bar=True,
    log_every_n_steps=5
)

# Train the model
trainer.fit(model, train_loader, val_loader)

# Test the model
test_results = trainer.test(model, test_loader)

# Make predictions on the test set for evaluation
model.eval()
test_preds = []
true_values = []

with torch.no_grad():
    for batch in test_loader:
        x, y = batch
        y_pred = model(x)
        test_preds.extend(y_pred.numpy())
        true_values.extend(y.numpy())

test_preds = np.array(test_preds)
true_values = np.array(true_values)

# Calculate metrics
rmse = np.sqrt(mean_squared_error(true_values, test_preds))
mae = mean_absolute_error(true_values, test_preds)
r2 = r2_score(true_values, test_preds)

# Create residual plot
residual_plot = plot_residuals(pd.Series(true_values), test_preds)

5. MLflow を使用してモデルをログに記録する

Databricks で MLflow を使用してモデルをログに記録すると、重要な成果物とメタデータがキャプチャされます。 これにより、モデルが再現可能であるだけでなく、必要なすべての依存関係と明確な API コントラクトを使用してデプロイする準備が整います。 ログに記録される内容の詳細については、 MLflow のドキュメントを参照してください

次のセルのコードは、 with mlflow.start_run():を使用して MLflow の実行を開始します。 これにより、実行の MLflow コンテキスト マネージャーが初期化され、コード ブロックで実行が囲まれます。 コード ブロックが終了すると、ログに記録されたすべてのメトリック、パラメーター、成果物が保存され、MLflow の実行が自動的に終了します。

# Log the model and training results with MLflow
with mlflow.start_run() as run:
    # Create MLflow client for batch logging
    mlflow_client = MlflowClient()
    run_id = run.info.run_id

    # Extract metrics
    final_train_loss = trainer.callback_metrics.get("train_loss").item() if "train_loss" in trainer.callback_metrics else None
    final_val_loss = trainer.callback_metrics.get("val_loss").item() if "val_loss" in trainer.callback_metrics else None

    # Extract parameters for logging
    model_params = model.get_params()

    # Create a list to store all metrics for batch logging
    all_metrics = []

    # Add each metric to the list
    if final_train_loss is not None:
        all_metrics.append(Metric(key="train_loss", value=final_train_loss, timestamp=0, step=0))
    if final_val_loss is not None:
        all_metrics.append(Metric(key="val_loss", value=final_val_loss, timestamp=0, step=0))

    # Add test metrics
    all_metrics.append(Metric(key="test_rmse", value=rmse, timestamp=0, step=0))
    all_metrics.append(Metric(key="test_mae", value=mae, timestamp=0, step=0))
    all_metrics.append(Metric(key="test_r2", value=r2, timestamp=0, step=0))

    # Collect all parameters to log
    # Note: The code uses log_params for model_params since there could be many parameters,
    # but converts the individual param calls to batch
    from mlflow.entities import Param
    all_params = [
        Param(key="batch_size", value=str(batch_size)),
        Param(key="early_stopping_patience", value=str(early_stopping.patience)),
        Param(key="max_epochs", value=str(trainer.max_epochs)),
        Param(key="actual_epochs", value=str(trainer.current_epoch))
    ]

    # Generate a model signature using the infer signature utility in MLflow
    input_example = X_train.iloc[[0]].values.astype(np.float32)  # Ensure float32 type
    input_example_scaled = scaler.transform(input_example).astype(np.float32)

    model.eval()
    with torch.no_grad():
        # Ensure tensor is float32
        tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
        signature_preds = model(tensor_input)

    # Ensure prediction is also float32
    signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))

    # Log model parameters first (since these could be numerous)
    mlflow.log_params(model_params)

    # Log all metrics and remaining parameters in a single batch operation
    mlflow_client.log_batch(
        run_id=run_id,
        metrics=all_metrics,
        params=all_params
    )

    # Log the model to MLflow and register the model to Unity Catalog
    model_info = mlflow.pytorch.log_model(
        model,
        artifact_path="model",
        input_example=input_example,
        signature=signature,
        registered_model_name="demo.pytorch_regression_model",
    )

    # Log feature analysis plots
    mlflow.log_figure(dist_plot, "feature_distributions.png")
    mlflow.log_figure(corr_plot, "correlation_heatmap.png")
    mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
    mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
    mlflow.log_figure(outlier_plot, "outlier_detection.png")
    mlflow.log_figure(residual_plot, "residual_plot.png")

    # Run MLflow evaluation to generate additional metrics without having to implement them
    evaluation_data = X_test.copy()
    evaluation_data["label"] = y_test

    # Skip mlflow.evaluate for now to avoid type mismatch issues
    # Instead, log the metrics directly
    print(f"Model logged: {model_info.model_uri}")
    print(f"Test RMSE: {rmse:.4f}")
    print(f"Test MAE: {mae:.4f}")
    print(f"Test R²: {r2:.4f}")

6. ハイパーパラメーターの調整

このセクションでは、 MLflow で Optuna入れ子になった実行を使用してハイパーパラメーターのチューニングを自動化する方法について説明します。 この方法では、さまざまなパラメーター構成を調べ、すべての実験の詳細をキャプチャできます。

次のセルのコードでは、次の処理が行われます。

  1. 前に定義した create_regression_data 関数を使用して、合成回帰データセットを生成します。

  2. データセットを個別のトレーニング データセットとテスト データセットに分割し、評価用にテスト データセットのコピーを保存します。

  3. ハイパーパラメーター 調整プロセスの目的関数を作成します。 目的関数は、レイヤーの数、非表示のディメンション、ドロップアウト率、学習率、正則化パラメーターなど、PyTorch モデルのハイパーパラメーターの検索空間を定義します。 Optuna は、これらの値を動的にサンプリングし、各試用版で異なるパラメーターの組み合わせをテストすることを確認します。

  4. 目的関数内で入れ子になった MLflow 実行を開始します。 この入れ子になった実行では、現在のハイパーパラメーター評価版に固有のすべての詳細が自動的にキャプチャされ、ログに記録されます。 それぞれの評価版を独自の入れ子になった実行で分離することで、各構成とそれに対応するパフォーマンス メトリックを適切に整理した記録を保持できます。 入れ子になった実行では、次のログが記録されます。

    • その試用版に使用される特定のハイパーパラメーター。
    • テスト セットで計算されたパフォーマンス メトリック (この場合は検証損失)。
    • トレーニング済みのモデル インスタンスも、試用版のメタデータの一部として格納されます。 これにより、後で最もパフォーマンスの高いモデルを簡単に取得できます。

    このコードでは、各モデルが MLflow に記録されることはありません。 ハイパーパラメーターの調整を行っている間、各イテレーションが特に適切であるとは限らないため、各イテレーションのモデル成果物を記録する理由はありません。

  5. 親 MLflow 実行を作成します。 この実行により、最適なハイパーパラメーターのセット (検証損失が最も低いセット) を識別するように設計された Optuna スタディが開始されます。 Optuna は一連の試用版を実行します。各試用版では、ハイパーパラメーターの一意の組み合わせを使用します。 各評価版では、入れ子になった MLflow の実行によってすべての実験の詳細がキャプチャされるため、後で各モデル構成のパフォーマンスを追跡して比較できます。

  6. この調査では、最も低い検証損失に基づいて、最適な試用版を特定します。 このコードでは、最適なモデルと最適なパラメーター値が抽出されます。 このコードでは、 infer_signature を使用してモデル署名を保存します。これは、予想される入力スキーマと出力スキーマを指定し、Unity Catalog などのシステムとの一貫性のあるデプロイと統合のために重要です。 最後に、最適なモデルがログに記録され、Unity カタログに登録されます。 EDA プロットや特徴の重要度グラフなどの追加の成果物も記録されます。

# Create a custom pruning callback as a fallback
class PyTorchLightningPruningCallback(pl.Callback):
    """PyTorch Lightning callback to prune unpromising trials.

    This is a simplified version for use when the optuna-integration package isn't available.
    """

    def __init__(self, trial, monitor):
        super().__init__()
        self._trial = trial
        self.monitor = monitor

    def on_validation_end(self, trainer, pl_module):
        # Report the validation metric to Optuna
        metrics = trainer.callback_metrics
        current_score = metrics.get(self.monitor)

        if current_score is not None:
            self._trial.report(current_score.item(), trainer.current_epoch)

            # If trial should be pruned based on current value,
            # stop the training
            if self._trial.should_prune():
                message = "Trial was pruned at epoch {}.".format(trainer.current_epoch)
                raise optuna.TrialPruned(message)

# Generate a larger dataset for hyperparameter tuning
n_samples = 2000
n_features = 10

X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)

# Split the data
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Prepare the evaluation data
evaluation_data = X_test.copy()
evaluation_data["label"] = y_test

# Create the data loaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
    X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)

def objective(trial):
    """Optuna objective function to minimize validation loss."""

    # Define the hyperparameter search space
    n_layers = trial.suggest_int("n_layers", 1, 3)

    # Create hidden dimensions based on number of layers
    hidden_dims = []
    for i in range(n_layers):
        hidden_dims.append(trial.suggest_int(f"hidden_dim_{i}", 16, 128))

    # Other hyperparameters
    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
    use_layer_norm = trial.suggest_categorical("use_layer_norm", [True, False])

    # Start a nested MLflow run for this trial
    with mlflow.start_run(nested=True) as child_run:
        # Create MLflow client for batch logging
        mlflow_client = MlflowClient()
        run_id = child_run.info.run_id

        # Prepare parameters for batch logging
        params_list = []
        param_dict = {
            "n_layers": n_layers,
            "hidden_dims": str(hidden_dims),  # Convert list to string
            "dropout_rate": dropout_rate,
            "learning_rate": learning_rate,
            "weight_decay": weight_decay,
            "use_layer_norm": use_layer_norm,
            "batch_size": batch_size
        }

        # Convert parameters to Param objects
        for key, value in param_dict.items():
            params_list.append(Param(key, str(value)))

        # Create the model with these hyperparameters
        model = RegressionLightningModule(
            input_dim=X_train.shape[1],
            hidden_dims=hidden_dims,
            dropout_rate=dropout_rate,
            use_layer_norm=use_layer_norm,
            learning_rate=learning_rate,
            weight_decay=weight_decay
        )

        # Callbacks
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=5,
            mode='min'
        )

        pruning_callback = PyTorchLightningPruningCallback(
            trial, monitor="val_loss"
        )

        # Define trainer with early stopping and pruning
        trainer = pl.Trainer(
            max_epochs=50,
            callbacks=[early_stopping, pruning_callback],
            enable_progress_bar=False,
            log_every_n_steps=10
        )

        # Train and validate the model
        trainer.fit(model, train_loader, val_loader)

        # Get the best validation loss
        best_val_loss = trainer.callback_metrics.get("val_loss").item()
        val_rmse = np.sqrt(best_val_loss)

        # Prepare metrics for batch logging
        current_time = int(time.time() * 1000)  # Current time in milliseconds
        metrics_list = [
            Metric("val_loss", best_val_loss, current_time, 0),
            Metric("val_rmse", val_rmse, current_time, 0)
        ]

        # Use log_batch through the client for efficient logging
        mlflow_client.log_batch(run_id, metrics=metrics_list, params=params_list)

    # Store the model in the trial's user attributes
    trial.set_user_attr("model", model)

    # Return the value to minimize (validation loss)
    return best_val_loss

best_model_version = None
# The parent run stores the best iteration from the hyperparameter tuning execution
with mlflow.start_run() as run:
    # Create MLflow client for batch logging
    mlflow_client = MlflowClient()
    run_id = run.info.run_id

    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=20)

    best_trial = study.best_trial
    best_model = best_trial.user_attrs["model"]

    # Test the best model
    trainer = pl.Trainer(
        enable_progress_bar=True,
        log_every_n_steps=5
    )
    test_results = trainer.test(best_model, test_loader)

    # Make predictions on the test set for evaluation
    best_model.eval()
    test_preds = []
    true_values = []

    with torch.no_grad():
        for batch in test_loader:
            x, y = batch
            y_pred = best_model(x)
            test_preds.extend(y_pred.numpy())
            true_values.extend(y.numpy())

    test_preds = np.array(test_preds)
    true_values = np.array(true_values)

    # Calculate metrics
    rmse = np.sqrt(mean_squared_error(true_values, test_preds))
    mae = mean_absolute_error(true_values, test_preds)
    r2 = r2_score(true_values, test_preds)

    # Prepare parameters for batch logging
    best_params_list = []
    for key, value in best_trial.params.items():
        best_params_list.append(Param(f"best_{key}", str(value)))

    # Prepare metrics for batch logging
    current_time = int(time.time() * 1000)  # Current time in milliseconds
    metrics_list = [
        Metric("best_val_loss", best_trial.value, current_time, 0),
        Metric("test_rmse", rmse, current_time, 0),
        Metric("test_mae", mae, current_time, 0),
        Metric("test_r2", r2, current_time, 0)
    ]

    # Log metrics and parameters in a single batch call
    mlflow_client.log_batch(run_id, metrics=metrics_list, params=best_params_list)

    # Generate model signature - ensure consistent float32 types
    input_example = X_train.iloc[[0]].values.astype(np.float32)
    input_example_scaled = scaler.transform(input_example).astype(np.float32)

    best_model.eval()
    with torch.no_grad():
        tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
        signature_preds = best_model(tensor_input)

    signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))

    # Log and register the PyTorch model
    model_info = mlflow.pytorch.log_model(
        best_model,
        artifact_path="model",
        input_example=input_example,
        signature=signature,
        registered_model_name="demo.pytorch_regression_optimized",
    )

    # Create residual plot
    residual_plot = plot_residuals(pd.Series(true_values), test_preds)

    # Log figures (no batch equivalent for figures)
    mlflow.log_figure(dist_plot, "feature_distributions.png")
    mlflow.log_figure(corr_plot, "correlation_heatmap.png")
    mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
    mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
    mlflow.log_figure(outlier_plot, "outlier_detection.png")
    mlflow.log_figure(residual_plot, "residual_plot.png")

    # Skip mlflow.evaluate for now to avoid type mismatch issues
    # Instead, log the metrics directly
    print(f"Best model logged: {model_info.model_uri}")
    print(f"Best parameters: {best_trial.params}")
    print(f"Test RMSE: {rmse:.4f}")
    print(f"Test MAE: {mae:.4f}")
    print(f"Test R²: {r2:.4f}")

    best_model_version = model_info.registered_model_version
from mlflow import MlflowClient

# Initialize MLflow client
client = MlflowClient()

# Set a human-readable alias for the best model version
# This makes it easier to reference specific model versions programmatically
client.set_registered_model_alias("demo.pytorch_regression_optimized", "best", int(best_model_version))

7. デプロイ前の検証

MLflow には、運用環境のような環境をシミュレートし、モデルが正しく構成されていることを検証する mlflow.models.predict ユーティリティが用意されています。

# Reference the model by its alias
model_uri = "models:/demo.pytorch_regression_optimized@best"

# Validate the model's deployment readiness
mlflow.models.predict(model_uri=model_uri, input_data=X_test, env_manager="local")

8. 登録済みモデルを読み込んで予測を行う

このセクションのコードでは、MLflow から登録済みモデルを読み込み、それを使用してローカルで予測を行う方法を示します。 これは、テストやバッチ推論のシナリオに役立ちます。

# Convert the data type of X_test to float32
X_test = X_test.astype('float32')

# Load the model using the pyfunc interface (recommended for deployment)
loaded_model = mlflow.pyfunc.load_model(model_uri=model_uri)

# Make predictions with the loaded model
predictions = loaded_model.predict(X_test)

print(f"Shape of predictions: {predictions.shape}")
print(f"First 5 predictions: {predictions[:5]}")
print(f"First 5 actual values: {y_test.values[:5]}")

9. MLflow での Spark UDF を使用したバッチ予測

大規模な予測の場合は、モデルを Spark UDF に変換し、それを Spark DataFrame に適用して、分散推論を有効にすることができます。

from pyspark.sql.functions import array, col

# Convert the test data to a Spark DataFrame
X_spark = spark.createDataFrame(X_test)

# Create an array of all feature columns
# This step is necessary because:
# 1. The PyTorch model expects an input tensor with shape [-1, 13]
# 2. The model_udf needs to receive each row as a single array of 13 values
# 3. Without this array transformation, 13 separate columns would be passed to the model
#    which wouldn't match the expected tensor structure
X_spark_with_array = X_spark.withColumn(
    "features_array",
    array(*[col(c) for c in X_spark.columns])
)

# Create a Spark UDF from the registered model
model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

# Apply MLflow UDF to the array column
# Pass the single array column to the model, which matches the expected tensor format
X_spark_with_predictions = X_spark_with_array.withColumn(
    "prediction",
    model_udf("features_array")
)

display(X_spark_with_predictions.limit(5))

ノートブックの例

PyTorch を使用したエンド ツー エンドのディープ ラーニング モデル

ノートブックを入手