Partilhar via


Decoradores & APIs fluentes (recomendado)

Além das integrações de Auto Tracing , você pode instrumentar seu código Python usando o SDK de Rastreamento MLflow. Isso é especialmente útil quando você precisa instrumentar seu código Python personalizado.

Decorador

O @mlflow.trace decorador permite criar um intervalo para qualquer função. Essa abordagem fornece uma maneira simples, mas eficaz, de adicionar rastreamento ao seu código com o mínimo de esforço:

  • O MLflow deteta as relações pai-filho entre funções, tornando-o compatível com integrações de rastreamento automático.
  • Captura exceções durante a execução da função e as registra como eventos de extensão.
  • Registra automaticamente o nome, as entradas, as saídas e o tempo de execução da função.
  • Pode ser usado juntamente com recursos de rastreamento automático , como mlflow.openai.autolog.

O decorador @mlflow\.trace atualmente suporta os seguintes tipos de funções:

Tipo de função Suportado
Sincronização Sim
Assíncrono Sim (>= 2.16.0)
Gerador Sim (>= 2.20.2)
Gerador assíncrono Sim (>= 2.20.2)

Exemplo

O código a seguir é um exemplo mínimo de uso do decorador para rastrear funções Python.

Sugestão

Para garantir uma observabilidade completa, o @mlflow.trace decorador deve geralmente ser o mais externo quando se utilizam múltiplos decoradores. Consulte Using @mlflow.trace with Other Decorators para obter uma explicação detalhada e exemplos.

import mlflow


@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
    return x + 1


@mlflow.trace(span_type="func", attributes={"key1": "value1"})
def minus_1(x):
    return x - 1


@mlflow.trace(name="Trace Test")
def trace_test(x):
    step1 = add_1(x)
    return minus_1(step1)


trace_test(4)

Decorador de Rastreamento

Observação

Quando um rastreamento contém vários segmentos com o mesmo nome, o MLflow acrescenta um sufixo de incremento automático a eles, como _1, _2.

Personalização de Intervalos

O @mlflow.trace decorador aceita os seguintes argumentos para personalizar o elemento de intervalo a ser criado:

  • name parâmetro para substituir o nome do span do padrão (o nome da função decorada)
  • span_type parâmetro para definir o tipo de intervalo. Defina um dos Span Types incorporados ou uma cadeia de caracteres.
  • attributes para adicionar atributos personalizados ao elemento span.

Sugestão

Ao combinar @mlflow.trace com outros decoradores (por exemplo, de frameworks web), é crucial que seja o mais externo. Para obter um exemplo claro de ordem correta versus incorreta, consulte Using @mlflow.trace with Other Decorators.

@mlflow.trace(
    name="call-local-llm", span_type=SpanType.LLM, attributes={"model": "gpt-4o-mini"}
)
def invoke(prompt: str):
    return client.invoke(
        messages=[{"role": "user", "content": prompt}], model="gpt-4o-mini"
    )

Alternativamente, pode atualizar o intervalo dinamicamente dentro da função usando mlflow.get_current_active_span API.

@mlflow.trace(span_type=SpanType.LLM)
def invoke(prompt: str):
    model_id = "gpt-4o-mini"
    # Get the current span (created by the @mlflow.trace decorator)
    span = mlflow.get_current_active_span()
    # Set the attribute to the span
    span.set_attributes({"model": model_id})
    return client.invoke(messages=[{"role": "user", "content": prompt}], model=model_id)

Usando @mlflow.trace com outros decoradores

Ao aplicar vários decoradores a uma única função, é crucial colocar @mlflow.trace como o decorador mais externo (aquele no topo). Isso garante que o MLflow possa capturar toda a execução da função, incluindo o comportamento de qualquer decorador interno.

Se @mlflow.trace não for o decorador mais externo, sua visibilidade na execução da função pode ser limitada ou incorreta, potencialmente levando a traços incompletos ou deturpação das entradas, saídas e tempo de execução da função.

