Partilhar via


Crie agentes de IA em código

Esta página mostra como criar um agente de IA em Python usando o Mosaic AI Agent Framework e bibliotecas populares de criação de agentes como LangGraph e OpenAI.

Requerimentos

Sugestão

O Databricks recomenda a instalação da versão mais recente do cliente Python MLflow ao desenvolver agentes.

Para criar e implantar agentes usando a abordagem nesta página, instale o seguinte:

  • databricks-agents 1.2.0 ou superior
  • mlflow 3.1.3 ou superior
  • Python 3.10 ou superior.
    • Use computação sem servidor ou Databricks Runtime 13.3 LTS ou superior para atender a esse requisito.
%pip install -U -qqqq databricks-agents mlflow

A Databricks também recomenda a instalação de pacotes de integração do Databricks AI Bridge para agentes de autor. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interagem com os recursos de IA do Databricks, como o Databricks AI/BI Genie e o Vetor Search, entre estruturas de criação de agentes e SDKs.

OpenAI

%pip install -U -qqqq databricks-openai

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

DSPy

%pip install -U -qqqq databricks-dspy

Agentes Python puros

%pip install -U -qqqq databricks-ai-bridge

Utilizar ResponsesAgent para criar agentes

A Databricks recomenda a interface ResponsesAgent MLflow para criar agentes de nível de produção. ResponsesAgent permite criar agentes com qualquer estrutura de terceiros e, em seguida, integrá-la aos recursos de IA do Databricks para recursos robustos de registro, rastreamento, avaliação, implantação e monitoramento.

O ResponsesAgent esquema é compatível com o esquema OpenAI Responses . Para saber mais sobre o OpenAI Responses, consulte OpenAI: Respostas vs. ChatCompletion.

Observação

A interface mais antiga ChatAgent ainda é suportada no Databricks. No entanto, para novos agentes, o Databricks recomenda o uso da versão mais recente do MLflow e da ResponsesAgent interface.

Consulte Esquema de agente de entrada e saída herdado.

O ResponsesAgent encapsula facilmente agentes existentes para compatibilidade com Databricks.

ResponsesAgent oferece os seguintes benefícios:

  • Recursos avançados do agente

    • Suporte a vários agentes
    • Saída de streaming: transmita a saída em partes menores.
    • Histórico abrangente de mensagens de chamada de ferramentas: Devolve várias mensagens, incluindo mensagens intermediárias de chamada de ferramentas, para aperfeiçoar a qualidade e o gerenciamento das conversas.
    • Suporte de confirmação de invocação de ferramenta
    • Suporte a ferramentas de longa duração
  • Desenvolvimento, implantação e monitoramento simplificados

    • Crie agentes usando qualquer estrutura: envolva qualquer agente existente usando a interface para obter compatibilidade imediata com o AI Playground, a Avaliação de Agentes e o ResponsesAgent Monitoramento de Agentes.
    • Interfaces de criação tipadas: Escreva código de agente usando classes Python tipadas, beneficiando-se do IDE e do preenchimento automático do notebook.
    • Inferência automática de assinaturas: o MLflow infere ResponsesAgent automaticamente assinaturas ao registrar um agente, simplificando o registro e a implantação. Consulte Infer Model Signature durante o registo.
    • Rastreamento automático: o MLflow rastreia automaticamente suas predict funções predict_stream , agregando respostas transmitidas para facilitar a avaliação e exibição.
    • tabelas de inferência aprimoradas pelo AI Gateway: as tabelas de inferência do AI Gateway são ativadas automaticamente para agentes implantados, fornecendo acesso a metadados detalhados do log de solicitações.

Para saber como criar um ResponsesAgent, consulte os exemplos na seção a seguir e a documentação do MLflow - ResponsesAgent for Model Serving.

ResponsesAgent Exemplos

Os blocos de anotações a seguir mostram como criar streaming e não streaming ResponsesAgent usando bibliotecas populares. Para saber como expandir os recursos desses agentes, consulte Ferramentas de agente de IA.

OpenAI

Agente de chat simples OpenAI usando modelos hospedados pelo Databricks

Obter portátil

Agente de chamada de ferramentas OpenAI usando modelos hospedados pelo Databricks

Obter portátil

Agente de chamada de ferramentas OpenAI usando modelos hospedados pela OpenAI

Obter portátil

LangGraph

Agente de chamada de ferramentas LangGraph

Obter portátil

DSPy

Agente de chamada de ferramentas DSPy de turno único

Obter portátil

Exemplo de multiagente

Para saber como criar um sistema multiagente, consulte Usar o Genie em sistemas multiagentes.

