本指南演示如何在应用的代码驻留在 Git 或类似的版本控制系统中时跟踪 GenAI 应用程序的版本。 在此工作流中,MLflow LoggedModel
充当 元数据中心,将每个概念应用程序版本链接到其特定的外部代码(例如 Git 提交),配置。 然后,可以将LoggedModel
关联到 MLflow 实体,例如跟踪和评估运行。
版本 mlflow.set_active_model(name=...)
跟踪的关键在于:调用此函数会将应用程序的跟踪链接到LoggedModel
。
name
如果不存在,则会自动创建一个新LoggedModel
项。
你将了解的内容:
- 使用
LoggedModels
跟踪应用程序版本 - 请将评估运行链接到
LoggedModel
小窍门
建议 LoggedModels
与 MLflow 的提示注册表一起使用。 如果使用提示注册表,则每个提示的版本将自动关联到你的 LoggedModel
版本。 查看 跟踪提示版本以及应用程序版本。
先决条件
安装 MLflow 和所需包
pip install --upgrade "mlflow[databricks]>=3.1.0" openai
请按照 设置环境快速指南 创建 MLflow 试验。
步骤 1:创建示例应用程序
下面是提示 LLM 进行响应的简单应用程序。
import mlflow
from openai import OpenAI
# Enable MLflow's autologging to instrument your application with Tracing
mlflow.openai.autolog()
# Connect to a Databricks LLM via OpenAI using the same credentials as MLflow
# Alternatively, you can use your own OpenAI credentials here
mlflow_creds = mlflow.utils.databricks_utils.get_databricks_host_creds()
client = OpenAI(
api_key=mlflow_creds.token,
base_url=f"{mlflow_creds.host}/serving-endpoints"
)
# Use the trace decorator to capture the application's entry point
@mlflow.trace
def my_app(input: str):
# This call is automatically instrumented by `mlflow.openai.autolog()`
response = client.chat.completions.create(
model="databricks-claude-sonnet-4", # This example uses a Databricks hosted LLM - you can replace this with any AI Gateway or Model Serving endpoint. If you provide your own OpenAI credentials, replace with a valid OpenAI model e.g., gpt-4o, etc.
messages=[
{
"role": "system",
"content": "You are a helpful assistant.",
},
{
"role": "user",
"content": input,
},
],
)
return response.choices[0].message.content
result = my_app(input="What is MLflow?")
print(result)
步骤 2:向应用代码添加版本跟踪
版本 LoggedModel
充当应用程序特定版本的中央记录(元数据中心)。 它不需要存储应用程序代码本身,而是指向您的代码所管理的位置(例如,Git 提交哈希)。
我们使用 mlflow.set_active_model()
来声明我们当前正在使用的 LoggedModel
,或者在其不存在时创建一个新的。 此函数返回一个 ActiveModel
对象,其中包含 model_id
,此内容在后续操作中非常有用。
小窍门
在生产环境中,可以设置环境变量 MLFLOW_ACTIVE_MODEL_ID
,而不是调用 set_active_model()
。 有关更多详细信息,请参阅 生产指南中的版本跟踪 。
注释
下面的代码使用当前的 Git 提交哈希作为模型的名称,因此,模型版本仅在提交时递增。 如果要为代码库中的每个更改创建新的 LoggedModel,请参阅 帮助程序函数的附录 ,该函数为代码库中的任何更改创建唯一的 LoggedModel,即使未提交到 Git 也是如此。
在步骤 1 中的应用程序顶部插入以下代码。 在应用程序中,在执行应用的代码之前,必须调用“set_active_model()。
# Keep original imports
### NEW CODE
import subprocess
# Define your application and its version identifier
app_name = "customer_support_agent"
# Get current git commit hash for versioning
try:
git_commit = (
subprocess.check_output(["git", "rev-parse", "HEAD"])
.decode("ascii")
.strip()[:8]
)
version_identifier = f"git-{git_commit}"
except subprocess.CalledProcessError:
version_identifier = "local-dev" # Fallback if not in a git repo
logged_model_name = f"{app_name}-{version_identifier}"
# Set the active model context
active_model_info = mlflow.set_active_model(name=logged_model_name)
print(
f"Active LoggedModel: '{active_model_info.name}', Model ID: '{active_model_info.model_id}'"
)
### END NEW CODE
### ORIGINAL CODE BELOW
### ...
步骤 4:(可选)记录参数
您可以选择将定义此应用程序版本的关键配置参数直接记录到LoggedModel
,使用mlflow.log_model_params()
。 这可用于记录绑定到此代码版本的 LLM 名称、温度设置或检索策略等内容。
在步骤 3 中的代码下方添加以下代码:
app_params = {
"llm": "gpt-4o-mini",
"temperature": 0.7,
"retrieval_strategy": "vector_search_v3",
}
# Log params
mlflow.log_model_params(model_id=active_model_info.model_id, params=app_params)
步骤 5:运行应用程序
- 现在,让我们调用应用程序以查看 LoggedModel 的创建和跟踪方式。
# These 2 invocations will be linked to the same LoggedModel
result = my_app(input="What is MLflow?")
print(result)
result = my_app(input="What is Databricks?")
print(result)
- 若要在不提交的情况下模拟更改,请添加以下行以手动创建一个新的已记录模型。
# Set the active model context
active_model_info = mlflow.set_active_model(name="new-name-set-manually")
print(
f"Active LoggedModel: '{active_model_info.name}', Model ID: '{active_model_info.model_id}'"
)
app_params = {
"llm": "gpt-4o",
"temperature": 0.7,
"retrieval_strategy": "vector_search_v4",
}
# Log params
mlflow.log_model_params(model_id=active_model_info.model_id, params=app_params)
# This will create a new LoggedModel
result = my_app(input="What is GenAI?")
print(result)
步骤 6:查看与 LoggedModel 相关的跟踪
通过 UI
现在,转到 MLflow 试验 UI。 在 “跟踪 ”选项卡中,可以看到生成每个跟踪的应用的版本(请注意,第一个跟踪不会附加版本,因为我们调用了应用而不先调用 set_active_model()
)。 在 “版本 ”选项卡中,可以看到每个 LoggedModel
参数和链接跟踪。
通过 SDK
可使用 search_traces()
从 LoggedModel
查询跟踪。
import mlflow
traces = mlflow.search_traces(
filter_string=f"metadata.`mlflow.modelId` = '{active_model_info.model_id}'"
)
print(traces)
您可以使用get_logged_model()
获取LoggedModel
的详细信息:
import mlflow
import datetime
# Get LoggedModel metadata
logged_model = mlflow.get_logged_model(model_id=active_model_info.model_id)
# Inspect basic properties
print(f"\n=== LoggedModel Information ===")
print(f"Model ID: {logged_model.model_id}")
print(f"Name: {logged_model.name}")
print(f"Experiment ID: {logged_model.experiment_id}")
print(f"Status: {logged_model.status}")
print(f"Model Type: {logged_model.model_type}")
creation_time = datetime.datetime.fromtimestamp(logged_model.creation_timestamp / 1000)
print(f"Created at: {creation_time}")
# Access the parameters
print(f"\n=== Model Parameters ===")
for param_name, param_value in logged_model.params.items():
print(f"{param_name}: {param_value}")
# Access tags if any were set
if logged_model.tags:
print(f"\n=== Model Tags ===")
for tag_key, tag_value in logged_model.tags.items():
print(f"{tag_key}: {tag_value}")
步骤 6:将评估结果链接到 LoggedModel
若要评估应用程序并将结果链接到此 LoggedModel
版本,请参阅 将评估结果和跟踪链接到应用版本。 本指南介绍如何用于 mlflow.genai.evaluate()
评估应用程序的性能,并自动将指标、评估表和跟踪与特定 LoggedModel
版本相关联。
import mlflow
from mlflow.genai import scorers
eval_dataset = [
{
"inputs": {"input": "What is the most common aggregate function in SQL?"},
}
]
mlflow.genai.evaluate(data=eval_dataset, predict_fn=my_app, model_id=active_model_info.model_id, scorers=scorers.get_all_scorers())
在 MLflow 试验 UI 中的 版本 和 评估 选项卡中查看结果:
用于计算任何文件更改的唯一哈希的帮助程序函数
下面的帮助程序函数根据存储库的状态自动生成每个 LoggedModel 的名称。 若要使用此函数,请调用 set_active_model(name=get_current_git_hash())
。
get_current_git_hash()
通过返回 HEAD 提交哈希(用于清理存储库)或 HEAD 哈希和未提交的更改哈希(对于脏存储库)的组合,为 git 存储库的当前状态生成唯一确定性标识符。 它确保存储库的不同状态始终生成不同的标识符,因此每个代码更改都会导致新的 LoggedModel
。
import subprocess
import hashlib
import os
def get_current_git_hash():
"""
Get a deterministic hash representing the current git state.
For clean repositories, returns the HEAD commit hash.
For dirty repositories, returns a combination of HEAD + hash of changes.
"""
try:
# Get the git repository root
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True, text=True, check=True
)
git_root = result.stdout.strip()
# Get the current HEAD commit hash
result = subprocess.run(
["git", "rev-parse", "HEAD"], capture_output=True, text=True, check=True
)
head_hash = result.stdout.strip()
# Check if repository is dirty
result = subprocess.run(
["git", "status", "--porcelain"], capture_output=True, text=True, check=True
)
if not result.stdout.strip():
# Repository is clean, return HEAD hash
return head_hash
# Repository is dirty, create deterministic hash of changes
# Collect all types of changes
changes_parts = []
# 1. Get staged changes
result = subprocess.run(
["git", "diff", "--cached"], capture_output=True, text=True, check=True
)
if result.stdout:
changes_parts.append(("STAGED", result.stdout))
# 2. Get unstaged changes to tracked files
result = subprocess.run(
["git", "diff"], capture_output=True, text=True, check=True
)
if result.stdout:
changes_parts.append(("UNSTAGED", result.stdout))
# 3. Get all untracked/modified files from status
result = subprocess.run(
["git", "status", "--porcelain", "-uall"],
capture_output=True, text=True, check=True
)
# Parse status output to handle all file states
status_lines = result.stdout.strip().split('\n') if result.stdout.strip() else []
file_contents = []
for line in status_lines:
if len(line) >= 3:
status_code = line[:2]
filepath = line[3:] # Don't strip - filepath starts exactly at position 3
# For any modified or untracked file, include its current content
if '?' in status_code or 'M' in status_code or 'A' in status_code:
try:
# Use absolute path relative to git root
abs_filepath = os.path.join(git_root, filepath)
with open(abs_filepath, 'rb') as f:
# Read as binary to avoid encoding issues
content = f.read()
# Create a hash of the file content
file_hash = hashlib.sha256(content).hexdigest()
file_contents.append(f"{filepath}:{file_hash}")
except (IOError, OSError):
file_contents.append(f"{filepath}:unreadable")
# Sort file contents for deterministic ordering
file_contents.sort()
# Combine all changes
all_changes_parts = []
# Add diff outputs
for change_type, content in changes_parts:
all_changes_parts.append(f"{change_type}:\n{content}")
# Add file content hashes
if file_contents:
all_changes_parts.append("FILES:\n" + "\n".join(file_contents))
# Create final hash
all_changes = "\n".join(all_changes_parts)
content_to_hash = f"{head_hash}\n{all_changes}"
changes_hash = hashlib.sha256(content_to_hash.encode()).hexdigest()
# Return HEAD hash + first 8 chars of changes hash
return f"{head_hash[:32]}-dirty-{changes_hash[:8]}"
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Git command failed: {e}")
except FileNotFoundError:
raise RuntimeError("Git is not installed or not in PATH")
后续步骤
-
(可选)捆绑代码:适用于在某些情况下需要将代码和
LoggedModel
结合在一起的方案。