Considere o seguinte exemplo conceitual:

import mlflow
import functools
import time

# A hypothetical additional decorator
def simple_timing_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} executed in {end_time - start_time:.4f} seconds by simple_timing_decorator.")
        return result
    return wrapper

# Correct order: @mlflow.trace is outermost
@mlflow.trace(name="my_decorated_function_correct_order")
@simple_timing_decorator
# @another_framework_decorator # e.g., @app.route("/mypath") from Flask
def my_complex_function(x, y):
    # Function logic here
    time.sleep(0.1) # Simulate work
    return x + y

# Incorrect order: @mlflow.trace is NOT outermost
@simple_timing_decorator
@mlflow.trace(name="my_decorated_function_incorrect_order")
# @another_framework_decorator
def my_other_complex_function(x, y):
    time.sleep(0.1)
    return x * y

# Example calls
if __name__ == "__main__":
    print("Calling function with correct decorator order:")
    my_complex_function(5, 3)

    print("\nCalling function with incorrect decorator order:")
    my_other_complex_function(5, 3)

my_complex_function No exemplo (ordem correta), @mlflow.trace irá capturar a execução completa, incluindo o tempo adicionado por simple_timing_decorator. Em my_other_complex_function (ordem incorreta), o rastreamento capturado pelo MLflow pode não refletir com precisão o tempo total de execução ou pode perder modificações nas entradas/saídas feitas por simple_timing_decorator antes @mlflow.trace de vê-las.

Adicionando tags de rastreamento

As tags podem ser adicionadas a rastreamentos para fornecer metadados adicionais no nível de rastreamento. Há algumas maneiras diferentes de definir etiquetas em um rastreio. Por favor, consulte o how-to guide para os outros métodos.

@mlflow.trace
def my_func(x):
    mlflow.update_current_trace(tags={"fruit": "apple"})
    return x + 1

Personalizando visualizações de solicitação e resposta na interface do usuário

A aba Rastreios na interface do utilizador do MLflow exibe uma lista de rastreios, e as colunas Request e Response mostram uma visualização da entrada e saída de ponta a ponta de cada rastreio. Isso permite que você entenda rapidamente o que cada traço representa.

Por padrão, essas visualizações são truncadas para um número fixo de caracteres. No entanto, você pode personalizar o que é mostrado nessas colunas usando os request_preview parâmetros e response_preview dentro da mlflow.update_current_trace() função. Isso é particularmente útil para entradas ou saídas complexas em que o truncamento padrão pode não mostrar as informações mais relevantes.

Abaixo está um exemplo de definição de uma pré-visualização de pedido personalizada para um rastreamento que processa um documento longo e instruções do utilizador, com o objetivo de apresentar as informações mais relevantes na coluna Request da interface do utilizador.

import mlflow

@mlflow.trace(name="Summarization Pipeline")
def summarize_document(document_content: str, user_instructions: str):
    # Construct a custom preview for the request column
    # For example, show beginning of document and user instructions
    request_p = f"Doc: {document_content[:30]}... Instr: {user_instructions[:30]}..."
    mlflow.update_current_trace(request_preview=request_p)

    # Simulate LLM call
    # messages = [
    #     {"role": "system", "content": "Summarize the following document based on user instructions."},
    #     {"role": "user", "content": f"Document: {document_content}\nInstructions: {user_instructions}"}
    # ]
    # completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    # summary = completion.choices[0].message.content
    summary = f"Summary of document starting with '{document_content[:20]}...' based on '{user_instructions}'"

    # Customize the response preview
    response_p = f"Summary: {summary[:50]}..."
    mlflow.update_current_trace(response_preview=response_p)

    return summary

# Example Call
long_document = "This is a very long document that contains many details about various topics..." * 10
instructions = "Focus on the key takeaways regarding topic X."
summary_result = summarize_document(long_document, instructions)
# print(summary_result)

