Compartilhar via


agent_framework Pacote

Pacotes

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Módulos

exceptions
observability

Classes

AIFunction

Uma ferramenta que encapsula uma função python para torná-la chamada por modelos de IA.

Essa classe encapsula uma função python para torná-la chamada por modelos de IA com validação automática de parâmetro e geração de esquema JSON.

Inicialize o AIFunction.

AgentExecutor

executor interno que encapsula um agente para lidar com mensagens.

AgentExecutor adapta seu comportamento com base no modo de execução do fluxo de trabalho:

  • run_stream(): emite eventos incrementais agentRunUpdateEvent à medida que o agente produz tokens
  • run(): emite um único AgentRunEvent contendo a resposta completa

O executor detecta automaticamente o modo por meio de WorkflowContext.is_streaming().

Inicialize o executor com um identificador exclusivo.

AgentExecutorRequest

Uma solicitação para um executor de agente.

AgentExecutorResponse

Uma resposta de um executor de agente.

AgentInputRequest

Solicite a entrada humana antes que um agente seja executado em fluxos de trabalho de construtor de alto nível.

Emitido por meio de RequestInfoEvent quando um fluxo de trabalho pausa antes de um agente ser executado. A resposta é injetada na conversa como uma mensagem de usuário para orientar o comportamento do agente.

Esse é o tipo de solicitação padrão usado por .with_request_info() em SequentialBuilder, ConcurrentBuilder, GroupChatBuilder e HandoffBuilder.

AgentMiddleware

Classe base abstrata para middleware de agente que pode interceptar invocações de agente.

O middleware do agente permite interceptar e modificar invocações de agente antes e depois da execução. Você pode inspecionar mensagens, modificar o contexto, substituir os resultados ou encerrar a execução antecipadamente.

Observação

AgentMiddleware é uma classe base abstrata. Você deve subclassá-lo e implementá-lo

o método process() para criar middleware de agente personalizado.

AgentProtocol

Um protocolo para um agente que pode ser invocado.

Esse protocolo define a interface que todos os agentes devem implementar, incluindo propriedades para identificação e métodos para execução.

Observação

Os protocolos usam subtipagem estrutural (digitação de pato). Classes não precisam

para herdar explicitamente desse protocolo para ser considerado compatível.

Isso permite que você crie agentes completamente personalizados sem usar

classes base do Agent Framework.

AgentRunContext

Objeto de contexto para invocações de middleware do agente.

Esse contexto é passado pelo pipeline de middleware do agente e contém todas as informações sobre a invocação do agente.

Inicialize o AgentRunContext.

AgentRunEvent

Evento disparado quando uma execução de agente é concluída.

Inicialize o evento de execução do agente.

AgentRunResponse

Representa a resposta a uma solicitação de execução do Agente.

Fornece uma ou mais mensagens de resposta e metadados sobre a resposta. Uma resposta típica conterá uma única mensagem, mas pode conter várias mensagens em cenários que envolvem chamadas de função, recuperações de RAG ou lógica complexa.

Inicializar um AgentRunResponse.

AgentRunResponseUpdate

Representa uma única parte de resposta de streaming de um Agente.

Inicialize um AgentRunResponseUpdate.

AgentRunUpdateEvent

Evento disparado quando um agente está transmitindo mensagens.

Inicialize o evento de streaming do agente.

AgentThread

A classe de thread agent, isso pode representar um thread gerenciado localmente ou um thread gerenciado pelo serviço.

Um AgentThread mantém o estado da conversa e o histórico de mensagens para uma interação do agente. Ele pode usar um thread gerenciado pelo serviço (via service_thread_id) ou um repositório de mensagens local (via message_store), mas não ambos.

Inicialize um AgentThread, não use este método manualmente, sempre use: agent.get_new_thread().

Observação

Pode ser definido service_thread_id ou message_store, mas não ambos.

AggregateContextProvider

Um ContextProvider que contém vários provedores de contexto.

Ele delega eventos a vários provedores de contexto e agrega respostas desses eventos antes de retornar. Isso permite que você combine vários provedores de contexto em um único provedor.

Observação

Um AggregateContextProvider é criado automaticamente quando você passa um único contexto

provedor ou uma sequência de provedores de contexto para o construtor do agente.

Inicialize o AggregateContextProvider com provedores de contexto.

BaseAgent

Classe base para todos os agentes do Agent Framework.

Essa classe fornece a funcionalidade principal para implementações de agente, incluindo provedores de contexto, suporte a middleware e gerenciamento de threads.

Observação

BaseAgent não pode ser instanciado diretamente, pois não implementa o

run(), run_stream() e outros métodos exigidos pelo AgentProtocol.

Use uma implementação concreta como ChatAgent ou crie uma subclasse.

Inicialize uma instância do BaseAgent.

BaseAnnotation

Classe base para todos os tipos de Anotação de IA.

Inicializar BaseAnnotation.

BaseChatClient

Classe base para clientes de chat.

Essa classe base abstrata fornece funcionalidade principal para implementações de cliente de chat, incluindo suporte a middleware, preparação de mensagens e normalização de ferramentas.

Observação

O BaseChatClient não pode ser instanciado diretamente, pois é uma classe base abstrata.

As subclasses devem implementar _inner_get_response() e _inner_get_streaming_response().

Inicialize uma instância do BaseChatClient.

BaseContent

Representa o conteúdo usado pelos serviços de IA.

Inicializar BaseContent.

Case

Wrapper de runtime combinando um predicado switch-case com seu destino.

Cada Caso casa um predicado booliano com o executor que deve lidar com a mensagem quando o predicado é avaliado como True. O runtime mantém esse contêiner leve separado do SwitchCaseEdgeGroupCase serializável para que a execução possa operar com callables dinâmicos sem poluir o estado persistente.

ChatAgent

Um agente cliente de chat.

Essa é a implementação do agente principal que usa um cliente de chat para interagir com modelos de linguagem. Ele dá suporte a ferramentas, provedores de contexto, middleware e respostas de streaming e não streaming.

Inicializar uma instância do ChatAgent.

Observação

O conjunto de parâmetros de frequency_penalty para request_kwargs são usados para

chame o cliente de chat. Eles também podem ser passados para ambos os métodos de execução.

Quando ambos são definidos, os passados para os métodos de execução têm precedência.

ChatClientProtocol

Um protocolo para um cliente de chat que pode gerar respostas.

Esse protocolo define a interface que todos os clientes de chat devem implementar, incluindo métodos para gerar respostas de streaming e não streaming.

Observação

Os protocolos usam subtipagem estrutural (digitação de pato). Classes não precisam

para herdar explicitamente desse protocolo para ser considerado compatível.

ChatContext

Objeto de contexto para invocações de middleware de chat.

Esse contexto é passado pelo pipeline de middleware de chat e contém todas as informações sobre a solicitação de chat.

Inicialize o ChatContext.

ChatMessage

Representa uma mensagem de chat.

Inicializar ChatMessage.

ChatMessageStore

Uma implementação na memória de ChatMessageStoreProtocol que armazena mensagens em uma lista.

Essa implementação fornece um armazenamento simples baseado em lista para mensagens de chat com suporte para serialização e desserialização. Ele implementa todos os métodos necessários do ChatMessageStoreProtocol protocolo.

O repositório mantém mensagens na memória e fornece métodos para serializar e desserializar o estado para fins de persistência.

Crie um ChatMessageStore para uso em um thread.

ChatMessageStoreProtocol

Define métodos para armazenar e recuperar mensagens de chat associadas a um thread específico.

As implementações desse protocolo são responsáveis por gerenciar o armazenamento de mensagens de chat, incluindo o tratamento de grandes volumes de dados truncando ou resumindo mensagens conforme necessário.

ChatMiddleware

Classe base abstrata para middleware de chat que pode interceptar solicitações de cliente de chat.

