Comparteix via


Programar agentes de IA en código

En esta página se muestra cómo crear un agente de INTELIGENCIA ARTIFICIAL en Python mediante Mosaic AI Agent Framework y bibliotecas populares de creación de agentes como LangGraph y OpenAI.

Requisitos

Sugerencia

Databricks recomienda instalar la versión más reciente del cliente de Python de MLflow al desarrollar agentes.

Para crear e implementar agentes mediante el enfoque de esta página, instale lo siguiente:

  • databricks-agents 1.2.0 o superior
  • mlflow 3.1.3 o superior
  • Python 3.10 o superior.
    • Use el proceso sin servidor o Databricks Runtime 13.3 LTS o superior para cumplir este requisito.
%pip install -U -qqqq databricks-agents mlflow

Databricks también recomienda instalar paquetes de integración de Databricks AI Bridge para crear agentes. Estos paquetes de integración proporcionan una capa compartida de API que interactúan con las características de inteligencia artificial de Databricks, como Databricks AI/BI Genie y Vector Search, entre marcos de creación de agentes y SDK.

OpenAI

%pip install -U -qqqq databricks-openai

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

DSPy

%pip install -U -qqqq databricks-dspy

Agentes puros de Python

%pip install -U -qqqq databricks-ai-bridge

Uso de ResponsesAgent para crear agentes

Databricks recomienda la interfaz ResponsesAgent de MLflow para crear agentes de nivel de producción. ResponsesAgent permite crear agentes con cualquier marco de terceros y, a continuación, integrarlos con las características de Inteligencia artificial de Databricks para lograr un registro sólido, seguimiento, evaluación, implementación y funcionalidades de supervisión.

El ResponsesAgent esquema es compatible con el esquema openAI Responses . Para obtener más información sobre OpenAI, consulte OpenAIResponses: Respuestas frente a ChatCompletion.

Nota

La interfaz anterior ChatAgent sigue siendo compatible con Databricks. Sin embargo, para los nuevos agentes, Databricks recomienda usar la versión más reciente de MLflow y la ResponsesAgent interfaz.

Consulte Esquema del agente heredado de entrada y salida.

ResponsesAgent ajusta fácilmente los agentes existentes para la compatibilidad de Databricks.

ResponsesAgent proporciona las siguientes ventajas:

  • Funcionalidades avanzadas del agente

    • Compatibilidad con varios agentes
    • Salida de streaming: transmita la salida en fragmentos más pequeños.
    • Historial completo de mensajes de llamada a herramientas: devuelve varios mensajes, incluidos los mensajes intermedios de llamada a herramientas, para mejorar la calidad y la administración de conversaciones.
    • Compatibilidad con la confirmación de llamadas a herramientas
    • Compatibilidad con herramientas de larga duración
  • Desarrollo, implementación y supervisión simplificados

    • Creación de agentes mediante cualquier marco: ajuste cualquier agente existente mediante la ResponsesAgent interfaz para obtener la compatibilidad integrada con AI Playground, Agent Evaluation y Agent Monitoring.
    • Interfaces de creación con tipo: escriba código de agente mediante clases de Python con tipo, que se benefician del IDE y la acción de autocompletar para los cuadernos.
    • Inferencia automática de firmas: MLflow deduce ResponsesAgent automáticamente las firmas al registrar un agente, lo que simplifica el registro y la implementación. Consulte Deducción de la firma del modelo durante el registro.
    • Seguimiento automático: MLflow realiza un seguimiento automático de las predict funciones y predict_stream , agregando respuestas transmitidas para facilitar la evaluación y visualización.
    • tablas de inferencia mejoradas por AI Gateway: las tablas de inferencia de AI Gateway se habilitan automáticamente para los agentes implementados, lo que proporciona acceso a metadatos detallados del registro de solicitudes.

Para obtener información sobre cómo crear un ResponsesAgent, consulte los ejemplos de la sección siguiente y la documentación de MLflow: ResponsesAgent for Model Serving.

ResponsesAgent Ejemplos

En los cuadernos siguientes se muestra cómo crear streaming y no streaming ResponsesAgent mediante bibliotecas populares. Para obtener información sobre cómo expandir las funcionalidades de estos agentes, consulte Herramientas del agente de IA.

OpenAI

Agente de chat simple de OpenAI mediante modelos hospedados en Databricks

Obtener el cuaderno

Agente de llamada a herramientas de OpenAI mediante modelos hospedados en Databricks

Obtener el cuaderno

Agente de llamada a herramientas de OpenAI mediante modelos hospedados en OpenAI

Obtener el cuaderno

LangGraph

Agente de invocación de herramientas de LangGraph

Obtener el cuaderno

DSPy

Agente de llamada de herramientas por turno único de DSPy

Obtener el cuaderno

Ejemplo de varios agentes

Para obtener información sobre cómo crear un sistema multiagente, consulte Uso de Genie en sistemas multiagente.