Ao definir request_preview e response_preview no rastreamento (normalmente a extensão raiz), você controla como a interação geral é resumida na exibição da lista de rastreamento principal, facilitando a identificação e a compreensão rápida dos rastreamentos.

Tratamento automático de exceções

Se um Exception for gerado durante o processamento de uma operação instrumentada por rastreamento, uma indicação será mostrada na interface de utilizador de que a invocação não foi bem-sucedida e uma captura parcial dos dados será disponibilizada para ajudar na depuração. Além disso, os detalhes sobre a Exceção que foi levantada serão incluídos dentro Events da extensão parcialmente concluída, ajudando ainda mais na identificação de onde os problemas estão ocorrendo em seu código.

Erro de rastreamento

Combinação com o Auto-Tracing

O @mlflow.trace decorador pode ser usado em conjunto com o rastreamento automático para combinar o rastreamento automático com intervalos definidos manualmente em um único rastreio coeso e integrado. Saiba mais aqui.

Serviço de streaming

O @mlflow.trace decorador pode ser usado para rastrear funções que retornam um gerador ou um iterador, desde MLflow 2.20.2.

@mlflow.trace
def stream_data():
    for i in range(5):
        yield i

O exemplo acima gerará um traço com um único intervalo para a função stream_data. Por defeito, o MLflow capturará todos os elementos produzidos pelo gerador como uma lista na saída do span. No exemplo acima, a saída do span será [0, 1, 2, 3, 4].

Observação

Uma extensão para uma função de fluxo começará quando o iterador retornado começar a ser consumido e terminará quando o iterador estiver esgotado ou uma exceção for gerada durante a iteração.

Usando redutores de saída

Se quiser agregar os elementos para ser uma saída de span única, você pode usar o output_reducer parâmetro para especificar uma função personalizada para agregar os elementos. A função personalizada deve ter uma lista de elementos produzidos como entradas.

from typing import List, Any

@mlflow.trace(output_reducer=lambda x: ",".join(x))
def stream_data():
    for c in "hello":
        yield c

No exemplo acima, a saída do span será "h,e,l,l,o". Os blocos brutos ainda podem ser encontrados no separador Events do intervalo na interface do usuário MLflow Trace, permitindo que você inspecione valores individuais produzidos durante a depuração.

Padrões comuns de redutor de saída

Aqui estão alguns padrões comuns para implementar redutores de saída:

Agregação de Tokens
from typing import List, Dict, Any

def aggregate_tokens(chunks: List[str]) -> str:
    """Concatenate streaming tokens into complete text"""
    return "".join(chunks)

@mlflow.trace(output_reducer=aggregate_tokens)
def stream_text():
    for word in ["Hello", " ", "World", "!"]:
        yield word