O middleware de chat permite interceptar e modificar solicitações de cliente de chat antes e depois da execução. Você pode modificar mensagens, adicionar prompts do sistema, solicitações de log ou substituir respostas de chat.

Observação

ChatMiddleware é uma classe base abstrata. Você deve subclassá-lo e implementá-lo

o método process() para criar middleware de chat personalizado.

ChatOptions

Configurações comuns de solicitação para serviços de IA.

Inicializar ChatOptions.

ChatResponse

Representa a resposta a uma solicitação de chat.

Inicializa um ChatResponse com os parâmetros fornecidos.

ChatResponseUpdate

Representa uma única parte de resposta de streaming de um ChatClient.

Inicializa um ChatResponseUpdate com os parâmetros fornecidos.

CheckpointStorage

Protocolo para back-ends de armazenamento de ponto de verificação.

CitationAnnotation

Representa uma anotação de citação.

Inicializar CitationAnnotation.

ConcurrentBuilder

Construtor de alto nível para fluxos de trabalho de agente simultâneos.

  • participantes([...]) aceita uma lista de AgentProtocol (recomendado) ou Executor.

  • register_participants([...]) aceita uma lista de fábricas para AgentProtocol (recomendado)

    ou fábricas executores

  • fios build() : dispatcher -> fan-out -> participantes -> fan-in -> agregador.

  • with_aggregator(...) substitui o agregador padrão por um Executor ou retorno de chamada.

  • register_aggregator(...) aceita uma fábrica para um Executor como agregador personalizado.

Uso:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Uma classe que contém qualquer contexto que deve ser fornecido para o modelo de IA, conforme fornecido por um ContextProvider.

Cada ContextProvider tem a capacidade de fornecer seu próprio contexto para cada invocação. A classe Context contém o contexto adicional fornecido pelo ContextProvider. Esse contexto será combinado com o contexto fornecido por outros provedores antes de ser passado para o modelo de IA. Esse contexto é por invocação e não será armazenado como parte do histórico de chat.

Crie um novo objeto Context.

ContextProvider

Classe base para todos os provedores de contexto.

Um provedor de contexto é um componente que pode ser usado para aprimorar o gerenciamento de contexto da IA. Ele pode ouvir as alterações na conversa e fornecer contexto adicional ao modelo de IA pouco antes da invocação.

Observação

ContextProvider é uma classe base abstrata. Você deve subclassá-lo e implementá-lo

o método invocando() para criar um provedor de contexto personalizado. Idealmente, você deve

também implemente os métodos invoked() e thread_created() para acompanhar a conversa

o estado, mas eles são opcionais.

DataContent

Representa o conteúdo de dados binários com um tipo de mídia associado (também conhecido como tipo MIME).

Importante

Isso é para dados binários que são representados como um URI de dados, não para recursos online.

Use UriContent para recursos online.

Inicializa uma instância do DataContent.

Importante

Isso é para dados binários que são representados como um URI de dados, não para recursos online.

Use UriContent para recursos online.

Default

Representação de runtime do branch padrão em um grupo de casos de alternância.

O branch padrão é invocado somente quando nenhum outro predicado de caso corresponde. Na prática, é garantido que exista para que o roteamento nunca produza um destino vazio.

Edge

Modele uma entrega direcionada, opcionalmente condicional, entre dois executores.

Cada Borda captura os metadados mínimos necessários para mover uma mensagem de um executor para outro dentro do gráfico de fluxo de trabalho. Opcionalmente, ele inseri um predicado booliano que decide se a borda deve ser tomada em runtime. Serializando a borda para baixo para primitivos, podemos reconstruir a topologia de um fluxo de trabalho, independentemente do processo original do Python.

Inicialize uma borda totalmente especificada entre dois executores de fluxo de trabalho.

EdgeDuplicationError

Exceção gerada quando bordas duplicadas são detectadas no fluxo de trabalho.

ErrorContent

Representa um erro.

Comentários: normalmente usado para erros não fatais, em que algo deu errado como parte da operação, mas a operação ainda foi capaz de continuar.

Inicializa uma instância ErrorContent.

Executor

Classe base para todos os executores de fluxo de trabalho que processam mensagens e executam cálculos.

Visão geral

Executores são os blocos de construção fundamentais de fluxos de trabalho, representando unidades de processamento individuais que recebem mensagens, executam operações e produzem saídas. Cada executor é identificado exclusivamente e pode lidar com tipos de mensagem específicos por meio de métodos de manipulador decorados.

Sistema de Tipos

Os executores têm um sistema de tipo avançado que define suas funcionalidades:

Tipos de entrada

Os tipos de mensagens que um executor pode processar, descobertos a partir de assinaturas de método de manipulador:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Acesse por meio da propriedade input_types .

Tipos de saída

Os tipos de mensagens que um executor pode enviar para outros executores por meio de ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Acesse por meio da propriedade output_types .

Tipos de saída de fluxo de trabalho

Os tipos de dados que um executor pode emitir como saídas no nível do fluxo de trabalho por meio de ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Acesso por meio da propriedade workflow_output_types .

Descoberta de manipulador

Os executores descobrem suas funcionalidades por meio de métodos decorados:

@handler Decorador

Marca os métodos que processam mensagens de entrada:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Interceptação de solicitação de sub-fluxo de trabalho

Use @handler métodos para interceptar solicitações de sub-fluxo de trabalho:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Tipos de contexto

Os métodos de manipulador recebem diferentes variantes WorkflowContext com base em suas anotações de tipo:

WorkflowContext (sem parâmetros de tipo)