Ejemplo de agente con estado

Para obtener información sobre cómo crear agentes con estado con memoria a corto y largo plazo mediante Lakebase como almacén de memoria, consulte Memoria del agente de IA.

Ejemplo de agente no conversacional

A diferencia de los agentes conversacionales que administran diálogos multiturno, los agentes no conversacionales se centran en ejecutar tareas claramente definidas de manera eficaz. Esta arquitectura simplificada permite un mayor rendimiento para las solicitudes independientes.

Para obtener información sobre cómo crear un agente que no sea de conversación, consulte Agentes de IA no conversacionales mediante MLflow.

¿Qué ocurre si ya tengo un agente?

Si ya tiene un agente compilado con LangChain, LangGraph o un marco similar, no es necesario volver a escribir el agente para usarlo en Databricks. En su lugar, encapsule el agente existente con la interfaz MLflow ResponsesAgent :

  1. Escriba una clase contenedora de Python que herede de mlflow.pyfunc.ResponsesAgent.

    Dentro de la clase contenedora, haga referencia al agente existente como atributo self.agent = your_existing_agent.

  2. La ResponsesAgent clase requiere la implementación de un predict método que devuelve para ResponsesAgentResponse controlar las solicitudes que no son de streaming. A continuación se muestra un ejemplo del ResponsesAgentResponses esquema:

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. En la predict función , convierta los mensajes entrantes de ResponsesAgentRequest en el formato que espera el agente. Después de que el agente genere una respuesta, convierta su salida en un ResponsesAgentResponse objeto .

Consulte los ejemplos de código siguientes para ver cómo convertir agentes existentes en ResponsesAgent:

Conversión básica

En el caso de los agentes que no son de streaming, convierta entradas y salidas en la predict función .

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)

        # Return the response
        return ResponsesAgentResponse(output=[output_item])

Streaming con código que se vuelve a usar

En el caso de los agentes de streaming, puede ser inteligente y reutilizar la lógica para evitar duplicar el código que convierte los mensajes:

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

Migración desde ChatCompletions

Si el agente existente usa la API ChatCompletions de OpenAI, puede migrarla a sin volver a ResponsesAgent escribir su lógica principal. Agregue un contenedor que:

  1. Convierte los mensajes entrantes ResponsesAgentRequest en el formato que espera el ChatCompletions agente.
  2. Traduce las ChatCompletions salidas en el ResponsesAgentResponse esquema.
  3. Opcionalmente, admite el streaming mediante la asignación de diferencias incrementales de ChatCompletions en ResponsesAgentStreamEvent objetos.
from typing import Generator
from uuid import uuid4

from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()

    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-sonnet-4-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()


# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent

    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))

        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )

        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )


agent = MyWrappedStreamingAgent(LegacyAgent())

for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)

Para obtener ejemplos completos, consulte ResponsesAgent ejemplos.

Respuestas de streaming

El streaming permite a los agentes enviar respuestas en fragmentos en tiempo real en lugar de esperar a que se complete la respuesta. Para implementar el streaming con ResponsesAgent, emita una serie de eventos delta seguidos de un evento de finalización final:

  1. Emitir eventos delta: envíe varios output_text.delta eventos con el mismo item_id para transmitir fragmentos de texto en tiempo real.
  2. Finalizar con evento terminado: envíe un evento final response.output_item.done con el mismo item_id que los eventos delta que contienen el texto de salida final completo.

Cada evento delta transmite un fragmento de texto al cliente. El evento final hecho contiene el texto de respuesta completo y indica a Databricks que haga lo siguiente:

  • Seguimiento de la salida del agente con el seguimiento de MLflow
  • Agregar respuestas transmitidas en tablas de inferencia de AI Gateway
  • Mostrar la salida completa en la interfaz de usuario del área de juegos de IA

Propagación de errores de streaming

Mosaic AI propaga los errores detectados durante el streaming con el último token en databricks_output.error. Depende del cliente que realiza la llamada manejar y mostrar adecuadamente este error.

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

Características avanzadas

entradas y salidas personalizadas

Algunos escenarios pueden requerir entradas de agente adicionales, como client_type y , session_ido salidas como vínculos de origen de recuperación que no deben incluirse en el historial de chat para futuras interacciones.

En estos escenarios, MLflow ResponsesAgent admite de forma nativa los campos custom_inputs y custom_outputs. Puede acceder a las entradas personalizadas a través request.custom_inputs de en todos los ejemplos vinculados anteriormente en ResponsesAgent Examples.

Advertencia

La aplicación de revisión de evaluación del agente no admite la representación de seguimientos para agentes con campos de entrada adicionales.

Consulte los cuadernos siguientes para obtener información sobre cómo establecer entradas y salidas personalizadas.

Proporcionar custom_inputs en ai Playground y revisar la aplicación