Agregação de métricas
def aggregate_metrics(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate streaming metrics into summary statistics"""
    values = [c["value"] for c in chunks if "value" in c]
    return {
        "count": len(values),
        "sum": sum(values),
        "average": sum(values) / len(values) if values else 0,
        "max": max(values) if values else None,
        "min": min(values) if values else None
    }

@mlflow.trace(output_reducer=aggregate_metrics)
def stream_metrics():
    for i in range(10):
        yield {"value": i * 2, "timestamp": time.time()}
Recolha de erros
def collect_results_and_errors(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Separate successful results from errors"""
    results = []
    errors = []

    for chunk in chunks:
        if chunk.get("error"):
            errors.append(chunk["error"])
        else:
            results.append(chunk.get("data"))

    return {
        "results": results,
        "errors": errors,
        "success_rate": len(results) / len(chunks) if chunks else 0,
        "has_errors": len(errors) > 0
    }

Exemplo avançado: OpenAI Streaming

A seguir está um exemplo avançado que usa o output_reducer para consolidar a saída do ChatCompletionChunk de um LLM da OpenAI em um único objeto de mensagem.

Sugestão

Recomendamos o uso do rastreamento automático para OpenAI para casos de uso de produção, que lida com isso automaticamente. O exemplo abaixo é para fins de demonstração.

import mlflow
import openai
from openai.types.chat import *
from typing import Optional


def aggregate_chunks(outputs: list[ChatCompletionChunk]) -> Optional[ChatCompletion]:
    """Consolidate ChatCompletionChunks to a single ChatCompletion"""
    if not outputs:
        return None

    first_chunk = outputs[0]
    delta = first_chunk.choices[0].delta
    message = ChatCompletionMessage(
        role=delta.role, content=delta.content, tool_calls=delta.tool_calls or []
    )
    finish_reason = first_chunk.choices[0].finish_reason
    for chunk in outputs[1:]:
        delta = chunk.choices[0].delta
        message.content += delta.content or ""
        message.tool_calls += delta.tool_calls or []
        finish_reason = finish_reason or chunk.choices[0].finish_reason

    base = ChatCompletion(
        id=first_chunk.id,
        choices=[Choice(index=0, message=message, finish_reason=finish_reason)],
        created=first_chunk.created,
        model=first_chunk.model,
        object="chat.completion",
    )
    return base


@mlflow.trace(output_reducer=aggregate_chunks)
def predict(messages: list[dict]):
    stream = openai.OpenAI().chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        yield chunk


for chunk in predict([{"role": "user", "content": "Hello"}]):
    print(chunk)

No exemplo acima, o span gerado predict terá uma única mensagem de conclusão de chat como saída, que é agregada pela função redutor personalizada.

Real-World casos de uso

Aqui estão exemplos adicionais de redutores de saída para cenários comuns de GenAI:

Resposta LLM com análise JSON
from typing import List, Dict, Any
import json

def parse_json_from_llm(content: str) -> str:
    """Extract and clean JSON from LLM responses that may include markdown"""
    # Remove common markdown code block wrappers
    if content.startswith("```json") and content.endswith("```"):
        content = content[7:-3]  # Remove ```json prefix and ``` suffix
    elif content.startswith("```") and content.endswith("```"):
        content = content[3:-3]  # Remove generic ``` wrappers
    return content.strip()

def json_stream_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate LLM streaming output and parse JSON response"""
    full_content = ""
    metadata = {}
    errors = []

    # Process different chunk types
    for chunk in chunks:
        chunk_type = chunk.get("type", "content")

        if chunk_type == "content" or chunk_type == "token":
            full_content += chunk.get("content", "")
        elif chunk_type == "metadata":
            metadata.update(chunk.get("data", {}))
        elif chunk_type == "error":
            errors.append(chunk.get("error"))

    # Return early if errors occurred
    if errors:
        return {
            "status": "error",
            "errors": errors,
            "raw_content": full_content,
            **metadata
        }

    # Try to parse accumulated content as JSON
    try:
        cleaned_content = parse_json_from_llm(full_content)
        parsed_data = json.loads(cleaned_content)

        return {
            "status": "success",
            "data": parsed_data,
            "raw_content": full_content,
            **metadata
        }
    except json.JSONDecodeError as e:
        return {
            "status": "parse_error",
            "error": f"Failed to parse JSON: {str(e)}",
            "raw_content": full_content,
            **metadata
        }

@mlflow.trace(output_reducer=json_stream_reducer)
def generate_structured_output(prompt: str, schema: dict):
    """Generate structured JSON output from an LLM"""
    # Simulate streaming JSON generation
    yield {"type": "content", "content": '{"name": "John", '}
    yield {"type": "content", "content": '"email": "john@example.com", '}
    yield {"type": "content", "content": '"age": 30}'}

    # Add metadata
    trace_id = mlflow.get_current_active_span().request_id if mlflow.get_current_active_span() else None
    yield {"type": "metadata", "data": {"trace_id": trace_id, "model": "gpt-4"}}
Geração de saída estruturada com OpenAI

Aqui está um exemplo completo do uso de redutores de saída com OpenAI para gerar e analisar respostas JSON estruturadas:

import json
import mlflow
import openai
from typing import List, Dict, Any, Optional

def structured_output_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Aggregate streaming chunks into structured output with comprehensive error handling.
    Handles token streaming, metadata collection, and JSON parsing.
    """
    content_parts = []
    trace_id = None
    model_info = None
    errors = []

    for chunk in chunks:
        chunk_type = chunk.get("type", "token")

        if chunk_type == "token":
            content_parts.append(chunk.get("content", ""))
        elif chunk_type == "trace_info":
            trace_id = chunk.get("trace_id")
            model_info = chunk.get("model")
        elif chunk_type == "error":
            errors.append(chunk.get("message"))

    # Join all content parts
    full_content = "".join(content_parts)

    # Base response
    response = {
        "trace_id": trace_id,
        "model": model_info,
        "raw_content": full_content
    }

    # Handle errors
    if errors:
        response["status"] = "error"
        response["errors"] = errors
        return response

    # Try to extract and parse JSON
    try:
        # Clean markdown wrappers if present
        json_content = full_content.strip()
        if json_content.startswith("```json") and json_content.endswith("```"):
            json_content = json_content[7:-3].strip()
        elif json_content.startswith("```") and json_content.endswith("```"):
            json_content = json_content[3:-3].strip()

        parsed_data = json.loads(json_content)
        response["status"] = "success"
        response["data"] = parsed_data

    except json.JSONDecodeError as e:
        response["status"] = "parse_error"
        response["error"] = f"JSON parsing failed: {str(e)}"
        response["error_position"] = e.pos if hasattr(e, 'pos') else None

    return response

@mlflow.trace(output_reducer=structured_output_reducer)
async def generate_customer_email(
    customer_name: str,
    issue: str,
    sentiment: str = "professional"
) -> None:
    """
    Generate a structured customer service email response.
    Demonstrates real-world streaming with OpenAI and structured output parsing.
    """
    client = openai.AsyncOpenAI()

    system_prompt = """You are a customer service assistant. Generate a professional email response in JSON format:
    {
        "subject": "email subject line",
        "greeting": "personalized greeting",
        "body": "main email content addressing the issue",
        "closing": "professional closing",
        "priority": "high|medium|low"
    }"""

    user_prompt = f"Customer: {customer_name}\nIssue: {issue}\nTone: {sentiment}"

    try:
        # Stream the response
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            stream=True,
            temperature=0.7
        )

        # Yield streaming tokens
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield {
                    "type": "token",
                    "content": chunk.choices[0].delta.content
                }

        # Add trace metadata
        if current_span := mlflow.get_current_active_span():
            yield {
                "type": "trace_info",
                "trace_id": current_span.request_id,
                "model": "gpt-4o-mini"
            }

    except Exception as e:
        yield {
            "type": "error",
            "message": f"OpenAI API error: {str(e)}"
        }

