Nota
L'accés a aquesta pàgina requereix autorització. Pots provar d'iniciar sessió o canviar de directori.
L'accés a aquesta pàgina requereix autorització. Pots provar de canviar directoris.
En esta página se muestra cómo crear un agente de INTELIGENCIA ARTIFICIAL en Python mediante Mosaic AI Agent Framework y bibliotecas populares de creación de agentes como LangGraph y OpenAI.
Requisitos
Sugerencia
Databricks recomienda instalar la versión más reciente del cliente de Python de MLflow al desarrollar agentes.
Para crear e implementar agentes mediante el enfoque de esta página, instale lo siguiente:
-
databricks-agents1.2.0 o superior -
mlflow3.1.3 o superior - Python 3.10 o superior.
- Use el proceso sin servidor o Databricks Runtime 13.3 LTS o superior para cumplir este requisito.
%pip install -U -qqqq databricks-agents mlflow
Databricks también recomienda instalar paquetes de integración de Databricks AI Bridge para crear agentes. Estos paquetes de integración proporcionan una capa compartida de API que interactúan con las características de inteligencia artificial de Databricks, como Databricks AI/BI Genie y Vector Search, entre marcos de creación de agentes y SDK.
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
Agentes puros de Python
%pip install -U -qqqq databricks-ai-bridge
Uso de ResponsesAgent para crear agentes
Databricks recomienda la interfaz ResponsesAgent de MLflow para crear agentes de nivel de producción.
ResponsesAgent permite crear agentes con cualquier marco de terceros y, a continuación, integrarlos con las características de Inteligencia artificial de Databricks para lograr un registro sólido, seguimiento, evaluación, implementación y funcionalidades de supervisión.
El ResponsesAgent esquema es compatible con el esquema openAI Responses . Para obtener más información sobre OpenAI, consulte OpenAIResponses: Respuestas frente a ChatCompletion.
Nota
La interfaz anterior ChatAgent sigue siendo compatible con Databricks. Sin embargo, para los nuevos agentes, Databricks recomienda usar la versión más reciente de MLflow y la ResponsesAgent interfaz.
ResponsesAgent proporciona las siguientes ventajas:
Funcionalidades avanzadas del agente
- Compatibilidad con varios agentes
- Salida de streaming: transmita la salida en fragmentos más pequeños.
- Historial completo de mensajes de llamada a herramientas: devuelve varios mensajes, incluidos los mensajes intermedios de llamada a herramientas, para mejorar la calidad y la administración de conversaciones.
- Compatibilidad con la confirmación de llamadas a herramientas
- Compatibilidad con herramientas de larga duración
Desarrollo, implementación y supervisión simplificados
-
Creación de agentes mediante cualquier marco: ajuste cualquier agente existente mediante la
ResponsesAgentinterfaz para obtener la compatibilidad integrada con AI Playground, Agent Evaluation y Agent Monitoring. - Interfaces de creación con tipo: escriba código de agente mediante clases de Python con tipo, que se benefician del IDE y la acción de autocompletar para los cuadernos.
-
Inferencia automática de firmas: MLflow deduce
ResponsesAgentautomáticamente las firmas al registrar un agente, lo que simplifica el registro y la implementación. Consulte Deducción de la firma del modelo durante el registro. -
Seguimiento automático: MLflow realiza un seguimiento automático de las
predictfunciones ypredict_stream, agregando respuestas transmitidas para facilitar la evaluación y visualización. - tablas de inferencia mejoradas por AI Gateway: las tablas de inferencia de AI Gateway se habilitan automáticamente para los agentes implementados, lo que proporciona acceso a metadatos detallados del registro de solicitudes.
-
Creación de agentes mediante cualquier marco: ajuste cualquier agente existente mediante la
Para obtener información sobre cómo crear un ResponsesAgent, consulte los ejemplos de la sección siguiente y la documentación de MLflow: ResponsesAgent for Model Serving.
ResponsesAgent Ejemplos
En los cuadernos siguientes se muestra cómo crear streaming y no streaming ResponsesAgent mediante bibliotecas populares. Para obtener información sobre cómo expandir las funcionalidades de estos agentes, consulte Herramientas del agente de IA.
OpenAI
Agente de chat simple de OpenAI mediante modelos hospedados en Databricks
Agente de llamada a herramientas de OpenAI mediante modelos hospedados en Databricks
Agente de llamada a herramientas de OpenAI mediante modelos hospedados en OpenAI
LangGraph
Agente de invocación de herramientas de LangGraph
DSPy
Agente de llamada de herramientas por turno único de DSPy
Ejemplo de varios agentes
Para obtener información sobre cómo crear un sistema multiagente, consulte Uso de Genie en sistemas multiagente.
Ejemplo de agente con estado
Para obtener información sobre cómo crear agentes con estado con memoria a corto y largo plazo mediante Lakebase como almacén de memoria, consulte Memoria del agente de IA.
Ejemplo de agente no conversacional
A diferencia de los agentes conversacionales que administran diálogos multiturno, los agentes no conversacionales se centran en ejecutar tareas claramente definidas de manera eficaz. Esta arquitectura simplificada permite un mayor rendimiento para las solicitudes independientes.
Para obtener información sobre cómo crear un agente que no sea de conversación, consulte Agentes de IA no conversacionales mediante MLflow.
¿Qué ocurre si ya tengo un agente?
Si ya tiene un agente compilado con LangChain, LangGraph o un marco similar, no es necesario volver a escribir el agente para usarlo en Databricks. En su lugar, encapsule el agente existente con la interfaz MLflow ResponsesAgent :
Escriba una clase contenedora de Python que herede de
mlflow.pyfunc.ResponsesAgent.Dentro de la clase contenedora, haga referencia al agente existente como atributo
self.agent = your_existing_agent.La
ResponsesAgentclase requiere la implementación de unpredictmétodo que devuelve paraResponsesAgentResponsecontrolar las solicitudes que no son de streaming. A continuación se muestra un ejemplo delResponsesAgentResponsesesquema:import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )En la
predictfunción , convierta los mensajes entrantes deResponsesAgentRequesten el formato que espera el agente. Después de que el agente genere una respuesta, convierta su salida en unResponsesAgentResponseobjeto .
Consulte los ejemplos de código siguientes para ver cómo convertir agentes existentes en ResponsesAgent:
Conversión básica
En el caso de los agentes que no son de streaming, convierta entradas y salidas en la predict función .
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Streaming con código que se vuelve a usar
En el caso de los agentes de streaming, puede ser inteligente y reutilizar la lógica para evitar duplicar el código que convierte los mensajes:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Migración desde ChatCompletions
Si el agente existente usa la API ChatCompletions de OpenAI, puede migrarla a sin volver a ResponsesAgent escribir su lógica principal. Agregue un contenedor que:
- Convierte los mensajes entrantes
ResponsesAgentRequesten el formato que espera elChatCompletionsagente. - Traduce las
ChatCompletionssalidas en elResponsesAgentResponseesquema. - Opcionalmente, admite el streaming mediante la asignación de diferencias incrementales de
ChatCompletionsenResponsesAgentStreamEventobjetos.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Para obtener ejemplos completos, consulte ResponsesAgent ejemplos.
Respuestas de streaming
El streaming permite a los agentes enviar respuestas en fragmentos en tiempo real en lugar de esperar a que se complete la respuesta. Para implementar el streaming con ResponsesAgent, emita una serie de eventos delta seguidos de un evento de finalización final:
-
Emitir eventos delta: envíe varios
output_text.deltaeventos con el mismoitem_idpara transmitir fragmentos de texto en tiempo real. -
Finalizar con evento terminado: envíe un evento final
response.output_item.donecon el mismoitem_idque los eventos delta que contienen el texto de salida final completo.
Cada evento delta transmite un fragmento de texto al cliente. El evento final hecho contiene el texto de respuesta completo y indica a Databricks que haga lo siguiente:
- Seguimiento de la salida del agente con el seguimiento de MLflow
- Agregar respuestas transmitidas en tablas de inferencia de AI Gateway
- Mostrar la salida completa en la interfaz de usuario del área de juegos de IA
Propagación de errores de streaming
Mosaic AI propaga los errores detectados durante el streaming con el último token en databricks_output.error. Depende del cliente que realiza la llamada manejar y mostrar adecuadamente este error.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Características avanzadas
entradas y salidas personalizadas
Algunos escenarios pueden requerir entradas de agente adicionales, como client_type y , session_ido salidas como vínculos de origen de recuperación que no deben incluirse en el historial de chat para futuras interacciones.
En estos escenarios, MLflow ResponsesAgent admite de forma nativa los campos custom_inputs y custom_outputs. Puede acceder a las entradas personalizadas a través request.custom_inputs de en todos los ejemplos vinculados anteriormente en ResponsesAgent Examples.
Advertencia
La aplicación de revisión de evaluación del agente no admite la representación de seguimientos para agentes con campos de entrada adicionales.
Consulte los cuadernos siguientes para obtener información sobre cómo establecer entradas y salidas personalizadas.
Proporcionar custom_inputs en ai Playground y revisar la aplicación
Si el agente acepta entradas adicionales mediante el custom_inputs campo , puede proporcionar manualmente estas entradas tanto en ai Playground como en la aplicación de revisión.
En el AI Playground o la aplicación Agent Review, seleccione el icono de engranaje
Habilite custom_inputs.
Proporcione un objeto JSON que coincida con el esquema de entrada definido del agente.
Especificar esquemas personalizados del recuperador
Los agentes de inteligencia artificial suelen usar recuperadores para buscar y consultar datos no estructurados de índices de búsqueda vectorial. Para obtener herramientas recuperadoras de ejemplo, consulte Herramientas para construir y rastrear recuperadores para datos no estructurados.
Realice un seguimiento de estos recuperadores dentro del agente con intervalos de RECUPERACIÓN de MLflow para habilitar las características del producto de Databricks, entre las que se incluyen:
- Mostrar automáticamente vínculos a documentos de origen recuperados en la interfaz de usuario de AI Playground
- Ejecución automática de la recuperación del fundamento y evaluadores de relevancia en la evaluación del agente
Nota
Databricks recomienda usar las herramientas del recuperador proporcionadas por paquetes de Databricks AI Bridge como databricks_langchain.VectorSearchRetrieverTool y databricks_openai.VectorSearchRetrieverTool porque ya se ajustan al esquema del recuperador de MLflow. Consulte Desarrollo local de herramientas de recuperación de vectores de búsqueda con AI Bridge.
Si el agente incluye intervalos del recuperador con un esquema personalizado, llame a mlflow.models.set_retriever_schema cuando defina el agente en el código. Esto asigna las columnas de salida del recuperador a los campos esperados de MLflow (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
Nota
La columna doc_uri es especialmente importante al evaluar el rendimiento del recuperador.
doc_uri es el identificador principal de los documentos devueltos por el recuperador, lo que le permite compararlos con los conjuntos de evaluación de la verdad fundamental. Consulte Conjuntos de evaluación (MLflow 2).
Consideraciones sobre la implementación
Preparación del servicio de modelos de Databricks
Databricks implementa ResponsesAgents en un entorno distribuido en Databricks Model Serving. Esto significa que durante una conversación de varios turnos, es posible que la misma réplica de servicio no controle todas las solicitudes. Preste atención a las siguientes implicaciones para administrar el estado del agente:
Evitar el almacenamiento en caché local: al implementar un
ResponsesAgent, no asuma que la misma réplica controla todas las solicitudes en una conversación multiturno. Reconstruya el estado interno mediante un esquemaResponsesAgentRequestde diccionario para cada turno.estado seguro para subprocesos: diseñe el estado del agente para que sea seguro para subprocesos, lo que impide conflictos en entornos multiproceso.
Inicializar el estado en la función
predict: Inicializar el estado cada vez que se llama a la funciónpredict, no durante la inicialización deResponsesAgent. Almacenar el estado en el nivelResponsesAgentpodría filtrar información entre conversaciones y provocar conflictos porque una sola réplicaResponsesAgentpodría controlar las solicitudes de varias conversaciones.
Parametrize code for deployment across environments (Parametrize code for deployment across environments)
Parametrizar el código del agente para reutilizar el mismo código de agente en distintos entornos.
Los parámetros son pares clave-valor que se definen en un diccionario de Python o en un archivo .yaml.
Para configurar el código, cree un ModelConfig mediante un diccionario de Python o un .yaml archivo.
ModelConfig es un conjunto de parámetros clave-valor que permite la administración flexible de la configuración. Por ejemplo, puede usar un diccionario durante el desarrollo y, a continuación, convertirlo en un archivo .yaml para la implementación de producción y CI/CD.
A continuación se muestra un ejemplo ModelConfig:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
En el código del agente, puede hacer referencia a una configuración predeterminada (desarrollo) desde el archivo o diccionario .yaml:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
A continuación, al registrar el agente, especifique el parámetro model_config estableciéndolo en log_model para definir un conjunto personalizado de parámetros que se usará al cargar el agente registrado. Consulte la Documentación de MLflow: ModelConfig.
Uso de código sincrónico o patrones de devolución de llamada
Para garantizar la estabilidad y la compatibilidad, use código sincrónico o patrones basados en devolución de llamada en la implementación del agente.
Azure Databricks administra automáticamente la comunicación asincrónica para proporcionar una simultaneidad y un rendimiento óptimos al implementar un agente. La introducción de bucles de eventos personalizados o marcos asincrónicos puede provocar errores como RuntimeError: This event loop is already running and caused unpredictable behavior.
Azure Databricks recomienda evitar la programación asincrónica, como el uso de asyncio o la creación de bucles de eventos personalizados, al desarrollar agentes.