Si el agente acepta entradas adicionales mediante el custom_inputs campo , puede proporcionar manualmente estas entradas tanto en ai Playground como en la aplicación de revisión.

  1. En el AI Playground o la aplicación Agent Review, seleccione el icono de engranaje Icono de engranaje.

  2. Habilite custom_inputs.

  3. Proporcione un objeto JSON que coincida con el esquema de entrada definido del agente.

    Proporcione custom_inputs en IA Playground.

Especificar esquemas personalizados del recuperador

Los agentes de inteligencia artificial suelen usar recuperadores para buscar y consultar datos no estructurados de índices de búsqueda vectorial. Para obtener herramientas recuperadoras de ejemplo, consulte Herramientas para construir y rastrear recuperadores para datos no estructurados.

Realice un seguimiento de estos recuperadores dentro del agente con intervalos de RECUPERACIÓN de MLflow para habilitar las características del producto de Databricks, entre las que se incluyen:

  • Mostrar automáticamente vínculos a documentos de origen recuperados en la interfaz de usuario de AI Playground
  • Ejecución automática de la recuperación del fundamento y evaluadores de relevancia en la evaluación del agente

Nota

Databricks recomienda usar las herramientas del recuperador proporcionadas por paquetes de Databricks AI Bridge como databricks_langchain.VectorSearchRetrieverTool y databricks_openai.VectorSearchRetrieverTool porque ya se ajustan al esquema del recuperador de MLflow. Consulte Desarrollo local de herramientas de recuperación de vectores de búsqueda con AI Bridge.

Si el agente incluye intervalos del recuperador con un esquema personalizado, llame a mlflow.models.set_retriever_schema cuando defina el agente en el código. Esto asigna las columnas de salida del recuperador a los campos esperados de MLflow (primary_key, text_column, doc_uri).

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

Nota

La columna doc_uri es especialmente importante al evaluar el rendimiento del recuperador. doc_uri es el identificador principal de los documentos devueltos por el recuperador, lo que le permite compararlos con los conjuntos de evaluación de la verdad fundamental. Consulte Conjuntos de evaluación (MLflow 2).

Consideraciones sobre la implementación

Preparación del servicio de modelos de Databricks

Databricks implementa ResponsesAgents en un entorno distribuido en Databricks Model Serving. Esto significa que durante una conversación de varios turnos, es posible que la misma réplica de servicio no controle todas las solicitudes. Preste atención a las siguientes implicaciones para administrar el estado del agente:

  • Evitar el almacenamiento en caché local: al implementar un ResponsesAgent, no asuma que la misma réplica controla todas las solicitudes en una conversación multiturno. Reconstruya el estado interno mediante un esquema ResponsesAgentRequest de diccionario para cada turno.

  • estado seguro para subprocesos: diseñe el estado del agente para que sea seguro para subprocesos, lo que impide conflictos en entornos multiproceso.

  • Inicializar el estado en la función predict: Inicializar el estado cada vez que se llama a la función predict, no durante la inicialización de ResponsesAgent. Almacenar el estado en el nivel ResponsesAgent podría filtrar información entre conversaciones y provocar conflictos porque una sola réplica ResponsesAgent podría controlar las solicitudes de varias conversaciones.

Parametrize code for deployment across environments (Parametrize code for deployment across environments)

Parametrizar el código del agente para reutilizar el mismo código de agente en distintos entornos.

Los parámetros son pares clave-valor que se definen en un diccionario de Python o en un archivo .yaml.

Para configurar el código, cree un ModelConfig mediante un diccionario de Python o un .yaml archivo. ModelConfig es un conjunto de parámetros clave-valor que permite la administración flexible de la configuración. Por ejemplo, puede usar un diccionario durante el desarrollo y, a continuación, convertirlo en un archivo .yaml para la implementación de producción y CI/CD.

A continuación se muestra un ejemplo ModelConfig:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

En el código del agente, puede hacer referencia a una configuración predeterminada (desarrollo) desde el archivo o diccionario .yaml:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

A continuación, al registrar el agente, especifique el parámetro model_config estableciéndolo en log_model para definir un conjunto personalizado de parámetros que se usará al cargar el agente registrado. Consulte la Documentación de MLflow: ModelConfig.

Uso de código sincrónico o patrones de devolución de llamada

Para garantizar la estabilidad y la compatibilidad, use código sincrónico o patrones basados en devolución de llamada en la implementación del agente.

Azure Databricks administra automáticamente la comunicación asincrónica para proporcionar una simultaneidad y un rendimiento óptimos al implementar un agente. La introducción de bucles de eventos personalizados o marcos asincrónicos puede provocar errores como RuntimeError: This event loop is already running and caused unpredictable behavior.

Azure Databricks recomienda evitar la programación asincrónica, como el uso de asyncio o la creación de bucles de eventos personalizados, al desarrollar agentes.

Pasos siguientes