# Example usage
async def main():
    # This will automatically aggregate the streamed output into structured JSON
    async for chunk in generate_customer_email(
        customer_name="John Doe",
        issue="Product arrived damaged",
        sentiment="empathetic"
    ):
        # In practice, you might send these chunks to a frontend
        print(chunk.get("content", ""), end="", flush=True)

Observação

Este exemplo mostra vários padrões do mundo real:

  • Atualizações de interface do utilizador em tempo real: podem ser exibidos à medida que chegam
  • Validação de saída estruturada: a análise JSON garante o formato de resposta
  • Resiliência a erros: lida com erros de API e falhas de análise normalmente
  • Correlação de traços: estabelece ligação entre a saída de streaming e traços do MLflow para depuração.
Agregação de resposta multimodelo
def multi_model_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate responses from multiple models"""
    responses = {}
    latencies = {}

    for chunk in chunks:
        model = chunk.get("model")
        if model:
            responses[model] = chunk.get("response", "")
            latencies[model] = chunk.get("latency", 0)

    return {
        "responses": responses,
        "latencies": latencies,
        "fastest_model": min(latencies, key=latencies.get) if latencies else None,
        "consensus": len(set(responses.values())) == 1
    }

Teste de Redutores de Saída

Os redutores de saída podem ser testados independentemente da estrutura de rastreamento, facilitando a garantia de que eles lidam com casos extremos corretamente.

import unittest
from typing import List, Dict, Any

def my_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Example reducer to be tested"""
    if not chunks:
        return {"status": "empty", "total": 0}

    total = sum(c.get("value", 0) for c in chunks)
    errors = [c for c in chunks if c.get("error")]

    return {
        "status": "error" if errors else "success",
        "total": total,
        "count": len(chunks),
        "average": total / len(chunks) if chunks else 0,
        "error_count": len(errors)
    }

