使用 mlflow.search_traces()以编程方式搜索和分析跟踪。
快速参考
# Quick-reference examples of mlflow.search_traces() filter strings.
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
# Tag names containing dots must be wrapped in backticks
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only -- OR is not supported)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)
关键规则
- 始终使用前缀:attributes.、tags. 或 metadata.
- 如果标记或属性名称中包含点,则使用反引号:tags.`mlflow.traceName`
- 仅使用单引号:'value' 而不是 "value"
- 时间以毫秒表示:1749006880539,而不是日期
- 仅支持 "AND":不支持 "OR"
特定于 Databricks 的参数
以下是 Databricks 特定的参数:
-
sql_warehouse_id:可选的 Databricks SQL 仓库 ID。 指定后,将使用指定的 SQL 仓库执行跟踪查询,以提高大型跟踪数据集的性能。 -
model_id:Databricks 模型注册表中的可选模型 ID。 如果指定,则搜索与给定注册模型关联的跟踪。
SQL Warehouse 集成
使用 Databricks SQL 仓库执行跟踪查询,以提高大型跟踪数据集的性能:
# Execute the trace query on a Databricks SQL warehouse
# (improves performance on large trace datasets).
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    sql_warehouse_id="your-warehouse-id"
)
模型注册表集成
在 Databricks 中搜索与已注册模型相关的跟踪:
# Find traces for a specific registered model in the Databricks model registry
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)
# Analyze model performance from traces
# (search_traces returns a pandas DataFrame by default)
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
搜索示例
按状态搜索
# Find successful traces (status is 'OK', 'ERROR', or 'IN_PROGRESS')
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Exclude errors with the != operator
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
按时间戳搜索
import time
from datetime import datetime

# Recent traces (last 5 minutes); timestamps are milliseconds since epoch
current_time_ms = int(time.time() * 1000)
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)
# Date range: convert datetimes to epoch milliseconds first
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Can also use 'timestamp' alias instead of 'timestamp_ms'
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
按执行时间搜索
# Find slow traces (execution time is in milliseconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Performance range (combine bounds with AND)
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Can also use 'latency' alias instead of 'execution_time_ms'
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
按标记搜索
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
# System tags (dotted names require backticks)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
# Non-empty check with != ''
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)
复杂筛选器
# Recent successful production traces -- multiple conditions joined by AND
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
    f"attributes.timestamp_ms > {one_hour_ago} AND "
    f"tags.environment = 'production'"
)
# Fast traces from a specific user (metadata key needs backticks)
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
    "metadata.`mlflow.user` = 'alice@company.com'"
)
# Specific function with a performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
    "attributes.execution_time_ms > 1000"
)
按上下文元数据查询
这些示例演示如何使用上下文元数据(如用户 ID、会话、环境和功能标志)跨多个跟踪进行搜索。 有关向跟踪添加上下文元数据的详细信息,请参阅 向跟踪添加上下文。
分析用户行为
from mlflow.client import MlflowClient

# Module-level client reused by the analysis examples in this section
client = MlflowClient()
def analyze_user_behavior(user_id: str, experiment_id: str) -> dict:
    """Analyze a specific user's interaction patterns.

    Args:
        user_id: Value of the ``mlflow.trace.user`` metadata key to filter on.
        experiment_id: Experiment to search within.

    Returns:
        Dict with the total interaction count, number of distinct sessions,
        and average response time in milliseconds. When the user has no
        traces, all metrics are 0 (instead of raising ZeroDivisionError).
    """
    # Search for all traces from a specific user
    user_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.user` = '{user_id}'",
        max_results=1000,
    )
    total_interactions = len(user_traces)
    # Guard: no traces means the averages below would divide by zero.
    if total_interactions == 0:
        return {
            "total_interactions": 0,
            "unique_sessions": 0,
            "avg_response_time": 0.0,
        }
    # Count distinct session IDs (missing session metadata maps to "")
    unique_sessions = len(
        {t.info.metadata.get("mlflow.trace.session", "") for t in user_traces}
    )
    avg_response_time = (
        sum(t.info.execution_time_ms for t in user_traces) / total_interactions
    )
    return {
        "total_interactions": total_interactions,
        "unique_sessions": unique_sessions,
        "avg_response_time": avg_response_time,
    }
分析会话流
def analyze_session_flow(session_id: str, experiment_id: str):
    """Analyze conversation flow within a session.

    Fetches every trace recorded under ``session_id`` in chronological
    order and returns a list of per-turn summaries: turn number,
    timestamp, duration in ms, and status.
    """
    # Retrieve the session's traces, oldest first
    session_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
        order_by=["timestamp ASC"]
    )
    # One summary dict per conversation turn, numbered from 1
    return [
        {
            "turn": turn,
            "timestamp": trace.info.timestamp,
            "duration_ms": trace.info.execution_time_ms,
            "status": trace.info.status,
        }
        for turn, trace in enumerate(session_traces, start=1)
    ]
比较版本之间的错误率
def compare_version_error_rates(experiment_id: str, versions: list):
    """Compare error rates across different app versions in production.

    Returns a mapping of version -> error percentage (0-100), with None
    for any version that has no production traces.
    """
    rates = {}
    for app_version in versions:
        version_traces = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=(
                "metadata.`mlflow.source.type` = 'production' "
                f"AND metadata.app_version = '{app_version}'"
            )
        )
        if not version_traces:
            # No traces: the rate is undefined, not zero.
            rates[app_version] = None  # Or 0 if no traces means no errors
            continue
        failures = sum(t.info.status == "ERROR" for t in version_traces)
        rates[app_version] = failures / len(version_traces) * 100
    return rates
# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)
分析功能标志性能
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
    """Analyze performance differences between feature flag states.

    Args:
        experiment_id: Experiment to search within.
        flag_name: Feature flag name; traces are expected to carry a
            ``feature_flag_<flag_name>`` metadata key set to 'true'/'false'.

    Returns:
        Dict mapping ``avg_latency_<flag>_off`` / ``avg_latency_<flag>_on``
        to the average execution time in ms of the control / treatment
        group (0 when a group has no traces).
    """

    def _avg_latency(flag_value: str) -> float:
        # Fetch one group's traces and average their execution times.
        # Returns 0 for an empty group, matching the original behavior.
        group_traces = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"metadata.feature_flag_{flag_name} = '{flag_value}'",
        )
        latencies = [t.info.execution_time_ms for t in group_traces]
        return sum(latencies) / len(latencies) if latencies else 0

    return {
        f"avg_latency_{flag_name}_off": _avg_latency("false"),
        f"avg_latency_{flag_name}_on": _avg_latency("true"),
    }
# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)
数据帧操作
mlflow.search_traces 返回的数据帧包含以下列:
# With no arguments, search_traces returns all traces in the active
# experiment as a pandas DataFrame.
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
#  'request', 'response', 'request_metadata', 'spans', 'tags']
提取跨度字段
# Extract specific span fields into DataFrame columns using
# "span_name.inputs[.field]" / "span_name.outputs[.field]" paths
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)
# Rename the extracted columns to build an evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})
后续步骤
- 生成评估数据集 - 将查询跟踪转换为测试数据集