Exemplo do agente com estado

Para aprender a criar agentes com estado com memória de curto e longo prazo usando o Lakebase como armazenamento de memória, veja Memória de agentes de IA.

Exemplo de agente não conversacional

Ao contrário dos agentes conversacionais que gerem diálogos com múltiplos turnos, os agentes não conversacionais focam-se em executar tarefas bem definidas de forma eficiente. Esta arquitetura simplificada permite uma maior taxa de transferência para pedidos independentes.

Para aprender a criar um agente não conversacional, veja Agentes de IA não conversacionais usando MLflow.

E se eu já tiver um agente?

Se você já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, não precisará reescrever seu agente para usá-lo no Databricks. Em vez disso, basta envolver seu agente existente com a interface MLflow ResponsesAgent :

  1. Escreva uma classe de wrapper Python que herda do mlflow.pyfunc.ResponsesAgent.

    Dentro da classe wrapper, faça referência ao agente existente como um atributo self.agent = your_existing_agent.

  2. A ResponsesAgent classe requer a implementação de um predict método que retorna um ResponsesAgentResponse para lidar com solicitações que não são de streaming. Segue-se um exemplo do ResponsesAgentResponses esquema:

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. predict Na função, converta as mensagens recebidas no formato esperado pelo ResponsesAgentRequest agente. Depois que o agente gerar uma resposta, converta sua saída em um ResponsesAgentResponse objeto.

Consulte os exemplos de código a seguir para ver como converter agentes existentes em ResponsesAgent:

Conversão básica

Para agentes que não são de streaming, converta entradas e saídas na predict função.

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)

        # Return the response
        return ResponsesAgentResponse(output=[output_item])

Streaming com reutilização de código

Para agentes de streaming, você pode ser inteligente e reutilizar a lógica para evitar a duplicação do código que converte mensagens:

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

Migrar do ChatCompletions

Se seu agente existente usa a API OpenAI ChatCompletions, você pode migrá-lo para ResponsesAgent sem reescrever sua lógica principal. Adicione um invólucro que:

  1. Converte as mensagens recebidas ResponsesAgentRequest para o formato esperado pelo ChatCompletions agente.
  2. Traduz saídas ChatCompletions para o ResponsesAgentResponse esquema.
  3. Opcionalmente, suporta streaming mapeando deltas incrementais de ChatCompletions em ResponsesAgentStreamEvent objetos.
from typing import Generator
from uuid import uuid4

from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()

    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-sonnet-4-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()


# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent

    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))

        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )

        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )


agent = MyWrappedStreamingAgent(LegacyAgent())

for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)

Para obter exemplos completos, consulte ResponsesAgent exemplos.

Respostas de streaming

O streaming permite que os agentes enviem respostas em partes em tempo real, em vez de esperar pela resposta completa. Para implementar o streaming com ResponsesAgento , emita uma série de eventos delta seguidos por um evento de conclusão final:

  1. Emitir eventos delta: envie vários output_text.delta eventos com o mesmo item_id para transmitir blocos de texto em tempo real.
  2. Terminar com evento concluído: envie um evento final response.output_item.done com o mesmo item_id que os eventos delta contendo o texto de saída final completo.

Cada evento delta transmite um pedaço de texto para o cliente. O evento final done contém o texto de resposta completo e sinaliza o Databricks para fazer o seguinte:

  • Rastreie a saída do seu agente com o rastreamento MLflow
  • Agregar respostas transmitidas em fluxo nas tabelas de inferência do AI Gateway
  • Mostrar a saída completa na interface do usuário do AI Playground

Propagação de erros de streaming

O Mosaic AI propaga quaisquer erros encontrados durante o streaming com o último token em databricks_output.error. Cabe ao cliente que faz a chamada lidar adequadamente e tratar esse erro.

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

Funcionalidades avançadas

Entradas e saídas personalizadas

Alguns cenários podem exigir entradas adicionais do agente, como client_type e session_id, ou saídas, como links de origem de recuperação, que não devem ser incluídos no histórico de bate-papo para interações futuras.

Para esses cenários, o MLflow ResponsesAgent suporta nativamente os campos custom_inputs e custom_outputs. Você pode acessar as entradas personalizadas por meio de request.custom_inputs todos os exemplos vinculados acima em Exemplos de ResponsesAgent.

Advertência

O aplicativo de revisão Avaliação do Agente não oferece suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.

Consulte os blocos de notas seguintes para saber como definir entradas e saídas personalizadas.

Forneça custom_inputs no AI Playground e revise o aplicativo