class TestOutputReducer(unittest.TestCase):
    def test_normal_case(self):
        chunks = [
            {"value": 10},
            {"value": 20},
            {"value": 30}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["status"], "success")
        self.assertEqual(result["total"], 60)
        self.assertEqual(result["average"], 20.0)

    def test_empty_input(self):
        result = my_reducer([])
        self.assertEqual(result["status"], "empty")
        self.assertEqual(result["total"], 0)

    def test_error_handling(self):
        chunks = [
            {"value": 10},
            {"error": "Network timeout"},
            {"value": 20}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["status"], "error")
        self.assertEqual(result["total"], 30)
        self.assertEqual(result["error_count"], 1)

    def test_missing_values(self):
        chunks = [
            {"value": 10},
            {"metadata": "some info"},  # No value field
            {"value": 20}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["total"], 30)
        self.assertEqual(result["count"], 3)

Sugestão

  • Os redutores de saída recebem todos os fragmentos na memória de uma só vez. Para fluxos muito grandes, considere a implementação de alternativas de streaming ou estratégias de fragmentação.
  • O vão permanece aberto até que o gerador seja totalmente consumido, o que afeta as métricas de latência.
  • Os redutores devem ser sem estado e evitar efeitos secundários para um comportamento previsível.

Encapsulamento de funções

O encapsulamento de funções fornece uma maneira flexível de adicionar rastreamento a funções existentes sem modificar suas definições. Isso é particularmente útil quando você deseja adicionar rastreamento a funções de terceiros ou funções definidas fora do seu controle. Ao envolver uma função externa com @mlflow.trace, pode capturar as suas entradas, saídas e contexto de execução.

Observação

Ao envolver funções dinamicamente, o conceito de "mais externo" ainda se aplica. O wrapper de rastreamento deve ser aplicado no ponto em que você deseja capturar toda a chamada para a função encapsulada.

import math
import mlflow


def invocation(x, y, exp=2):
    # Wrap an external function from the math library
    traced_pow = mlflow.trace(math.pow)
    raised = traced_pow(x, exp)

    traced_factorial = mlflow.trace(math.factorial)
    factorial = traced_factorial(int(raised))
    return response


invocation(4)

Gestor de Contexto

Além do decorador, o MLflow permite criar um intervalo que pode ser acessado dentro de qualquer bloco de código arbitrário encapsulado usando o gerenciador de contexto mlflow.start_span. Ele pode ser útil para capturar interações complexas dentro do seu código em detalhes mais finos do que o que é possível capturando os limites de uma única função.

Da mesma forma que o decorador, o gerenciador de contexto captura automaticamente a relação pai-filho, exceções, tempo de execução e trabalha com rastreamento automático. No entanto, o nome, as entradas e as saídas do intervalo devem ser fornecidos manualmente. Você pode defini-los através do mlflow.entities.Span objeto que é retornado do gerenciador de contexto.

with mlflow.start_span(name="my_span") as span:
    span.set_inputs({"x": 1, "y": 2})
    z = x + y
    span.set_outputs(z)

Abaixo está um exemplo um pouco mais complexo que usa o gerente de contexto mlflow.start_span em conjunto com o decorador e a rastreação automática para o OpenAI.

import mlflow
from mlflow.entities import SpanType


