Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Esta página mostra como criar um agente de IA em Python usando o Mosaic AI Agent Framework e bibliotecas populares de criação de agentes como LangGraph e OpenAI.
Requerimentos
Sugestão
O Databricks recomenda a instalação da versão mais recente do cliente Python MLflow ao desenvolver agentes.
Para criar e implantar agentes usando a abordagem nesta página, instale o seguinte:
-
databricks-agents1.2.0 ou superior -
mlflow3.1.3 ou superior - Python 3.10 ou superior.
- Use computação sem servidor ou Databricks Runtime 13.3 LTS ou superior para atender a esse requisito.
%pip install -U -qqqq databricks-agents mlflow
A Databricks também recomenda a instalação de pacotes de integração do Databricks AI Bridge para agentes de autor. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interagem com os recursos de IA do Databricks, como o Databricks AI/BI Genie e o Vetor Search, entre estruturas de criação de agentes e SDKs.
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
Agentes Python puros
%pip install -U -qqqq databricks-ai-bridge
Utilizar ResponsesAgent para criar agentes
A Databricks recomenda a interface ResponsesAgent MLflow para criar agentes de nível de produção.
ResponsesAgent permite criar agentes com qualquer estrutura de terceiros e, em seguida, integrá-la aos recursos de IA do Databricks para recursos robustos de registro, rastreamento, avaliação, implantação e monitoramento.
O ResponsesAgent esquema é compatível com o esquema OpenAI Responses . Para saber mais sobre o OpenAI Responses, consulte OpenAI: Respostas vs. ChatCompletion.
Observação
A interface mais antiga ChatAgent ainda é suportada no Databricks. No entanto, para novos agentes, o Databricks recomenda o uso da versão mais recente do MLflow e da ResponsesAgent interface.
ResponsesAgent oferece os seguintes benefícios:
Recursos avançados do agente
- Suporte a vários agentes
- Saída de streaming: transmita a saída em partes menores.
- Histórico abrangente de mensagens de chamada de ferramentas: Devolve várias mensagens, incluindo mensagens intermediárias de chamada de ferramentas, para aperfeiçoar a qualidade e o gerenciamento das conversas.
- Suporte de confirmação de invocação de ferramenta
- Suporte a ferramentas de longa duração
Desenvolvimento, implantação e monitoramento simplificados
-
Crie agentes usando qualquer estrutura: envolva qualquer agente existente usando a interface para obter compatibilidade imediata com o AI Playground, a Avaliação de Agentes e o
ResponsesAgentMonitoramento de Agentes. - Interfaces de criação tipadas: Escreva código de agente usando classes Python tipadas, beneficiando-se do IDE e do preenchimento automático do notebook.
-
Inferência automática de assinaturas: o MLflow infere
ResponsesAgentautomaticamente assinaturas ao registrar um agente, simplificando o registro e a implantação. Consulte Infer Model Signature durante o registo. -
Rastreamento automático: o MLflow rastreia automaticamente suas
predictfunçõespredict_stream, agregando respostas transmitidas para facilitar a avaliação e exibição. - tabelas de inferência aprimoradas pelo AI Gateway: as tabelas de inferência do AI Gateway são ativadas automaticamente para agentes implantados, fornecendo acesso a metadados detalhados do log de solicitações.
-
Crie agentes usando qualquer estrutura: envolva qualquer agente existente usando a interface para obter compatibilidade imediata com o AI Playground, a Avaliação de Agentes e o
Para saber como criar um ResponsesAgent, consulte os exemplos na seção a seguir e a documentação do MLflow - ResponsesAgent for Model Serving.
ResponsesAgent Exemplos
Os blocos de anotações a seguir mostram como criar streaming e não streaming ResponsesAgent usando bibliotecas populares. Para saber como expandir os recursos desses agentes, consulte Ferramentas de agente de IA.
OpenAI
Agente de chat simples OpenAI usando modelos hospedados pelo Databricks
Agente de chamada de ferramentas OpenAI usando modelos hospedados pelo Databricks
Agente de chamada de ferramentas OpenAI usando modelos hospedados pela OpenAI
LangGraph
Agente de chamada de ferramentas LangGraph
DSPy
Agente de chamada de ferramentas DSPy de turno único
Exemplo de multiagente
Para saber como criar um sistema multiagente, consulte Usar o Genie em sistemas multiagentes.
Exemplo do agente com estado
Para aprender a criar agentes com estado com memória de curto e longo prazo usando o Lakebase como armazenamento de memória, veja Memória de agentes de IA.
Exemplo de agente não conversacional
Ao contrário dos agentes conversacionais que gerem diálogos com múltiplos turnos, os agentes não conversacionais focam-se em executar tarefas bem definidas de forma eficiente. Esta arquitetura simplificada permite uma maior taxa de transferência para pedidos independentes.
Para aprender a criar um agente não conversacional, veja Agentes de IA não conversacionais usando MLflow.
E se eu já tiver um agente?
Se você já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, não precisará reescrever seu agente para usá-lo no Databricks. Em vez disso, basta envolver seu agente existente com a interface MLflow ResponsesAgent :
Escreva uma classe de wrapper Python que herda do
mlflow.pyfunc.ResponsesAgent.Dentro da classe wrapper, faça referência ao agente existente como um atributo
self.agent = your_existing_agent.A
ResponsesAgentclasse requer a implementação de umpredictmétodo que retorna umResponsesAgentResponsepara lidar com solicitações que não são de streaming. Segue-se um exemplo doResponsesAgentResponsesesquema:import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )predictNa função, converta as mensagens recebidas no formato esperado peloResponsesAgentRequestagente. Depois que o agente gerar uma resposta, converta sua saída em umResponsesAgentResponseobjeto.
Consulte os exemplos de código a seguir para ver como converter agentes existentes em ResponsesAgent:
Conversão básica
Para agentes que não são de streaming, converta entradas e saídas na predict função.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Streaming com reutilização de código
Para agentes de streaming, você pode ser inteligente e reutilizar a lógica para evitar a duplicação do código que converte mensagens:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Migrar do ChatCompletions
Se seu agente existente usa a API OpenAI ChatCompletions, você pode migrá-lo para ResponsesAgent sem reescrever sua lógica principal. Adicione um invólucro que:
- Converte as mensagens recebidas
ResponsesAgentRequestpara o formato esperado peloChatCompletionsagente. - Traduz saídas
ChatCompletionspara oResponsesAgentResponseesquema. - Opcionalmente, suporta streaming mapeando deltas incrementais de
ChatCompletionsemResponsesAgentStreamEventobjetos.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Para obter exemplos completos, consulte ResponsesAgent exemplos.
Respostas de streaming
O streaming permite que os agentes enviem respostas em partes em tempo real, em vez de esperar pela resposta completa. Para implementar o streaming com ResponsesAgento , emita uma série de eventos delta seguidos por um evento de conclusão final:
-
Emitir eventos delta: envie vários
output_text.deltaeventos com o mesmoitem_idpara transmitir blocos de texto em tempo real. -
Terminar com evento concluído: envie um evento final
response.output_item.donecom o mesmoitem_idque os eventos delta contendo o texto de saída final completo.
Cada evento delta transmite um pedaço de texto para o cliente. O evento final done contém o texto de resposta completo e sinaliza o Databricks para fazer o seguinte:
- Rastreie a saída do seu agente com o rastreamento MLflow
- Agregar respostas transmitidas em fluxo nas tabelas de inferência do AI Gateway
- Mostrar a saída completa na interface do usuário do AI Playground
Propagação de erros de streaming
O Mosaic AI propaga quaisquer erros encontrados durante o streaming com o último token em databricks_output.error. Cabe ao cliente que faz a chamada lidar adequadamente e tratar esse erro.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Funcionalidades avançadas
Entradas e saídas personalizadas
Alguns cenários podem exigir entradas adicionais do agente, como client_type e session_id, ou saídas, como links de origem de recuperação, que não devem ser incluídos no histórico de bate-papo para interações futuras.
Para esses cenários, o MLflow ResponsesAgent suporta nativamente os campos custom_inputs e custom_outputs. Você pode acessar as entradas personalizadas por meio de request.custom_inputs todos os exemplos vinculados acima em Exemplos de ResponsesAgent.
Advertência
O aplicativo de revisão Avaliação do Agente não oferece suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.
Consulte os blocos de notas seguintes para saber como definir entradas e saídas personalizadas.
Forneça custom_inputs no AI Playground e revise o aplicativo
Se o seu agente aceitar entradas adicionais usando o custom_inputs campo, você poderá fornecer manualmente essas entradas no AI Playground e no aplicativo de revisão.
No AI Playground ou na aplicação Agent Review, selecione o ícone de engrenagem
Habilite custom_inputs.
Forneça um objeto JSON que corresponda ao esquema de entrada definido pelo agente.
Especificar esquemas de recuperador personalizados
Os agentes de IA geralmente usam retrievers para encontrar e consultar dados não estruturados de índices de pesquisa vetorial. Por exemplo, ferramentas de recuperação de dados, consulte Desenvolver e rastrear ferramentas de recuperação para dados não estruturados.
Rastreie esses retrievers dentro do seu agente com as spans MLflow RETRIEVER para habilitar as funcionalidades do produto Databricks, incluindo:
- Exibição automática de links para documentos de origem recuperados na interface do usuário do AI Playground
- Execução automática de julgamentos automatizados de fundamentação na recuperação e relevância na Avaliação de Agentes
Observação
O Databricks recomenda o uso de ferramentas de recuperação fornecidas pelos pacotes Databricks AI Bridge, como databricks_langchain.VectorSearchRetrieverTool e databricks_openai.VectorSearchRetrieverTool, porque eles já estão em conformidade com o esquema MLflow retriever. Veja como desenvolver localmente ferramentas de pesquisa vetorial com o AI Bridge.
Se o agente incluir extensões de retriever com um esquema personalizado, chame mlflow.models.set_retriever_schema quando definir o agente no código. Isso mapeia as colunas de saída do seu retriever para os campos esperados do MLflow (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
Observação
A doc_uri coluna é especialmente importante ao avaliar o desempenho do retriever.
doc_uri é o identificador principal dos documentos devolvidos pelo recuperador, permitindo-lhe compará-los com conjuntos de avaliação das verdades de referência. Consulte Conjuntos de avaliação (MLflow 2).
Considerações sobre implantação
Prepare-se para o serviço do modelo Databricks
O Databricks implanta ResponsesAgents em um ambiente distribuído no Databricks Model Serving. Isso significa que, durante uma conversa de vários turnos, a mesma réplica de serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para o estado do agente de gerenciamento:
Evite o cache local: ao implantar um
ResponsesAgent, não assuma que a mesma réplica lida com todas as solicitações em uma conversa de várias voltas. Reconstrua o estado interno utilizando um esquema de dicionárioResponsesAgentRequestem cada turno.Estado seguro para threads: Projete o estado do agente para ser seguro para threads, evitando conflitos em ambientes multi-thread.
Início de estado na função
predict: Início de estado sempre que a funçãopredicté chamada, não durante aResponsesAgentinicialização. Armazenar o estado ao nívelResponsesAgentpode vazar informações entre conversas e causar conflitos, pois uma única réplica deResponsesAgentpode processar pedidos de várias conversas.
Parametrizar código para implantação em ambientes
Parametrize o código do agente para reutilizar o mesmo código do agente em ambientes diferentes.
Os parâmetros são pares chave-valor que você define em um dicionário Python ou em um arquivo .yaml.
Para configurar o código, crie um ModelConfig usando um dicionário Python ou um .yaml arquivo.
ModelConfig é um conjunto de parâmetros chave-valor que permite um gerenciamento de configuração flexível. Por exemplo, você pode usar um dicionário durante o desenvolvimento e, em seguida, convertê-lo em um arquivo de .yaml para implantação de produção e CI/CD.
Um exemplo ModelConfig é mostrado abaixo:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
No código do agente, você pode fazer referência a uma configuração padrão (de desenvolvimento) a partir do arquivo .yaml ou dicionário:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
Em seguida, ao registrar seu agente, especifique o parâmetro model_config para log_model para especificar um conjunto personalizado de parâmetros a serem usados ao carregar o agente registrado. Consulte a documentação do MLflow - ModelConfig.
Usar código síncrono ou padrões de retorno de chamada
Para garantir estabilidade e compatibilidade, use código síncrono ou padrões baseados em retorno de chamada na implementação do agente.
O Azure Databricks gerencia automaticamente a comunicação assíncrona para fornecer simultaneidade e desempenho ideais quando você implanta um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior.
O Azure Databricks recomenda evitar a programação assíncrona, como usar asyncio ou criar loops de eventos personalizados, ao desenvolver agentes.