以编程方式搜索痕迹

使用 mlflow.search_traces()以编程方式搜索和分析跟踪。

快速参考

# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")

# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")

# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")

# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")

# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)

关键规则

  • 始终使用前缀:attributes.、tags. 或 metadata.
  • 如果标记或属性名称中包含点,则使用反引号:tags.`mlflow.traceName`
  • 仅使用单引号:'value'而不是"value"
  • 时间的毫秒数:1749006880539 不是日期
  • 仅限于“AND”:不支持“OR”

特定于 Databricks 的参数

以下是 Databricks 特定的参数:

  • sql_warehouse_id:可选的 Databricks SQL 仓库 ID。 指定后,将使用指定的 SQL 仓库执行跟踪查询,以提高大型跟踪数据集的性能。
  • model_id:Databricks 模型注册表中的可选模型 ID。 如果指定,则搜索与给定注册模型相关的跟踪。

SQL Warehouse 集成

使用 Databricks SQL 仓库执行跟踪查询,以提高大型跟踪数据集的性能:

# Use SQL warehouse for better performance
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    sql_warehouse_id="your-warehouse-id"
)

模型注册表集成

在 Databricks 中搜索与已注册模型相关的跟踪信息:

# Find traces for a specific registered model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)

# Analyze model performance from traces
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")

搜索示例

按状态搜索

# Find successful, failed, or in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")

按时间戳搜索

import time
from datetime import datetime

# Recent traces (last 5 minutes)
current_time_ms = int(time.time() * 1000)
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)

# Date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)

# Can also use 'timestamp' alias instead of 'timestamp_ms'
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")

按执行时间搜索

# Find slow traces
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")

# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)

# Can also use 'latency' alias instead of 'execution_time_ms'
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")

按标记搜索

# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")

# System tags (require backticks for dotted names)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)

复杂筛选器

# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)

traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
                 f"attributes.timestamp_ms > {one_hour_ago} AND "
                 f"tags.environment = 'production'"
)

# Fast traces from specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
                 "metadata.`mlflow.user` = 'alice@company.com'"
)

# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
                 "attributes.execution_time_ms > 1000"
)

按上下文元数据查询

这些示例演示如何使用上下文元数据(如用户 ID、会话、环境和功能标志)跨多个跟踪进行搜索。 有关向跟踪添加上下文元数据的详细信息,请参阅 向跟踪添加上下文

分析用户行为

from mlflow.client import MlflowClient

client = MlflowClient()

def analyze_user_behavior(user_id: str, experiment_id: str) -> dict:
    """Analyze a specific user's interaction patterns.

    Args:
        user_id: Value stored in the ``mlflow.trace.user`` metadata field.
        experiment_id: ID of the experiment to search within.

    Returns:
        Dict with ``total_interactions``, ``unique_sessions`` and
        ``avg_response_time`` (0.0 when the user has no traces).
    """

    # Search for all traces from a specific user
    user_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.user` = '{user_id}'",
        max_results=1000
    )

    # Calculate key metrics
    total_interactions = len(user_traces)
    unique_sessions = len(
        {t.info.metadata.get("mlflow.trace.session", "") for t in user_traces}
    )
    # Guard against ZeroDivisionError when the user has no traces.
    avg_response_time = (
        sum(t.info.execution_time_ms for t in user_traces) / total_interactions
        if total_interactions
        else 0.0
    )

    return {
        "total_interactions": total_interactions,
        "unique_sessions": unique_sessions,
        "avg_response_time": avg_response_time
    }

分析会话流

def analyze_session_flow(session_id: str, experiment_id: str):
    """Analyze conversation flow within a session.

    Returns a chronological list of dicts, one per conversation turn,
    each carrying the turn number, timestamp, duration and status.
    """

    # Fetch every trace for the session, ordered chronologically.
    session_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
        order_by=["timestamp ASC"]
    )

    # One timeline entry per trace; turns are numbered from 1.
    return [
        {
            "turn": turn,
            "timestamp": trace.info.timestamp,
            "duration_ms": trace.info.execution_time_ms,
            "status": trace.info.status
        }
        for turn, trace in enumerate(session_traces, start=1)
    ]

比较版本之间的错误率

def compare_version_error_rates(experiment_id: str, versions: list):
    """Compare error rates across different app versions in production.

    Returns a dict mapping each version to its error percentage, or
    ``None`` when no production traces exist for that version.
    """
    error_rates = {}
    for version in versions:
        version_traces = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
        )
        if version_traces:
            errors = sum(t.info.status == "ERROR" for t in version_traces)
            error_rates[version] = (errors / len(version_traces)) * 100
        else:
            error_rates[version] = None # Or 0 if no traces means no errors
    return error_rates

# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)

分析功能标志性能

def analyze_feature_flag_performance(experiment_id: str, flag_name: str) -> dict:
    """Analyze performance differences between feature flag states.

    Args:
        experiment_id: ID of the experiment to search within.
        flag_name: Flag suffix used in the
            ``metadata.feature_flag_<flag_name>`` field.

    Returns:
        Dict mapping ``avg_latency_<flag_name>_off`` / ``_on`` to the
        mean execution time in ms (0 when no traces matched).
    """

    def _avg_latency(flag_value: str):
        # Fetch traces for one flag state and average their latency;
        # shared by both branches to avoid the copy-paste duplication.
        traces = client.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"metadata.feature_flag_{flag_name} = '{flag_value}'",
        )
        latencies = [t.info.execution_time_ms for t in traces]
        return sum(latencies) / len(latencies) if latencies else 0

    return {
        f"avg_latency_{flag_name}_off": _avg_latency("false"),
        f"avg_latency_{flag_name}_on": _avg_latency("true")
    }

# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)

数据帧操作

mlflow.search_traces 返回的数据帧包含以下列:

traces_df = mlflow.search_traces()

# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
#  'request', 'response', 'request_metadata', 'spans', 'tags']

提取跨度字段

# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)

# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})

后续步骤