Se o seu agente aceitar entradas adicionais usando o custom_inputs campo, você poderá fornecer manualmente essas entradas no AI Playground e no aplicativo de revisão.

  1. No AI Playground ou na aplicação Agent Review, selecione o ícone de engrenagem Ícone de engrenagem.

  2. Habilite custom_inputs.

  3. Forneça um objeto JSON que corresponda ao esquema de entrada definido pelo agente.

    Forneça entradas personalizadas na área de experimentação de IA.

Especificar esquemas de recuperador personalizados

Os agentes de IA geralmente usam retrievers para encontrar e consultar dados não estruturados de índices de pesquisa vetorial. Por exemplo, ferramentas de recuperação de dados, consulte Desenvolver e rastrear ferramentas de recuperação para dados não estruturados.

Rastreie esses retrievers dentro do seu agente com as spans MLflow RETRIEVER para habilitar as funcionalidades do produto Databricks, incluindo:

  • Exibição automática de links para documentos de origem recuperados na interface do usuário do AI Playground
  • Execução automática de julgamentos automatizados de fundamentação na recuperação e relevância na Avaliação de Agentes

Observação

O Databricks recomenda o uso de ferramentas de recuperação fornecidas pelos pacotes Databricks AI Bridge, como databricks_langchain.VectorSearchRetrieverTool e databricks_openai.VectorSearchRetrieverTool, porque eles já estão em conformidade com o esquema MLflow retriever. Veja como desenvolver localmente ferramentas de pesquisa vetorial com o AI Bridge.

Se o agente incluir extensões de retriever com um esquema personalizado, chame mlflow.models.set_retriever_schema quando definir o agente no código. Isso mapeia as colunas de saída do seu retriever para os campos esperados do MLflow (primary_key, text_column, doc_uri).

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

Observação

A doc_uri coluna é especialmente importante ao avaliar o desempenho do retriever. doc_uri é o identificador principal dos documentos devolvidos pelo recuperador, permitindo-lhe compará-los com conjuntos de avaliação das verdades de referência. Consulte Conjuntos de avaliação (MLflow 2).

Considerações sobre implantação

Prepare-se para o serviço do modelo Databricks

O Databricks implanta ResponsesAgents em um ambiente distribuído no Databricks Model Serving. Isso significa que, durante uma conversa de vários turnos, a mesma réplica de serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para o estado do agente de gerenciamento:

  • Evite o cache local: ao implantar um ResponsesAgent, não assuma que a mesma réplica lida com todas as solicitações em uma conversa de várias voltas. Reconstrua o estado interno utilizando um esquema de dicionário ResponsesAgentRequest em cada turno.

  • Estado seguro para threads: Projete o estado do agente para ser seguro para threads, evitando conflitos em ambientes multi-thread.

  • Início de estado na função predict: Início de estado sempre que a função predict é chamada, não durante a ResponsesAgent inicialização. Armazenar o estado ao nível ResponsesAgent pode vazar informações entre conversas e causar conflitos, pois uma única réplica de ResponsesAgent pode processar pedidos de várias conversas.

Parametrizar código para implantação em ambientes

Parametrize o código do agente para reutilizar o mesmo código do agente em ambientes diferentes.

Os parâmetros são pares chave-valor que você define em um dicionário Python ou em um arquivo .yaml.

Para configurar o código, crie um ModelConfig usando um dicionário Python ou um .yaml arquivo. ModelConfig é um conjunto de parâmetros chave-valor que permite um gerenciamento de configuração flexível. Por exemplo, você pode usar um dicionário durante o desenvolvimento e, em seguida, convertê-lo em um arquivo de .yaml para implantação de produção e CI/CD.

Um exemplo ModelConfig é mostrado abaixo:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

No código do agente, você pode fazer referência a uma configuração padrão (de desenvolvimento) a partir do arquivo .yaml ou dicionário:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

Em seguida, ao registrar seu agente, especifique o parâmetro model_config para log_model para especificar um conjunto personalizado de parâmetros a serem usados ao carregar o agente registrado. Consulte a documentação do MLflow - ModelConfig.

Usar código síncrono ou padrões de retorno de chamada

Para garantir estabilidade e compatibilidade, use código síncrono ou padrões baseados em retorno de chamada na implementação do agente.

O Azure Databricks gerencia automaticamente a comunicação assíncrona para fornecer simultaneidade e desempenho ideais quando você implanta um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior.

O Azure Databricks recomenda evitar a programação assíncrona, como usar asyncio ou criar loops de eventos personalizados, ao desenvolver agentes.

Próximos passos