Para manipuladores que só executam efeitos colaterais sem enviar mensagens ou produzir saídas:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Habilita o envio de mensagens do tipo T_Out via ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Habilita o envio de mensagens (T_Out) e a geração de saídas de fluxo de trabalho (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Executores de funções

Funções simples podem ser convertidas em executores usando o decorador @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Composição de sub-fluxo de trabalho

Os executores podem conter sub-fluxos de trabalho usando WorkflowExecutor. Os sub-fluxos de trabalho podem fazer solicitações que os fluxos de trabalho pai podem interceptar. Consulte a documentação do WorkflowExecutor para obter detalhes sobre padrões de composição de fluxo de trabalho e tratamento de solicitação/resposta.

Gerenciamento de estado

Os executores podem conter estados que persistem entre execuções de fluxo de trabalho e pontos de verificação. Substitua os métodos on_checkpoint_save e on_checkpoint_restore para implementar a lógica personalizada de serialização e restauração de estado.

Notas de Implementação

  • Não chame execute() diretamente - ele é invocado pelo mecanismo de fluxo de trabalho
  • Não substitua execute() – definir manipuladores usando decoradores
  • Cada executor deve ter pelo menos um método @handler
  • As assinaturas de método do manipulador são validadas no momento da inicialização

Inicialize o executor com um identificador exclusivo.

ExecutorCompletedEvent

Evento disparado quando um manipulador executor é concluído.

Inicialize o evento executor com uma ID do executor e dados opcionais.

ExecutorEvent

Classe base para eventos de executor.

Inicialize o evento executor com uma ID do executor e dados opcionais.

ExecutorFailedEvent

Evento disparado quando um manipulador executor gera um erro.

ExecutorInvokedEvent

Evento disparado quando um manipulador executor é invocado.

Inicialize o evento executor com uma ID do executor e dados opcionais.

FanInEdgeGroup

Represente um conjunto convergente de bordas que alimentam um único executor downstream.

Os grupos de fan-in normalmente são usados quando vários estágios upstream produzem independentemente mensagens que devem chegar ao mesmo processador downstream.

Crie um mapeamento de fan-in que mescla várias fontes em um único destino.

FanOutEdgeGroup

Represente um grupo de borda no estilo de difusão com lógica de seleção opcional.

Um fan-out encaminha uma mensagem produzida por um único executor de origem para um ou mais executores downstream. Em runtime, podemos restringir ainda mais os destinos executando uma selection_func que inspeciona o conteúdo e retorna o subconjunto de IDs que devem receber a mensagem.

Crie um mapeamento de fan-out de uma única origem para muitos destinos.

FileCheckpointStorage

Armazenamento de ponto de verificação baseado em arquivo para persistência.

Inicialize o armazenamento de arquivos.

FinishReason

Representa o motivo pelo qual uma resposta de chat foi concluída.

Inicialize FinishReason com um valor.

FunctionApprovalRequestContent

Representa uma solicitação de aprovação do usuário de uma chamada de função.

Inicializa uma instância FunctionApprovalRequestContent.

FunctionApprovalResponseContent

Representa uma resposta para a aprovação do usuário de uma chamada de função.

Inicializa uma instância FunctionApprovalResponseContent.

FunctionCallContent

Representa uma solicitação de chamada de função.

Inicializa uma instância de FunctionCallContent.

FunctionExecutor

Executor que encapsula uma função definida pelo usuário.

Esse executor permite que os usuários definam funções simples (sincronizar e assíncronas) e usá-las como executores de fluxo de trabalho sem a necessidade de criar classes de executor completas.

Funções síncronas são executadas em um pool de threads usando asyncio.to_thread() para evitar o bloqueio do loop de eventos.

Inicialize o FunctionExecutor com uma função definida pelo usuário.

FunctionInvocationConfiguration

Configuração para invocação de função em clientes de chat.

Essa classe é criada automaticamente em cada cliente de chat que dá suporte à invocação de função. Isso significa que, para a maioria dos casos, você pode apenas alterar os atributos na instância, em vez de criar um novo.

Inicializar FunctionInvocationConfiguration.

FunctionInvocationContext

Objeto de contexto para invocações de middleware de função.

Esse contexto é passado pelo pipeline de middleware de função e contém todas as informações sobre a invocação da função.

Inicialize o FunctionInvocationContext.

FunctionMiddleware

Classe base abstrata para middleware de função que pode interceptar invocações de função.

O middleware de função permite interceptar e modificar invocações de função/ferramenta antes e depois da execução. Você pode validar argumentos, resultados de cache, invocações de log ou substituir a execução da função.

Observação

FunctionMiddleware é uma classe base abstrata. Você deve subclassá-lo e implementá-lo

o método process() para criar middleware de função personalizado.

FunctionResultContent

Representa o resultado de uma chamada de função.

Inicializa uma instância de FunctionResultContent.

GraphConnectivityError

Exceção gerada quando problemas de conectividade de grafo são detectados.

GroupChatBuilder

Construtor de alto nível para fluxos de trabalho de chat de grupo direcionados pelo gerente com orquestração dinâmica.

O GroupChat coordena conversas de vários agentes usando um gerente que seleciona qual participante fala em seguida. O gerenciador pode ser uma função Python simples (set_select_speakers_func) ou um seletor baseado em agente por meio de set_manager. Essas duas abordagens são mutuamente exclusivas.

Fluxo de trabalho principal:

  1. Definir participantes: lista de agentes (usa seu .name) ou nomes de mapeamento de ditado para agentes

  2. Configurar a seleção do locutor: set_select_speakers_func OR

    set_manager (não ambos)

  3. Opcional: definir limites de rodada, ponto de verificação, condições de término

  4. Compilar e executar o fluxo de trabalho

Padrões de seleção do locutor:

Padrão 1: seleção simples baseada em função (recomendado)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Padrão 2: seleção baseada em LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Padrão 3: Solicitar informações para comentários no meio da conversa


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Especificação do participante:

Duas maneiras de especificar os participantes:

  • Formulário de lista: [agent1, agent2] - usa agent.name atributo para nomes de participantes
  • Formulário de ditado: {name1: agent1, name2: agent2} - controle de nome explícito
  • Formulário de palavra-chave: participantes(name1=agent1, name2=agent2) - controle de nome explícito

Estrutura de instantâneo de estado:

O GroupChatStateSnapshot passado para set_select_speakers_func contém:

  • tarefa: ChatMessage – Tarefa de usuário original
  • participantes: dict[str, str] – Mapeamento de nomes de participantes para descrições
  • conversation: tuple[ChatMessage, ...] - Histórico completo da conversa
  • histórico: tupla[GroupChatTurn, ...] – registro turn-by-turn com atribuição de alto-falante
  • round_index: int - Número de rodadas de seleção do gerente até agora
  • pending_agent: str | Nenhum - Nome do processamento do agente no momento (se houver)

Restrições importantes:

  • Não é possível combinar set_select_speakers_func e set_manager
  • Os nomes dos participantes devem ser exclusivos
  • Ao usar o formulário de lista, os agentes devem ter um atributo de nome não vazio

Inicialize o GroupChatBuilder.

GroupChatDirective

Instrução emitida por uma implementação do gerenciador de chat em grupo.

HandoffBuilder

Construtor fluente para fluxos de trabalho de entrega conversacional com coordenador e agentes especialistas.

O padrão de entrega permite que um agente coordenador encaminhe solicitações para agentes especializados. O modo de interação controla se o fluxo de trabalho solicita a entrada do usuário após cada resposta do agente ou é concluído de forma autônoma quando os agentes terminam de responder. Uma condição de encerramento determina quando o fluxo de trabalho deve parar de solicitar entrada e ser concluído.

Padrões de roteamento:

Single-Tier (padrão): Somente o coordenador pode entregar aos especialistas. Por padrão, depois que qualquer especialista responde, o controle retorna ao usuário para obter mais entradas. Isso cria um fluxo cíclico: usuário –> coordenador –> [especialista opcional] – usuário –>> coordenador –> ... Use with_interaction_mode("autônomo") para ignorar a solicitação de entrada adicional do usuário e gerar a conversa final quando um agente responder sem delegar.

Várias camadas (avançado): Especialistas podem entregar a outros especialistas usando .add_handoff(). Isso fornece mais flexibilidade para fluxos de trabalho complexos, mas é menos controlável do que o padrão de camada única. Os usuários perdem visibilidade em tempo real em etapas intermediárias durante entregas especializadas para especialistas (embora o histórico completo de conversas, incluindo todas as entregas, seja preservado e possa ser inspecionado posteriormente).

Principais recursos:

  • Detecção automática de entrega: o coordenador invoca uma ferramenta de entrega cuja

    argumentos (por exemplo {"handoff_to": "shipping_agent"}) identificam o especialista para receber o controle.

  • Ferramentas geradas automaticamente: por padrão, o construtor sintetiza handoff_to_<agent> tools para o coordenador, para que você não defina manualmente as funções de espaço reservado.

  • Histórico completo de conversas: toda a conversa (incluindo qualquer ChatMessage.additional_properties) é preservada e passada para cada agente.

  • Controle de terminação: por padrão, termina após 10 mensagens de usuário. Substitua com .with_termination_condition(lambda conv: ...) para lógica personalizada (por exemplo, detectar "adeus").

  • Modos de interação: escolha human_in_loop (padrão) para solicitar aos usuários entre turnos de agente ou autônomos para continuar o roteamento de volta para agentes sem solicitar a entrada do usuário até que ocorra uma entrega ou um limite de término/turno seja atingido (limite de turno autônomo padrão: 50).

  • Ponto de verificação: persistência opcional para fluxos de trabalho retomáveis.

Uso (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Roteamento de várias camadas com .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Usar fábricas de participantes para isolamento de estado:

Condição de encerramento personalizada:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Verificação:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Inicialize um HandoffBuilder para criar fluxos de trabalho de entrega de conversação.

O construtor começa em um estado não configurado e exige que você chame:

  1. .participants([...]) - Registrar agentes
  2. ou .participant_factories({...}) – registrar fábricas de agente/executor
  3. .set_coordinator(...) – Designar qual agente recebe a entrada inicial do usuário
  4. .build() – Construir o fluxo de trabalho final

Os métodos de configuração opcionais permitem personalizar o gerenciamento de contexto, a lógica de término e a persistência.

Observação

Os participantes devem ter nomes/IDs estáveis porque o fluxo de trabalho mapeia o

argumentos de ferramenta de entrega para esses identificadores. Os nomes do agente devem corresponder

as cadeias de caracteres emitidas pela ferramenta de entrega do coordenador (por exemplo, uma ferramenta que

saídas {"handoff_to": "cobrança"} requer um agente chamado cobrança).

HandoffUserInputRequest

Solicitar mensagem emitida quando o fluxo de trabalho precisar de uma nova entrada do usuário.

Observação: o campo de conversa é excluído intencionalmente da serialização do ponto de verificação para evitar a duplicação. A conversa é preservada no estado do coordenador e será reconstruída na restauração. Consulte o problema nº 2667.

HostedCodeInterpreterTool

Representa uma ferramenta hospedada que pode ser especificada para um serviço de IA para habilitá-la a executar o código gerado.

Essa ferramenta não implementa a interpretação de código em si. Ele serve como um marcador para informar a um serviço que ele tem permissão para executar o código gerado se o serviço for capaz de fazer isso.

Inicialize o HostedCodeInterpreterTool.

HostedFileContent

Representa um conteúdo de arquivo hospedado.

Inicializa uma instância HostedFileContent.

HostedFileSearchTool

Representa uma ferramenta de pesquisa de arquivo que pode ser especificada para um serviço de IA para habilitá-la a realizar pesquisas de arquivo.

Inicialize um FileSearchTool.

HostedMCPSpecificApproval

Representa o modo específico para uma ferramenta hospedada.

Ao usar esse modo, o usuário deve especificar quais ferramentas sempre ou nunca exigem aprovação. Isso é representado como um dicionário com duas chaves opcionais:

HostedMCPTool

Representa uma ferramenta MCP gerenciada e executada pelo serviço.

Crie uma ferramenta MCP hospedada.

HostedVectorStoreContent

Representa um conteúdo do repositório de vetores hospedado.

Inicializa uma instância HostedVectorStoreContent.

HostedWebSearchTool

Representa uma ferramenta de pesquisa na Web que pode ser especificada para um serviço de IA para habilitá-la a realizar pesquisas na Web.

Inicialize um HostedWebSearchTool.

InMemoryCheckpointStorage

Armazenamento de ponto de verificação na memória para teste e desenvolvimento.

Inicialize o armazenamento de memória.

InProcRunnerContext

Contexto de execução em processo para execução local e ponto de verificação opcional.

Inicialize o contexto de execução em processo.

MCPStdioTool

Ferramenta MCP para se conectar a servidores MCP baseados em stdio.

Essa classe se conecta a servidores MCP que se comunicam por meio de entrada/saída padrão, normalmente usados para processos locais.

Inicialize a ferramenta de stdio MCP.

Observação

Os argumentos são usados para criar um objeto StdioServerParameters,

que é então usado para criar um cliente stdio. Consulte mcp.client.stdio.stdio_client

e mcp.client.stdio.stdio_server_parameters para obter mais detalhes.

MCPStreamableHTTPTool

Ferramenta MCP para se conectar a servidores MCP baseados em HTTP.

Essa classe se conecta a servidores MCP que se comunicam por meio de HTTP/SSE que podem ser transmitidos.

Inicialize a ferramenta HTTP transmitivel do MCP.

Observação

Os argumentos são usados para criar um cliente HTTP que pode ser transmitido.

Consulte mcp.client.streamable_http.streamablehttp_client para obter mais detalhes.

Quaisquer argumentos extras passados para o construtor serão passados para o

Construtor de cliente HTTP que pode ser transmitido.

MCPWebsocketTool

Ferramenta MCP para se conectar a servidores MCP baseados em WebSocket.

Essa classe se conecta a servidores MCP que se comunicam por meio do WebSocket.

Inicialize a ferramenta WebSocket do MCP.

Observação

Os argumentos são usados para criar um cliente WebSocket.

Consulte mcp.client.websocket.websocket_client para obter mais detalhes.

Quaisquer argumentos extras passados para o construtor serão passados para o

Construtor de cliente WebSocket.

MagenticBuilder

Construtor fluente para criar fluxos de trabalho de orquestração de vários agentes do Magentic One.

Os fluxos de trabalho do Magentic One usam um gerenciador habilitado para LLM para coordenar vários agentes por meio de planejamento dinâmico de tarefas, acompanhamento de progresso e replanificação adaptável. O gerente cria planos, seleciona agentes, monitora o progresso e determina quando replanexar ou concluir.

O construtor fornece uma API fluente para configurar participantes, o gerente, revisão de plano opcional, ponto de verificação e retornos de chamada de evento.

Suporte humano no loop: o Magentic fornece mecanismos HITL especializados por meio de:

  • .with_plan_review() – Revisar e aprovar/revisar planos antes da execução

  • .with_human_input_on_stall() – Intervir quando o fluxo de trabalho parar

  • Aprovação da ferramenta via FunctionApprovalRequestContent – Aprovar chamadas de ferramentas individuais

Eles emitem eventos MagenticHumanInterventionRequest que fornecem opções de decisão estruturadas (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) apropriadas para a orquestração baseada em planejamento da Magentic.

Uso:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Com o gerenciador personalizado:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Contexto para o gerente do Magentic.

MagenticManagerBase

Classe base para o gerente do Magentic One.

ManagerDirectiveModel

Modelo pedante para saída de diretiva do gerenciador estruturado.

Crie um novo modelo analisando e validando dados de entrada de argumentos de palavra-chave.

Raises [ValidationError][pydantic_core. ValidationError] se os dados de entrada não puderem ser validados para formar um modelo válido.

self é explicitamente posicional somente para permitir a si mesmo como um nome de campo.

ManagerSelectionRequest

Solicitação enviada ao agente do gerente para a próxima seleção de alto-falante.

Essa classe de dados empacota o estado completo da conversa e o contexto da tarefa para o agente do gerente analisar e tomar uma decisão de seleção do locutor.

ManagerSelectionResponse

Resposta do agente do gerente com a decisão de seleção do locutor.

O agente do gerente deve produzir essa estrutura (ou dicto compatível/JSON) para comunicar sua decisão de volta ao orquestrador.

Crie um novo modelo analisando e validando dados de entrada de argumentos de palavra-chave.

Raises [ValidationError][pydantic_core. ValidationError] se os dados de entrada não puderem ser validados para formar um modelo válido.

self é explicitamente posicional somente para permitir a si mesmo como um nome de campo.

Message

Uma classe que representa uma mensagem no fluxo de trabalho.

OrchestrationState

Contêiner de estado unificado para o ponto de verificação do orquestrador.

Essa classe de dados padroniza a serialização de ponto de verificação em todos os três padrões de chat de grupo, permitindo extensões específicas de padrão por meio de metadados.

Atributos comuns abrangem preocupações de orquestração compartilhada (tarefa, conversa, acompanhamento redondo). O estado específico do padrão vai no ditado de metadados.

RequestInfoEvent

Evento disparado quando um executor de fluxo de trabalho solicita informações externas.

Inicialize o evento de informações de solicitação.

RequestInfoInterceptor

Executor interno que pausa o fluxo de trabalho para entrada humana antes da execução do agente.

Esse executor é inserido no grafo de fluxo de trabalho pelos construtores quando .with_request_info() é chamado. Ele intercepta mensagens AgentExecutorRequest ANTES que o agente seja executado e pausa o fluxo de trabalho por meio de ctx.request_info() com um AgentInputRequest.

Quando uma resposta é recebida, o manipulador de resposta injeta a entrada como uma mensagem de usuário na conversa e encaminha a solicitação para o agente.

O parâmetro agent_filter opcional permite limitar quais agentes disparam a pausa. Se a ID do agente de destino não estiver no conjunto de filtros, a solicitação será encaminhada sem pausar.

Inicialize o executor do interceptador de informações de solicitação.

Role

Descreve a finalidade pretendida de uma mensagem dentro de uma interação de chat.

Propriedades: SYSTEM: a função que instrui ou define o comportamento do sistema de IA. USUÁRIO: A função que fornece entrada do usuário para interações de chat. ASSISTENTE: A função que fornece respostas para entradas orientadas pelo sistema e solicitadas pelo usuário. FERRAMENTA: A função que fornece informações e referências adicionais em resposta às solicitações de uso da ferramenta.

Inicializar função com um valor.

Runner

Uma classe para executar um fluxo de trabalho em superespas pregel.

Inicialize o executor com bordas, estado compartilhado e contexto.

RunnerContext

Protocolo para o contexto de execução usado pelo executor.

Um único contexto que dá suporte a mensagens, eventos e pontos de verificação opcionais. Se o armazenamento de ponto de verificação não estiver configurado, os métodos de ponto de verificação poderão ser gerados.

SequentialBuilder

Construtor de alto nível para fluxos de trabalho sequenciais de agente/executor com contexto compartilhado.

  • participantes([...]) aceita uma lista de instâncias AgentProtocol (recomendado) ou Executor

  • register_participants([...]) aceita uma lista de fábricas para AgentProtocol (recomendado)

    ou fábricas executores

  • Os executores devem definir um manipulador que consome lista[ChatMessage] e envia uma lista[ChatMessage]

  • O fluxo de trabalho conecta os participantes em ordem, passando uma lista[ChatMessage] pela cadeia

  • Os agentes acrescentam suas mensagens de assistente à conversa

  • Executores personalizados podem transformar/resumir e retornar uma lista[ChatMessage]

  • A saída final é a conversa produzida pelo último participante

Uso:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Uma classe para gerenciar o estado compartilhado em um fluxo de trabalho.

SharedState fornece acesso thread-safe aos dados de estado do fluxo de trabalho que precisam ser compartilhados entre executores durante a execução do fluxo de trabalho.

Chaves Reservadas: as seguintes chaves são reservadas para uso interno da estrutura e não devem ser modificadas pelo código do usuário:

  • _executor_state: armazena o estado do executor para o ponto de verificação (gerenciado pelo Runner)

Aviso

Não use chaves começando com sublinhado (_) pois elas podem ser reservadas para

operações da estrutura interna.

Inicialize o estado compartilhado.

SingleEdgeGroup

Wrapper de conveniência para uma borda solitária, mantendo o uniforme da API de grupo.

Crie um grupo de borda um-para-um entre dois executores.

StandardMagenticManager

Gerenciador magêntico padrão que executa chamadas LLM reais por meio de um ChatAgent.

O gerente constrói prompts que espelham a orquestração do Magentic One original:

  • Coleta de fatos
  • Criação de plano
  • Razão de progresso no JSON
  • Atualização de fatos e atualização de plano na redefinição
  • Síntese de resposta final

Inicialize o Gerenciador Magnético Padrão.

SubWorkflowRequestMessage

Mensagem enviada de um sub-fluxo de trabalho para um executor no fluxo de trabalho pai para solicitar informações.

Essa mensagem encapsula um RequestInfoEvent emitido pelo executor no sub-fluxo de trabalho.

SubWorkflowResponseMessage

Mensagem enviada de um fluxo de trabalho pai para um sub-fluxo de trabalho por meio de WorkflowExecutor para fornecer informações solicitadas.

Essa mensagem encapsula os dados de resposta junto com o RequestInfoEvent original emitido pelo executor de sub-fluxo de trabalho.

SuperStepCompletedEvent

Evento disparado quando um superstep termina.

Inicialize o evento superstep.

SuperStepStartedEvent

Evento disparado quando um superstep é iniciado.

Inicialize o evento superstep.

SwitchCaseEdgeGroup

Variante do fan-out que imita um fluxo de controle de comutador/caso tradicional.

Cada caso inspeciona o conteúdo da mensagem e decide se ele deve lidar com a mensagem. Exatamente um caso ou o branch padrão retorna um destino em runtime, preservando a semântica de expedição única.

Configure uma estrutura de roteamento switch/case para um único executor de origem.

SwitchCaseEdgeGroupCase

Descrição persistente de um único branch condicional em um caso de alternância.

Ao contrário do objeto Case do runtime, essa variante serializável armazena apenas o identificador de destino e um nome descritivo para o predicado. Quando o callable subjacente está indisponível durante a desserialização, substituimos um espaço reservado de proxy que falha alto, garantindo que a dependência ausente seja imediatamente visível.

Registre os metadados de roteamento para um branch de caso condicional.

SwitchCaseEdgeGroupDefault

Descritor persistente para o branch de fallback de um grupo de maiúsculas e minúsculas.

O branch padrão tem a garantia de existir e é invocado quando todos os outros casos de predicado não correspondem ao conteúdo.

Aponte o branch padrão para o identificador de executor especificado.

TextContent

Representa o conteúdo do texto em um chat.

Inicializa uma instância textContent.

TextReasoningContent

Representa o conteúdo do raciocínio de texto em um chat.

Comentários: essa classe e TextContent são superficialmente semelhantes, mas distintas.

Inicializa uma instância textReasoningContent.

TextSpanRegion

Representa uma região de texto que foi anotada.

Inicializar TextSpanRegion.

ToolMode

Define se e como as ferramentas são usadas em uma solicitação de chat.

Inicializar ToolMode.

ToolProtocol

Representa uma ferramenta genérica.

Esse protocolo define a interface que todas as ferramentas devem implementar para serem compatíveis com a estrutura do agente. Ele é implementado por várias classes de ferramentas, como HostedMCPTool, HostedWebSearchTool e AIFunction. Uma AIFunction geralmente é criada pelo decorador de ai_function .

Como cada conector precisa analisar as ferramentas de forma diferente, os usuários podem passar um ditado para especificar uma ferramenta específica do serviço quando nenhuma abstração estiver disponível.

TypeCompatibilityError

Exceção gerada quando a incompatibilidade de tipos é detectada entre executores conectados.

UriContent

Representa um conteúdo de URI.

Importante

Isso é usado para conteúdo identificado por um URI, como uma imagem ou um arquivo.

Para URIs de dados (binários), use DataContent.

Inicializa uma instância de UriContent.

Comentários: isso é usado para conteúdo identificado por um URI, como uma imagem ou um arquivo. Para URIs de dados (binários), use DataContent .

UsageContent

Representa as informações de uso associadas a uma solicitação de chat e resposta.

Inicializa uma instância UsageContent.

UsageDetails

Fornece detalhes de uso sobre uma solicitação/resposta.

Inicializa a instância UsageDetails.

Workflow

Um mecanismo de execução baseado em grafo que orquestra executores conectados.

Visão geral

Um fluxo de trabalho executa um grafo direcionado de executores conectados por meio de grupos de borda usando um modelo semelhante a Pregel, em execução em supersteps até que o grafo fique ocioso. Fluxos de trabalho são criados usando a classe WorkflowBuilder – não instancie essa classe diretamente.

Modelo de execução

Os executores são executados em superespas sincronizadas em que cada executor:

  • É invocado quando recebe mensagens de grupos de borda conectados
  • Pode enviar mensagens para executores downstream por meio de ctx.send_message()
  • Pode produzir saídas no nível do fluxo de trabalho por meio de ctx.yield_output()
  • Pode emitir eventos personalizados por meio de ctx.add_event()

As mensagens entre executores são entregues no final de cada superestar e não são visíveis no fluxo de eventos. Somente eventos no nível do fluxo de trabalho (saídas, eventos personalizados) e eventos de status são observáveis aos chamadores.

Tipos de entrada/saída

Os tipos de fluxo de trabalho são descobertos em runtime inspecionando:

  • Tipos de entrada: dos tipos de entrada do executor inicial
  • Tipos de saída: união de todos os tipos de saída de fluxo de trabalho de executores Acesse-os por meio das propriedades input_types e output_types.

Métodos de execução

O fluxo de trabalho fornece duas APIs de execução primária, cada uma com suporte para vários cenários:

  • run(): Executar até a conclusão, retorna WorkflowRunResult com todos os eventos

  • run_stream(): retorna eventos de produção de gerador assíncrono conforme eles ocorrem

Ambos os métodos dão suporte a:

  • Execuções iniciais de fluxo de trabalho: fornecer parâmetro de mensagem
  • Restauração do ponto de verificação: forneça checkpoint_id (e, opcionalmente , checkpoint_storage)
  • Continuação hil: fornecer respostas para continuar após solicitações RequestInfoExecutor
  • Ponto de verificação de tempo de execução: forneça checkpoint_storage para habilitar/substituir o ponto de verificação para esta execução

Gerenciamento de estado

As instâncias de fluxo de trabalho contêm estados e estados são preservados entre chamadas a serem executadas e run_stream. Para executar várias execuções independentes, crie instâncias de fluxo de trabalho separadas por meio do WorkflowBuilder.

Solicitações de entrada externas

Executores em um fluxo de trabalho podem solicitar entrada externa usando ctx.request_info():

  1. Executor chama ctx.request_info() para solicitar entrada
  2. Executor implementa response_handler() para processar a resposta
  3. As solicitações são emitidas como instâncias RequestInfoEvent no fluxo de eventos
  4. O fluxo de trabalho entra IDLE_WITH_PENDING_REQUESTS estado
  5. O chamador manipula solicitações e fornece respostas por meio dos métodos send_responses ou send_responses_streaming
  6. As respostas são roteada para os executores solicitantes e os manipuladores de resposta são invocados

Definindo o ponto de verificação

O ponto de verificação pode ser configurado em tempo de build ou em runtime:

Tempo de build (via WorkflowBuilder): fluxo de trabalho = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (por meio de parâmetros de execução/run_stream): resultado = await workflow.run(message, checkpoint_storage=runtime_storage)

Quando habilitados, os pontos de verificação são criados no final de cada superestar, capturando:

  • Estados do executor
  • Mensagens em trânsito
  • Fluxos de trabalho de estado compartilhado podem ser pausados e retomados entre reinicializações de processo usando o armazenamento de ponto de verificação.

Composição

Os fluxos de trabalho podem ser aninhados usando WorkflowExecutor, que encapsula um fluxo de trabalho filho como executor. Os tipos de entrada/saída do fluxo de trabalho aninhado tornam-se parte dos tipos do WorkflowExecutor. Quando invocado, o WorkflowExecutor executa o fluxo de trabalho aninhado para conclusão e processa suas saídas.

Inicialize o fluxo de trabalho com uma lista de bordas.

WorkflowAgent

Uma subclasse agent que encapsula um fluxo de trabalho e o expõe como um agente.

Inicialize o WorkflowAgent.

WorkflowBuilder

Uma classe de construtor para construir fluxos de trabalho.

Essa classe fornece uma API fluente para definir grafos de fluxo de trabalho conectando executores com bordas e configurando parâmetros de execução. Chame build para criar uma instância imutável Workflow .

Inicialize o WorkflowBuilder com uma lista vazia de bordas e nenhum executor inicial.

WorkflowCheckpoint

Representa um ponto de verificação completo do estado do fluxo de trabalho.

Os pontos de verificação capturam o estado de execução completo de um fluxo de trabalho em um ponto específico, permitindo que os fluxos de trabalho sejam pausados e retomados.

Observação

O ditado shared_state pode conter chaves reservadas gerenciadas pela estrutura.

Consulte a documentação da classe SharedState para obter detalhes sobre chaves reservadas.

WorkflowCheckpointSummary

Resumo legível por humanos de um ponto de verificação de fluxo de trabalho.

WorkflowContext

Contexto de execução que permite que os executores interajam com fluxos de trabalho e outros executores.

Visão geral

WorkflowContext fornece uma interface controlada para executores enviarem mensagens, produzirem saídas, gerenciarem o estado e interagirem com o ecossistema de fluxo de trabalho mais amplo. Ele impõe a segurança do tipo por meio de parâmetros genéricos, impedindo o acesso direto a componentes de runtime internos.

Parâmetros de tipo

O contexto é parametrizado para impor a segurança do tipo para operações diferentes:

WorkflowContext (sem parâmetros)

Para executores que só executam efeitos colaterais sem enviar mensagens ou produzir saídas:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Habilita o envio de mensagens do tipo T_Out para outros executores:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Habilita o envio de mensagens (T_Out) e a geração de saídas de fluxo de trabalho (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Tipos de união

Vários tipos podem ser especificados usando notação de união:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Inicialize o contexto do executor com o contexto de fluxo de trabalho fornecido.

WorkflowErrorDetails

Informações de erro estruturadas a serem exibidas em eventos/resultados de erro.

WorkflowEvent

Classe base para eventos de fluxo de trabalho.

Inicialize o evento de fluxo de trabalho com dados opcionais.

WorkflowExecutor

Um executor que encapsula um fluxo de trabalho para habilitar a composição de fluxo de trabalho hierárquica.

Visão geral

WorkflowExecutor faz com que um fluxo de trabalho se comporte como um único executor dentro de um fluxo de trabalho pai, habilitando arquiteturas de fluxo de trabalho aninhadas. Ele lida com o ciclo de vida completo da execução de fluxo de trabalho, incluindo processamento de eventos, encaminhamento de saída e coordenação de solicitação/resposta entre fluxos de trabalho pai e filho.

Modelo de execução

Quando invocado, WorkflowExecutor:

  1. Inicia o fluxo de trabalho encapsulado com a mensagem de entrada
  2. Executa o sub-fluxo de trabalho até a conclusão ou até que precise de entrada externa
  3. Processa o fluxo de eventos completo do sub-fluxo de trabalho após a execução
  4. Encaminha saídas para o fluxo de trabalho pai como mensagens
  5. Manipula solicitações externas roteando-as para o fluxo de trabalho pai
  6. Acumula respostas e retoma a execução do sub-fluxo de trabalho

Processamento de fluxo de eventos

WorkflowExecutor processa eventos após a conclusão do fluxo de trabalho:

Encaminhamento de saída

Todas as saídas do subconsulta são encaminhadas automaticamente para o pai:

Quando allow_direct_output é False (padrão):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Quando allow_direct_output for True:

Coordenação de solicitação/resposta

Quando os sub-fluxos de trabalho precisam de informações externas:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Gerenciamento de estado

WorkflowExecutor mantém o estado de execução entre ciclos de solicitação/resposta:

  • Rastreia solicitações pendentes por request_id
  • Acumula respostas até que todas as respostas esperadas sejam recebidas
  • Retoma a execução de sub-fluxo de trabalho com o lote de resposta completo
  • Manipula execuções simultâneas e várias solicitações pendentes

Tipo de Integração do Sistema

WorkflowExecutor herda sua assinatura de tipo do fluxo de trabalho encapsulado:

Tipos de entrada

Corresponde aos tipos de entrada do executor inicial do fluxo de trabalho encapsulado:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Tipos de saída

Combina saídas de sub-fluxo de trabalho com tipos de coordenação de solicitação:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Tratamento de erros

WorkflowExecutor propaga falhas de sub-fluxo de trabalho:

  • Captura WorkflowFailedEvent do sub-fluxo de trabalho
  • Converte em WorkflowErrorEvent no contexto pai
  • Fornece informações detalhadas de erro, incluindo a ID do sub-fluxo de trabalho

Suporte à execução simultânea

O WorkflowExecutor dá suporte total a várias execuções simultâneas de sub-fluxo de trabalho:

Isolamento de estado Per-Execution

Cada invocação de sub-fluxo de trabalho cria um ExecutionContext isolado:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Coordenação de solicitação/resposta

As respostas são roteadas corretamente para a execução de origem:

  • Cada execução rastreia suas próprias solicitações pendentes e respostas esperadas
  • O mapeamento de solicitação para execução garante que as respostas atinjam o sub-fluxo de trabalho correto
  • O acúmulo de resposta é isolado por execução
  • Limpeza automática quando a execução é concluída

Gerenciamento de memória

  • Execuções simultâneas ilimitadas com suporte
  • Cada execução tem identificação exclusiva baseada em UUID
  • Limpeza de contextos de execução concluídos
  • Gerenciamento de estado thread-safe para acesso simultâneo

Considerações importantes

Instância de Fluxo de Trabalho Compartilhado: todas as execuções simultâneas usam a mesma instância de fluxo de trabalho subjacente. Para isolamento adequado, verifique se o fluxo de trabalho encapsulado e seus executores estão sem estado.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integração com fluxos de trabalho pai

Os fluxos de trabalho pai podem interceptar solicitações de sub-fluxo de trabalho:

Notas de Implementação

  • Os sub-fluxos de trabalho são executados até a conclusão antes de processar seus resultados
  • O processamento de eventos é atômico – todas as saídas são encaminhadas antes das solicitações
  • O acúmulo de resposta garante que os sub-fluxos de trabalho recebam lotes de resposta completos
  • O estado de execução é mantido para a retomada adequada após solicitações externas
  • Execuções simultâneas são totalmente isoladas e não interferem entre si

Inicialize o WorkflowExecutor.

WorkflowFailedEvent

Evento de ciclo de vida interno emitido quando uma execução de fluxo de trabalho termina com um erro.

WorkflowOutputEvent

Evento disparado quando um executor de fluxo de trabalho gera saída.

Inicialize o evento de saída do fluxo de trabalho.

WorkflowRunResult

Contêiner para eventos gerados durante a execução de fluxo de trabalho sem streaming.

Visão geral

Representa os resultados de execução completos de uma execução de fluxo de trabalho, contendo todos os eventos gerados do estado inicial ao ocioso. Os fluxos de trabalho produzem saídas incrementalmente por meio de chamadas ctx.yield_output() durante a execução.

Estrutura de eventos

Mantém a separação entre eventos do plano de dados e do plano de controle:

  • Eventos do plano de dados: invocações, conclusões, saídas e solicitações do executor (na lista principal)
  • Eventos do plano de controle: linha do tempo de status acessível por meio do método status_timeline()

Métodos de chave

  • get_outputs(): extrair todas as saídas de fluxo de trabalho da execução
  • get_request_info_events(): recuperar solicitações de entrada externas feitas durante a execução
  • get_final_state(): Obter o estado final do fluxo de trabalho (IDLE, IDLE_WITH_PENDING_REQUESTS etc.)
  • status_timeline(): Acessar o histórico de eventos de status completo
WorkflowStartedEvent

Evento de ciclo de vida interno emitido quando uma execução de fluxo de trabalho começa.

Inicialize o evento de fluxo de trabalho com dados opcionais.

WorkflowStatusEvent

Evento de ciclo de vida interno emitido para transições de estado de execução de fluxo de trabalho.

Inicialize o evento de status do fluxo de trabalho com um novo estado e dados opcionais.

WorkflowValidationError

Exceção base para erros de validação de fluxo de trabalho.

WorkflowViz

Uma classe para visualizar fluxos de trabalho usando graphviz e Sereia.

Inicialize o WorkflowViz com um fluxo de trabalho.

Enumerações

MagenticHumanInterventionDecision

Opções de decisão para respostas de intervenção humana.

MagenticHumanInterventionKind

O tipo de intervenção humana que está sendo solicitada.

ValidationTypeEnum

Enumeração de tipos de validação de fluxo de trabalho.

WorkflowEventSource

Identifica se um evento de fluxo de trabalho veio da estrutura ou de um executor.

Use o FRAMEWORK para eventos emitidos por caminhos de orquestração internos, mesmo quando o código que os gera vidas em módulos relacionados a executores, e EXECUTOR para eventos exibidos por implementações de executor fornecidas pelo desenvolvedor.

WorkflowRunState

Estado em nível de execução de uma execução de fluxo de trabalho.

Semântica:

  • STARTED: A execução foi iniciada e o contexto do fluxo de trabalho foi criado. Esse é um estado inicial antes de qualquer trabalho significativo ser executado. Nessa base de código, emitemos um WorkflowStartedEvent dedicado para telemetria e, normalmente, avançamos o status diretamente para IN_PROGRESS. Os consumidores ainda podem contar com STARTED para computadores de estado que precisam de uma fase de pré-trabalho explícita.

  • IN_PROGRESS: o fluxo de trabalho está sendo executado ativamente (por exemplo, a mensagem inicial foi entregue ao executor inicial ou um superstep está em execução). Esse status é emitido no início de uma execução e pode ser seguido por outros status à medida que a execução progride.

  • IN_PROGRESS_PENDING_REQUESTS: execução ativa enquanto uma ou mais operações de solicitação de informações estão pendentes. O novo trabalho ainda pode ser agendado enquanto as solicitações estão em pré-lançamento.

  • OCIOSO: o fluxo de trabalho é quiescente sem solicitações pendentes e não há mais trabalho a ser feito. Esse é o estado normal do terminal para fluxos de trabalho que terminaram de ser executados, potencialmente tendo produzido saídas ao longo do caminho.

  • IDLE_WITH_PENDING_REQUESTS: o fluxo de trabalho está em pausa aguardando entrada externa (por exemplo, emitido um RequestInfoEvent). Esse é um estado não terminal; o fluxo de trabalho pode ser retomado quando as respostas são fornecidas.

  • FALHA: o estado do terminal indicando que um erro veio à tona. Acompanhado por um WorkflowFailedEvent com detalhes de erro estruturados.

  • CANCELLED: Estado do terminal que indica que a execução foi cancelada por um chamador ou orquestrador. Não emitidos atualmente por caminhos de executor padrão, mas incluídos para integradores/orquestradores que dão suporte ao cancelamento.

Funções

agent_middleware

Decorador para marcar uma função como middleware do agente.

Esse decorador identifica explicitamente uma função como middleware de agente, que processa objetos AgentRunContext.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Description
func
Obrigatório

A função de middleware para marcar como middleware do agente.

Retornos

Tipo Description

A mesma função com o marcador de middleware do agente.

Exemplos


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Decore uma função para transformá-la em uma AIFunction que pode ser passada para modelos e executada automaticamente.

Esse decorador cria um modelo Pydantic a partir da assinatura da função, que será usado para validar os argumentos passados para a função e para gerar o esquema JSON para os parâmetros da função.

Para adicionar descrições aos parâmetros, use o Annotated tipo com uma descrição de cadeia de typing caracteres como o segundo argumento. Você também pode usar a classe Pydantic Field para uma configuração mais avançada.

Observação

Quando approval_mode for definido como "always_require", a função não será executada

até que a aprovação explícita seja dada, isso só se aplica ao fluxo de invocação automática.

Também é importante observar que, se o modelo retornar várias chamadas de função, algumas que exigem aprovação

e outros que não o fizerem, ele pedirá aprovação para todos eles.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parâmetros

Nome Description
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

A função a ser decorada.

Valor padrão: None
name
Obrigatório
str | None
description
Obrigatório
str | None
approval_mode
Obrigatório
Literal['always_require', 'never_require'] | None
max_invocations
Obrigatório
int | None
max_invocation_exceptions
Obrigatório
int | None
additional_properties
Obrigatório

Parâmetros somente de palavra-chave

Nome Description
name

O nome da função. Se não for fornecido, o atributo da __name__ função será usado.

Valor padrão: None
description

Uma descrição da função. Se não for fornecido, o docstring da função será usado.

Valor padrão: None
approval_mode

Se a aprovação é necessária ou não para executar essa ferramenta. O padrão é que a aprovação não é necessária.

Valor padrão: None
max_invocations

O número máximo de vezes que essa função pode ser invocada. Se Nenhum, não há limite, deve ser pelo menos 1.

Valor padrão: None
max_invocation_exceptions

O número máximo de exceções permitidas durante invocações. Se Nenhum, não há limite, deve ser pelo menos 1.

Valor padrão: None
additional_properties

Propriedades adicionais a serem definidas na função.

Valor padrão: None

Retornos

Tipo Description
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Exemplos


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Decorador para marcar uma função como middleware de chat.

Esse decorador identifica explicitamente uma função como middleware de chat, que processa objetos ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Description
func
Obrigatório

A função middleware a ser marcado como middleware de chat.

Retornos

Tipo Description

A mesma função com o marcador de middleware de chat.

Exemplos


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Função de fábrica para criar o executor de borda apropriado para um grupo de borda.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parâmetros

Nome Description
edge_group
Obrigatório
<xref:agent_framework._workflows._edge.EdgeGroup>

O grupo de borda para o qual criar um executor.

executors
Obrigatório

Mapa de IDs do executor para instâncias de executor.

Retornos

Tipo Description
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

A instância apropriada do EdgeRunner.

executor

Decorador que converte uma função autônoma em uma instância do FunctionExecutor.

O @executor decorador foi projetado apenas para funções autônomas no nível do módulo. Para executores baseados em classe, use a classe base Executor com @handler métodos de instância.

Dá suporte a funções síncronas e assíncronas. Funções síncronas são executadas em um pool de threads para evitar o bloqueio do loop de eventos.

Importante

Usar @executor para funções autônomas (funções locais ou no nível do módulo)

Não usar @executor com staticmethod ou classmethod

Para executores baseados em classe, executor de subclasse e uso @handler em métodos de instância

Uso:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parâmetros

Nome Description
func
Callable[[...], Any] | None

A função a ser decorada (quando usada sem parênteses)

Valor padrão: None
id
Obrigatório
str | None

ID personalizada opcional para o executor. Se Nenhum, usará o nome da função.

Parâmetros somente de palavra-chave

Nome Description
id
Valor padrão: None

Retornos

Tipo Description

Uma instância do FunctionExecutor que pode ser conectada a um fluxo de trabalho.

Exceções

Tipo Description

Se usado com staticmethod ou classmethod (padrão sem suporte)

function_middleware

Decorador para marcar uma função como middleware de função.

Esse decorador identifica explicitamente uma função como middleware de função, que processa objetos FunctionInvocationContext.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Description
func
Obrigatório

A função middleware a ser marcado como middleware de função.

Retornos

Tipo Description

A mesma função com o marcador de middleware de função.

Exemplos


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parâmetros

Nome Description
checkpoint
Obrigatório

Retornos

Tipo Description

get_logger

Obtenha um agente com o nome especificado, padrão para 'agent_framework'.

get_logger(name: str = 'agent_framework') -> Logger

Parâmetros

Nome Description
name
str

O nome do agente. O padrão é "agent_framework".

Valor padrão: "agent_framework"

Retornos

Tipo Description

A instância do agente configurada.

handler

Decorador para registrar um manipulador para um executor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parâmetros

Nome Description
func
Obrigatório
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

A função a ser decorada. Pode ser Nenhum quando usado sem parâmetros.

Retornos

Tipo Description
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

A função decorada com metadados do manipulador.

Exemplos

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> Nenhum:

...

prepare_function_call_results

Prepare os valores dos resultados da chamada de função.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parâmetros

Nome Description
content
Obrigatório

Retornos

Tipo Description
str

prepend_agent_framework_to_user_agent

Prepend "agent-framework" to the User-Agent in the headers.

Quando a telemetria do agente do usuário é desabilitada por meio da AGENT_FRAMEWORK_USER_AGENT_DISABLED variável de ambiente, o cabeçalho User-Agent não incluirá as informações da estrutura do agente. Ele será enviado de volta como está ou como um ditado vazio quando Nenhum for passado.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parâmetros

Nome Description
headers

O dicionário de cabeçalhos existente.

Valor padrão: None

Retornos

Tipo Description

Um novo ditado com "User-Agent" definido como "agent-framework-python/{version}" se os cabeçalhos forem Nenhum. O dicionário de cabeçalhos modificado com "agent-framework-python/{version}" anexado ao User-Agent.

Exemplos


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Decorador para registrar um manipulador para lidar com as respostas de uma solicitação.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parâmetros

Nome Description
func
Obrigatório
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

A função a ser decorada.

Retornos

Tipo Description
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

A função decorada com metadados do manipulador.

Exemplos


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Configure a configuração de log para a estrutura do agente.

setup_logging() -> None

Retornos

Tipo Description

use_agent_middleware

Decorador de classe que adiciona suporte a middleware a uma classe de agente.

Esse decorador adiciona funcionalidade de middleware a qualquer classe de agente. Ele encapsula os run() métodos e run_stream() os métodos para fornecer a execução do middleware.

A execução do middleware pode ser encerrada a qualquer momento definindo a context.terminate propriedade como True. Uma vez definido, o pipeline interromperá a execução de mais middleware assim que o controle retornar ao pipeline.

Observação

Esse decorador já é aplicado a classes de agente internas. Você só precisa usar

se você estiver criando implementações de agente personalizado.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parâmetros

Nome Description
agent_class
Obrigatório
type[<xref:TAgent>]

A classe de agente à qual adicionar suporte a middleware.

Retornos

Tipo Description
type[~<xref:TAgent>]

A classe de agente modificada com suporte a middleware.

Exemplos


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Decorador de classe que adiciona suporte a middleware a uma classe de cliente de chat.

Este decorador adiciona funcionalidade de middleware a qualquer classe de cliente de chat. Ele encapsula os get_response() métodos e get_streaming_response() os métodos para fornecer a execução do middleware.

Observação

Esse decorador já é aplicado a classes de cliente de chat internas. Você só precisa usar

se você estiver criando implementações personalizadas do cliente de chat.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parâmetros

Nome Description
chat_client_class
Obrigatório
type[<xref:TChatClient>]

A classe de cliente de chat à qual adicionar suporte a middleware.

Retornos

Tipo Description
type[~<xref:TChatClient>]

A classe de cliente de chat modificada com suporte a middleware.

Exemplos


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Decorador de classe que habilita a ferramenta de chamada para um cliente de chat.

Esse decorador encapsula os métodos e get_response os get_streaming_response métodos para lidar automaticamente com chamadas de função do modelo, executá-las e retornar os resultados para o modelo para processamento adicional.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parâmetros

Nome Description
chat_client
Obrigatório
type[<xref:TChatClient>]

A classe de cliente de chat a ser decorada.

Retornos

Tipo Description
type[~<xref:TChatClient>]

A classe de cliente de chat decorada com invocação de função habilitada.

Exceções

Tipo Description

Se o cliente de chat não tiver os métodos necessários.

Exemplos


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Função de conveniência para validar um grafo de fluxo de trabalho.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parâmetros

Nome Description
edge_groups
Obrigatório
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

lista de grupos de borda no fluxo de trabalho

executors
Obrigatório

Mapa de IDs do executor para instâncias de executor

start_executor
Obrigatório

O executor inicial (pode ser instância ou ID)

Retornos

Tipo Description

Exceções

Tipo Description

Se alguma validação falhar