@mlflow.trace(span_type=SpanType.CHAIN)
def start_session():
    messages = [{"role": "system", "content": "You are a friendly chat bot"}]
    while True:
        with mlflow.start_span(name="User") as span:
            span.set_inputs(messages)
            user_input = input(">> ")
            span.set_outputs(user_input)

        if user_input == "BYE":
            break

        messages.append({"role": "user", "content": user_input})

        response = openai.OpenAI().chat.completions.create(
            model="gpt-4o-mini",
            max_tokens=100,
            messages=messages,
        )
        answer = response.choices[0].message.content
        print(f"🤖: {answer}")

        messages.append({"role": "assistant", "content": answer})


mlflow.openai.autolog()
start_session()

Multi Threading

O MLflow Tracing é thread-safe, os rastreamentos são isolados por padrão por thread. Mas você também pode criar um rastreamento que abrange vários threads com algumas etapas adicionais.

O MLflow usa o mecanismo ContextVar integrado do Python para garantir a segurança do thread, que não é propagado entre threads por padrão. Portanto, você precisa copiar manualmente o contexto do thread principal para o thread de trabalho, conforme mostrado no exemplo abaixo.

import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
from mlflow.entities import SpanType
import openai

client = openai.OpenAI()

# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace
def worker(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": question},
    ]
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.1,
        max_tokens=100,
    )
    return response.choices[0].message.content


@mlflow.trace
def main(questions: list[str]) -> list[str]:
    results = []
    # Almost same as how you would use ThreadPoolExecutor, but two additional steps
    #  1. Copy the context in the main thread using copy_context()
    #  2. Use ctx.run() to run the worker in the copied context
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for question in questions:
            ctx = contextvars.copy_context()
            futures.append(executor.submit(ctx.run, worker, question))
        for future in as_completed(futures):
            results.append(future.result())
    return results


questions = [
    "What is the capital of France?",
    "What is the capital of Germany?",
]

main(questions)

Traçado multithreaded

Sugestão

Por outro lado, ContextVar é copiado para tarefas assíncronas por padrão. Portanto, não precisa de copiar manualmente o contexto ao usar asyncio, o que pode ser uma maneira mais fácil de lidar com tarefas de E/S simultâneas em Python com MLflow Tracing.

(Avançado) APIs de cliente de baixo nível

Quando o decorador ou o gerenciador de contexto não atender às suas necessidades, você poderá usar as APIs de cliente de baixo nível. Por exemplo, pode ser necessário iniciar e terminar um período de funções diferentes. A API do cliente foi projetada como um wrapper fino sobre as APIs REST do MLflow, oferecendo mais controle sobre o ciclo de vida do rastreamento. Para mais detalhes, consulte o guia.

Advertência

Ao usar APIs de cliente, esteja ciente das seguintes limitações:

  • A relação pai-filho NÃO é capturada automaticamente. Você precisa passar manualmente o ID do span pai.
  • Os spans criados usando a API do cliente NÃO se combinam com spans de rastreamento automático.
  • As APIs de baixo nível marcadas como experimentais estão sujeitas a alterações com base em atualizações de implementação de back-end.

## Next steps
Continue your journey with these recommended actions and tutorials.

- [Low-level Client APIs](/mlflow3/genai/tracing/app-instrumentation/manual-tracing/low-level-api.md) - Learn advanced tracing control for complex scenarios
- [Debug & observe your app](/mlflow3/genai/tracing/observe-with-traces/index.md) - Use your traced app for debugging and analysis
- [Combine with automatic tracing](/mlflow3/genai/tracing/app-instrumentation/automatic.md) - Mix manual and automatic tracing

## Reference guides
Explore detailed documentation for concepts and features mentioned in this guide.

- [Tracing data model](/mlflow3/genai/tracing/data-model.md) - Understand the structure of traces and spans
- [Tracing concepts](/mlflow3/genai/tracing/tracing-101.md) - Learn fundamentals of distributed tracing
- [FAQ](/mlflow3/genai/tracing/faq.md) - Common questions about tracing implementation