Partilhar via


agent_framework Pacote

Pacotes

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Módulos

exceptions
observability

Classes

AIFunction

Uma ferramenta que envolve uma função Python para a tornar chamável por modelos de IA.

Esta classe envolve uma função Python para a tornar chamável por modelos de IA com validação automática de parâmetros e geração de esquemas JSON.

Inicialize a Introdução da AIFunction.

AgentExecutor

executor incorporado que envolve um agente para o tratamento de mensagens.

O AgentExecutor adapta o seu comportamento com base no modo de execução do fluxo de trabalho:

  • run_stream(): Emite eventos incrementais AgentRunUpdateEvent à medida que o agente produz tokens
  • run(): Emite um único AgentRunEvent contendo a resposta completa

O executor deteta automaticamente o modo através de WorkflowContext.is_streaming().

Inicialize o executor com um identificador único.

AgentExecutorRequest

Um pedido a um agente executor.

AgentExecutorResponse

Uma resposta de um executor agente.

AgentInputRequest

Pede input humano antes de um agente correr em fluxos de trabalho de alto nível do builder.

Emitido via RequestInfoEvent quando um fluxo de trabalho pausa antes de um agente ser executado. A resposta é injetada na conversa como uma mensagem do utilizador para orientar o comportamento do agente.

Este é o tipo de pedido padrão usado por .with_request_info() no SequentialBuilder, ConcurrentBuilder, GroupChatBuilder e HandoffBuilder.

AgentMiddleware

Classe base abstrata para middleware de agentes que pode intercetar invocações de agentes.

O middleware de agentes permite intercetar e modificar invocações de agentes antes e depois da execução. Pode inspecionar mensagens, modificar o contexto, sobrescrever resultados ou terminar a execução mais cedo.

Observação

AgentMiddleware é uma classe base abstrata. Tens de o subclassificar e implementar

O método Process() para criar middleware de agentes personalizados.

AgentProtocol

Um protocolo para um agente que pode ser invocado.

Este protocolo define a interface que todos os agentes devem implementar, incluindo propriedades para identificação e métodos para execução.

Observação

Os protocolos usam subtipagem estrutural (tipagem duck). As aulas não precisam

herdar explicitamente deste protocolo para ser considerado compatível.

Isto permite-lhe criar agentes completamente personalizados sem usar

quaisquer classes base do Agent Framework.

AgentRunContext

Objeto de contexto para invocações de middleware de agentes.

Este contexto é passado pelo pipeline de middleware do agente e contém toda a informação sobre a invocação do agente.

Inicialize o AgentRunContext.

AgentRunEvent

Evento desencadeado quando uma execução de agente é concluída.

Inicialize o evento de execução do agente.

AgentRunResponse

Representa a resposta a um pedido de execução de Agente.

Fornece uma ou mais mensagens de resposta e metadados sobre a resposta. Uma resposta típica conterá uma única mensagem, mas pode conter múltiplas mensagens em cenários que envolvam chamadas de funções, recuperações RAG ou lógica complexa.

Inicialize um AgentRunResponse.

AgentRunResponseUpdate

Representa um único bloco de resposta em streaming de um Agente.

Inicializar um AgentRunResponseUpdate.

AgentRunUpdateEvent

Evento desencadeado quando um agente está a transmitir mensagens.

Inicia o evento de streaming do agente.

AgentThread

A classe thread Agent, que pode representar tanto uma thread gerida localmente como uma thread gerida pelo serviço.

An AgentThread mantém o estado da conversa e o histórico de mensagens para a interação com o agente. Pode usar um fio gerido por serviços (via service_thread_id) ou um armazenamento local de mensagens (via message_store), mas não ambos.

Inicialize um AgentThread, não use este método manualmente, use sempre: agent.get_new_thread().

Observação

Podem ser definidos service_thread_id ou message_store, mas não ambos.

AggregateContextProvider

Um ContextProvider que contém múltiplos fornecedores de contexto.

Delega eventos a múltiplos fornecedores de contexto e agrega respostas desses eventos antes de regressar. Isto permite combinar vários fornecedores de contexto num único fornecedor.

Observação

Um AggregateContextProvider é criado automaticamente quando passa um único contexto

ou uma sequência de fornecedores de contexto para o construtor agente.

Inicialize o AggregateContextProvider com fornecedores de contexto.

BaseAgent

Classe base para todos os agentes do Agent Framework.

Esta classe fornece funcionalidades essenciais para implementações de agentes, incluindo fornecedores de contexto, suporte a middleware e gestão de threads.

Observação

O BaseAgent não pode ser instanciado diretamente porque não implementa o

run(), run_stream(), e outros métodos exigidos pelo AgentProtocol.

Usa uma implementação concreta como o ChatAgent ou cria uma subclasse.

Inicializar uma instância BaseAgent.

BaseAnnotation

Classe base para todos os tipos de Anotação de IA.

Inicializar BaseAnnotation.

BaseChatClient

Classe base para clientes de chat.

Esta classe base abstrata fornece funcionalidades essenciais para implementações de clientes de chat, incluindo suporte a middleware, preparação de mensagens e normalização de ferramentas.

Observação

BaseChatClient não pode ser instanciado diretamente porque é uma classe base abstrata.

Subclasses devem implementar _inner_get_response() e _inner_get_streaming_response().

Inicializar uma instância BaseChatClient.

BaseContent

Representa o conteúdo utilizado por serviços de IA.

Inicializar o BaseContent.

Case

Wrapper em tempo de execução que combina um predicado switch-case com o seu destino.

Cada Caso associa um predicado booleano ao executor que deve tratar da mensagem quando o predicado for avaliado para Verdadeiro. O runtime mantém este contentor leve separado do SwitchCaseEdgeGroupCase serializável, permitindo que a execução possa operar com chamadas ao vivo sem poluir o estado persistente.

ChatAgent

Um Agente de Chat para Clientes.

Esta é a implementação principal do agente que utiliza um cliente de chat para interagir com modelos de linguagem. Suporta ferramentas, fornecedores de contexto, middleware e respostas tanto em streaming como em não-streaming.

Inicializar uma instância ChatAgent.

Observação

O conjunto de parâmetros de frequency_penalty a request_kwargs é usado para

Liga para o cliente de chat. Também podem ser passados para ambos os métodos de execução.

Quando ambos são definidos, os que passam para os métodos de execução têm prioridade.

ChatClientProtocol

Um protocolo para um cliente de chat que pode gerar respostas.

Este protocolo define a interface que todos os clientes de chat devem implementar, incluindo métodos para gerar respostas tanto em streaming como em não-streaming.

Observação

Os protocolos usam subtipagem estrutural (tipagem duck). As aulas não precisam

herdar explicitamente deste protocolo para ser considerado compatível.

ChatContext

Objeto de contexto para invocações de middleware de chat.

Este contexto é passado pelo pipeline de middleware de chat e contém toda a informação sobre o pedido de chat.

Inicialize o ChatContext.

ChatMessage

Representa uma mensagem de chat.

Inicializar ChatMessage.

ChatMessageStore

Uma implementação em memória do ChatMessageStoreProtocol que armazena mensagens numa lista.

Esta implementação fornece um armazenamento simples baseado em listas para mensagens de chat, com suporte para serialização e desserialização. Implementa todos os métodos necessários do ChatMessageStoreProtocol protocolo.

O armazenamento mantém mensagens em memória e fornece métodos para serializar e desserializar o estado para fins de persistência.

Crie um ChatMessageStore para usar num tópico.

ChatMessageStoreProtocol

Define métodos para armazenar e recuperar mensagens de chat associadas a um tópico específico.

As implementações deste protocolo são responsáveis por gerir o armazenamento das mensagens de chat, incluindo o manuseamento de grandes volumes de dados através de truncamento ou resumo das mensagens conforme necessário.

ChatMiddleware

Classe base abstrata para middleware de chat que pode interceptar pedidos de clientes de chat.

O middleware de chat permite-lhe intercetar e modificar pedidos do cliente de chat antes e depois da execução. Pode modificar mensagens, adicionar comandos de sistema, registar pedidos ou sobrepor respostas do chat.

Observação

ChatMiddleware é uma classe base abstrata. Tens de o subclassificar e implementar

O método Process() para criar middleware de chat personalizado.

ChatOptions

Definições comuns de pedidos para serviços de IA.

Inicializar o ChatOptions.

ChatResponse

Representa a resposta a um pedido de chat.

Inicializa um ChatResponse com os parâmetros fornecidos.

ChatResponseUpdate

Representa um único bloco de resposta em streaming de um ChatClient.

Inicializa um ChatResponseUpdate com os parâmetros fornecidos.

CheckpointStorage

Protocolo para backends de armazenamento em checkpoint.

CitationAnnotation

Representa uma anotação de citação.

Inicializar CitationAnnotation.

ConcurrentBuilder

Construtor de alto nível para fluxos de trabalho concorrentes de agentes.

  • participantes([...]) aceita uma lista de AgentProtocol (recomendado) ou Executor.

  • register_participants([...]) aceita uma lista de fábricas para o AgentProtocol (recomendado)

    ou fábricas executoras

  • fios build(): despachante -> dispersão -> participantes -> ventilação -> agregador.

  • with_aggregator(...) sobrepõe o agregador predefinido com um Executor ou callback.

  • register_aggregator(...) aceita uma fábrica para um Executor como agregador personalizado.

Utilização:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Uma classe contendo qualquer contexto que deva ser fornecido ao modelo de IA, conforme fornecido por um ContextProvider.

Cada ContextProvider tem a capacidade de fornecer o seu próprio contexto para cada invocação. A classe Context contém o contexto adicional fornecido pelo ContextProvider. Este contexto será combinado com o fornecido por outros fornecedores antes de ser passado para o modelo de IA. Este contexto é por invocação e não será armazenado como parte do histórico do chat.

Crie um novo objeto de Contexto.

ContextProvider

Classe base para todos os fornecedores de contexto.

Um fornecedor de contexto é um componente que pode ser usado para melhorar a gestão de contexto da IA. Pode ouvir alterações na conversa e fornecer contexto adicional ao modelo de IA pouco antes da invocação.

Observação

ContextProvider é uma classe base abstrata. Tens de o subclassificar e implementar

o método invoking() para criar um fornecedor de contexto personalizado. Idealmente, devias

implemente também os métodos invocado() e thread_created() para acompanhar conversas

mas estes são opcionais.

DataContent

Representa conteúdo de dados binários com um tipo de media associado (também conhecido como tipo MIME).

Importante

Isto é para dados binários que são representados como um URI de dados, não para recursos online.

Use o UriContent para recursos online.

Inicializa uma instância DataContent.

Importante

Isto é para dados binários que são representados como um URI de dados, não para recursos online.

Use o UriContent para recursos online.

Default

Representação em tempo de execução do ramo padrão num grupo switch-case.

O desvio padrão é invocado apenas quando nenhum outro predicado de caso coincide. Na prática, é garantido que existe para que o encaminhamento nunca produza um alvo vazio.

Edge

Modele uma transferência direcionada, opcionalmente condicional, entre dois executores.

Cada Edge captura os metadados mínimos necessários para mover uma mensagem de um executor para outro dentro do grafo de workflow. Opcionalmente, incorpora um predicado booleano que decide se a aresta deve ser tomada em tempo de execução. Ao serializar a aresta até às primitivas, podemos reconstruir a topologia de um fluxo de trabalho independentemente do processo original em Python.

Inicialize uma aresta totalmente especificada entre dois executores de workflow.

EdgeDuplicationError

Exceção levantada quando são detetadas arestas duplicadas no fluxo de trabalho.

ErrorContent

Representa um erro.

Observações: Normalmente usado para erros não fatais, quando algo correu mal como parte da operação, mas a operação ainda assim conseguiu continuar.

Inicializa uma instância ErrorContent.

Executor

Classe base para todos os executores de workflow que processam mensagens e realizam cálculos.

Visão geral

Os executores são os blocos fundamentais dos fluxos de trabalho, representando unidades de processamento individuais que recebem mensagens, realizam operações e produzem resultados. Cada executor é identificado de forma única e pode lidar com tipos específicos de mensagens através de métodos de handler decorados.

Sistema de Tipos

Os executores têm um sistema de tipos rico que define as suas capacidades:

Tipos de entrada

Os tipos de mensagens que um executor pode processar, descobertos a partir das assinaturas do método handler:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Acesso através da propriedade input_types .

Tipos de Saída

Os tipos de mensagens que um executor pode enviar a outros executores via ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Acesso através da propriedade output_types .

Tipos de Saída do Fluxo de Trabalho

Os tipos de dados que um executor pode emitir como saídas ao nível do fluxo de trabalho via ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Acesso através da propriedade workflow_output_types .

Descoberta do Handler

Os executores descobrem as suas capacidades através de métodos decorados:

@handler Decorador

Marca métodos que processam mensagens recebidas:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Interceção de Pedidos de Subfluxo de Trabalho

Use @handler métodos para intercetar pedidos de subfluxo de trabalho:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Tipos de contexto

Os métodos handler recebem diferentes variantes do WorkflowContext com base nas suas anotações de tipos:

WorkflowContext (sem parâmetros de tipo)

Para manipuladores que só realizam efeitos secundários sem enviar mensagens ou obter saídas:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContexto[T_Out]

Permite enviar mensagens do tipo T_Out via ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Permite tanto o envio de mensagens (T_Out) como a obtenção de saídas de fluxo de trabalho (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Executores de Funções

Funções simples podem ser convertidas em executores usando o decorador @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Composição de Subfluxos de Trabalho

Os executores podem conter subfluxos de trabalho usando o WorkflowExecutor. Os subfluxos de trabalho podem fazer pedidos que os fluxos de trabalho pais podem interceptar. Consulte a documentação do WorkflowExecutor para detalhes sobre padrões de composição de workflow e gestão de pedidos/respostas.

Gestão do Estado

Os executores podem conter estados que persistem através das execuções de fluxo de trabalho e pontos de controlo. Substitua os métodos on_checkpoint_save e on_checkpoint_restore para implementar serialização de estado personalizada e lógica de restauração.

Notas de implementação

  • Não chames execute() diretamente – é invocado pelo motor de workflow
  • Não sobreescrevas execute() – define os handlers usando decoradores em vez disso
  • Cada executor deve ter pelo menos um método @handler
  • As assinaturas do método handler são validadas no momento da inicialização

Inicialize o executor com um identificador único.

ExecutorCompletedEvent

Evento desencadeado quando um handler executor é concluído.

Inicialize o evento executor com um ID do executor e dados opcionais.

ExecutorEvent

Classe base para eventos de executor.

Inicialize o evento executor com um ID do executor e dados opcionais.

ExecutorFailedEvent

Evento desencadeado quando um handler executor apresenta um erro.

ExecutorInvokedEvent

Evento desencadeado quando um executor handler é invocado.

Inicialize o evento executor com um ID do executor e dados opcionais.

FanInEdgeGroup

Representa um conjunto convergente de arestas que alimentam um único executor a jusante.

Os grupos fan-in são tipicamente usados quando múltiplos estágios ascendentes produzem independentemente mensagens que devem chegar todas ao mesmo processador a jusante.

Constrói um mapeamento fan-in que funde várias fontes num único alvo.

FanOutEdgeGroup

Representa um grupo de bordas de estilo broadcast com lógica de seleção opcional.

Um fan-out encaminha uma mensagem produzida por um único executor de origem para um ou mais executores a jusante. Em tempo de execução, podemos restringir ainda mais os alvos executando um selection_func que inspeciona a carga útil e devolve o subconjunto de ids que deve receber a mensagem.

Crie um mapeamento disperso de uma única fonte para muitos alvos.

FileCheckpointStorage

Armazenamento de checkpoint baseado em ficheiros para persistência.

Inicialize o armazenamento de ficheiros.

FinishReason

Representa a razão pela qual uma resposta no chat foi concluída.

Inicialize o FinishReason com um valor.

FunctionApprovalRequestContent

Representa um pedido de aprovação do utilizador para uma chamada de função.

Inicializa uma instância FunctionApprovalRequestContent.

FunctionApprovalResponseContent

Representa uma resposta para aprovação do utilizador a uma chamada de função.

Inicializa uma instância FunctionApprovalResponseContent.

FunctionCallContent

Representa um pedido de chamada de função.

Inicializa uma instância FunctionCallContent.

FunctionExecutor

Executor que envolve uma função definida pelo utilizador.

Este executor permite aos utilizadores definir funções simples (tanto sincronizadas como assíncronas) e usá-las como executores de workflow sem necessidade de criar classes executoras completas.

As funções síncronas são executadas num pool de threads usando asyncio.to_thread() para evitar bloquear o ciclo de eventos.

Inicialize o FunctionExecutor com uma função definida pelo utilizador.

FunctionInvocationConfiguration

Configuração para invocação de funções em clientes de chat.

Esta classe é criada automaticamente em todos os clientes de chat que suportam invocação de funções. Isto significa que, na maioria dos casos, pode simplesmente alterar os atributos da instância, em vez de criar uma nova.

Inicializar FunctionInvocationConfiguration.

FunctionInvocationContext

Objeto de contexto para invocações de middleware de funções.

Este contexto é passado pelo pipeline de middleware de funções e contém toda a informação sobre a invocação de funções.

Inicialize o FunctionInvocationContext.

FunctionMiddleware

Classe base abstrata para middleware de funções que pode intercetar invocações de funções.

O middleware de funções permite-te intercetar e modificar invocações de funções/ferramentas antes e depois da execução. Podes validar argumentos, resultados em cache, invocações de registo ou sobrescrever a execução de funções.

Observação

FunctionMiddleware é uma classe base abstrata. Tens de o subclassificar e implementar

O método Process() para criar middleware de funções personalizadas.

FunctionResultContent

Representa o resultado de uma chamada de função.

Inicializa uma instância FunctionResultContent.

GraphConnectivityError

Exceção levantada quando são detetados problemas de conectividade de grafos.

GroupChatBuilder

Um construtor de alto nível para fluxos de trabalho de chat de grupo dirigidos pelo gestor com orquestração dinâmica.

O GroupChat coordena conversas entre múltiplos agentes através de um gestor que seleciona qual o participante que fala a seguir. O gestor pode ser uma simples função Python (set_select_speakers_func) ou um seletor baseado em agentes via set_manager. Estas duas abordagens são mutuamente exclusivas.

Fluxo de Trabalho Principal:

  1. Defina os participantes: lista de agentes (usa o seu .name) ou dit, mapeando nomes para agentes

  2. Configurar seleção de altifalantes: set_select_speakers_func OU

    set_manager (não ambos)

  3. Opcional: definir limites de rondas, checkpointing, condições de terminação

  4. Construir e executar o fluxo de trabalho

Padrões de Seleção de Oradores:

Padrão 1: Seleção simples baseada em funções (recomendado)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Padrão 2: Seleção baseada em LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Padrão 3: Pedir informações para feedback a meio da conversa


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Especificação do Participante:

Duas formas de especificar os participantes:

  • Forma de lista: [agent1, agent2] - usa agent.name atributo para nomes dos participantes
  • Forma de ditado: {name1: agent1, name2: agent2} - controlo explícito de nomes
  • Forma de palavra-chave: participants(name1=agent1, name2=agent2) - controlo explícito do nome

Estrutura do Snapshot de Estado:

O GroupChatStateSnapshot passado para set_select_speakers_func contém:

  • tarefa: ChatMessage - Tarefa do utilizador original
  • participantes: dict[str, str] - Mapeamento dos nomes dos participantes para descrições
  • conversação: tuple[ChatMessage, ...] - Histórico completo de conversas
  • história: tuple[GroupChatTurn, ...] - Registo turn-by-turn com atribuição do altifalante
  • round_index: int - Número de rondas de seleção de treinadores até agora
  • pending_agent: força | Nenhum - Nome do agente atualmente em processamento (se houver)

Restrições Importantes:

  • Não podem combinar set_select_speakers_func e set_manager
  • Os nomes dos participantes devem ser únicos
  • Ao usar a forma de lista, os agentes devem ter um atributo nome não vazio

Inicialize o GroupChatBuilder.

GroupChatDirective

Instrução emitida por uma implementação de gestor de chat de grupo.

HandoffBuilder

Fluent Builder para fluxos de trabalho conversacionais de handoff com coordenadores e agentes especializados.

O padrão de handoff permite a um agente coordenador encaminhar pedidos para agentes especialistas. O modo de interação controla se o fluxo de trabalho solicita input do utilizador após cada resposta do agente ou se conclui autonomamente quando os agentes terminam de responder. Uma condição de terminação determina quando o fluxo de trabalho deve deixar de pedir entrada e completar.

Padrões de Roteamento:

Single-Tier (Padrão): Só o coordenador pode passar para especialistas. Por defeito, após qualquer especialista responder, o controlo regressa ao utilizador para mais entrada. Isto cria um fluxo cíclico: utilizador -> coordenador -> [especialista opcional] -> utilizador -> coordenador -> ... Use with_interaction_mode ("autónoma") para evitar pedir input adicional do utilizador e obter a conversa final quando um agente responde sem delegar.

Multi-Nível (Avançado): Os especialistas podem passar para outros especialistas usando .add_handoff(). Isto proporciona mais flexibilidade para fluxos de trabalho complexos, mas é menos controlável do que o padrão de nível único. Os utilizadores perdem visibilidade em tempo real dos passos intermédios durante as transferências entre especialistas (embora o histórico completo de conversas, incluindo todas as transferências, seja preservado e possa ser inspecionado posteriormente).

Principais Funcionalidades:

  • Deteção automática de handoff: O coordenador invoca uma ferramenta de handoff cujo

    Os argumentos (por exemplo {"handoff_to": "shipping_agent"}) identificam o especialista a receber o controlo.

  • Ferramentas geradas automaticamente: Por defeito, o builder sintetiza ferramentas handoff_to_<agent> para o coordenador, por isso não defines manualmente funções marcadas.

  • Histórico completo da conversa: Toda a conversa (incluindo qualquer ChatMessage.additional_properties) é preservada e passada a cada agente.

  • Controlo de terminação: Por defeito, termina após 10 mensagens de utilizador. Override com .with_termination_condition(lambda conv: ...) para lógica personalizada (por exemplo, detetar "adeus").

  • Modos de interação: Escolher human_in_loop (por defeito) para avisar os utilizadores entre turnos de agente, ou autónomo para continuar a rotear de volta aos agentes sem pedir entrada do utilizador até ocorrer uma transferência ou atingir um limite de terminação/turno (limite de turnos autónomo padrão: 50).

  • Checkpointing: Persistência opcional para fluxos de trabalho retomáveis.

Utilização (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Roteamento Multi-Nível com .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Use as Fábricas Participantes para o Isolamento do Estado:

Condição de Terminação Personalizada:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Pontos de controlo:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Inicialize um HandoffBuilder para criar fluxos de trabalho de handoff conversacionais.

O construtor começa num estado não configurado e exige que chame:

  1. .participantes([...]) - Agentes de registo
  2. ou .participant_factories({...}) - Fábricas de agente registador/executor
  3. .set_coordinator(...) - Designar qual agente recebe a entrada inicial do utilizador
  4. .build() - Construir o fluxo de trabalho final

Métodos de configuração opcionais permitem-lhe personalizar a gestão do contexto, a lógica de terminação e a persistência.

Observação

Os participantes devem ter nomes/IDs estáveis porque o fluxo de trabalho mapeia o

Ferramentas de transferência para estes identificadores. Os nomes dos agentes devem coincidir

as strings emitidas pela ferramenta de handoff do coordenador (por exemplo, uma ferramenta que

outputs {"handoff_to": "billing"} requer um agente chamado faturamento).

HandoffUserInputRequest

Mensagem de pedido emitida quando o fluxo de trabalho necessita de input novo do utilizador.

Nota: O campo de conversa é intencionalmente excluído da serialização por checkpoint para evitar duplicação. A conversa é preservada no estado do coordenador e será reconstruída na restauração. Ver edição #2667.

HostedCodeInterpreterTool

Representa uma ferramenta alojada que pode ser especificada a um serviço de IA para permitir a execução de código gerado.

Esta ferramenta não implementa a interpretação de código por si só. Serve como um marcador para informar um serviço de que pode executar código gerado se o serviço for capaz de o fazer.

Inicialize o HostedCodeInterpreterTool.

HostedFileContent

Representa o conteúdo de um ficheiro alojado.

Inicializa uma instância HostedFileContent.

HostedFileSearchTool

Representa uma ferramenta de pesquisa de ficheiros que pode ser especificada a um serviço de IA para permitir a realização de pesquisas de ficheiros.

Inicialize uma ferramenta de pesquisa de ficheiros.

HostedMCPSpecificApproval

Representa o modo específico para uma ferramenta alojada.

Ao usar este modo, o utilizador deve especificar quais as ferramentas que sempre ou nunca requerem aprovação. Isto é representado como um dicionário com duas chaves opcionais:

HostedMCPTool

Representa uma ferramenta MCP que é gerida e executada pelo serviço.

Crie uma ferramenta MCP alojada.

HostedVectorStoreContent

Representa um conteúdo de armazenamento vetorial alojado.

Inicializa uma instância HostedVectorStoreContent.

HostedWebSearchTool

Representa uma ferramenta de pesquisa web que pode ser especificada a um serviço de IA para lhe permitir realizar pesquisas na web.

Inicialize uma HostedWebSearchTool.

InMemoryCheckpointStorage

Armazenamento em checkpoint em memória para testes e desenvolvimento.

Inicialize o armazenamento de memória.

InProcRunnerContext

Contexto de execução em processo para execução local e checkpointing opcional.

Inicialize o contexto de execução em processo.

MCPStdioTool

Ferramenta MCP para ligação a servidores MCP baseados em stdio.

Esta classe liga-se a servidores MCP que comunicam através de entrada/saída padrão, normalmente usados para processos locais.

Inicialize a ferramenta MCP stdio.

Observação

Os argumentos são usados para criar um objeto StdioServerParstates,

que é então usado para criar um cliente stdio. Ver mcp.client.stdio.stdio_client

e mcp.client.stdio.stdio_server_parameters para mais detalhes.

MCPStreamableHTTPTool

Ferramenta MCP para ligação a servidores MCP baseados em HTTP.

Esta classe liga-se a servidores MCP que comunicam via HTTP/SSE streamable.

Inicialize a ferramenta HTTP streamable MCP.

Observação

Os argumentos são usados para criar um cliente HTTP streamable.

Consulte mcp.client.streamable_http.streamablehttp_client para mais detalhes.

Quaisquer argumentos extra passados ao construtor serão entregues ao

construtor cliente HTTP streamable.

MCPWebsocketTool

Ferramenta MCP para ligação a servidores MCP baseados em WebSocket.

Esta classe liga-se a servidores MCP que comunicam via WebSocket.

Inicialize a ferramenta MCP WebSocket.

Observação

Os argumentos são usados para criar um cliente WebSocket.

Consulte mcp.client.websocket.websocket_client para mais detalhes.

Quaisquer argumentos extra passados ao construtor serão entregues ao

Construtor cliente WebSocket.

MagenticBuilder

Fluent Builder para criar fluxos de trabalho de orquestração multi-agente do Magentic One.

Os fluxos de trabalho do Magentic One utilizam um gestor alimentado por LLM para coordenar múltiplos agentes através de planeamento dinâmico de tarefas, acompanhamento de progresso e replaneamento adaptativo. O gestor cria planos, seleciona agentes, monitoriza o progresso e decide quando replanear ou concluir.

O construtor fornece uma API fluente para configurar participantes, o gestor, revisão opcional do plano, checkpointing e callbacks de eventos.

Suporte Humano no Ciclo: A Magentic fornece mecanismos especializados de HITL através de:

  • .with_plan_review() - Rever e aprovar/rever planos antes da execução

  • .with_human_input_on_stall() - Intervir quando o fluxo de trabalho trava

  • Aprovação de ferramentas via FunctionApprovalRequestContent - Aprovar chamadas individuais de ferramenta

Estas emitem eventos MagenticHumanInterventionRequest que fornecem opções estruturadas de decisão (APROVAR, REVISAR, CONTINUAR, REPLANEAR, ORIENTAR) apropriadas para a orquestração baseada no planeamento da Magentic.

Utilização:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Com gestor personalizado:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Contexto para o treinador do Magent.

MagenticManagerBase

Classe base para o gestor do Magentic One.

ManagerDirectiveModel

Modelo pidântico para saída estruturada de diretivas de gestores.

Crie um novo modelo analisando e validando dados de entrada a partir de argumentos de palavras-chave.

Aumenta [ValidationError][pydantic_core. ValidationError] se os dados de entrada não puderem ser validados para formar um modelo válido.

self é explicitamente apenas posicional para permitir self como nome de campo.

ManagerSelectionRequest

Pedido enviado ao agente gestor para a seleção do próximo orador.

Esta dataclass reúne o estado completo da conversa e o contexto da tarefa para que o agente gestor possa analisar e tomar uma decisão de seleção de oradores.

ManagerSelectionResponse

Resposta do agente gestor com decisão de seleção do orador.

O agente gestor deve produzir esta estrutura (ou dit/JSON compatível) para comunicar a sua decisão de volta ao orquestrador.

Crie um novo modelo analisando e validando dados de entrada a partir de argumentos de palavras-chave.

Aumenta [ValidationError][pydantic_core. ValidationError] se os dados de entrada não puderem ser validados para formar um modelo válido.

self é explicitamente apenas posicional para permitir self como nome de campo.

Message

Uma classe que representa uma mensagem no fluxo de trabalho.

OrchestrationState

Contentor de estado unificado para checkpointing do orquestrador.

Esta dataclass padroniza a serialização de checkpoints em todos os três padrões de chat de grupo, permitindo extensões específicas do padrão através de metadados.

Atributos comuns abrangem preocupações de orquestração partilhada (tarefa, conversa, acompanhamento de rondas). O estado específico do padrão entra no ditado de metadados.

RequestInfoEvent

Evento desencadeado quando um executor de workflow solicita informação externa.

Inicialize o evento de pedido de informação.

RequestInfoInterceptor

Executor interno que pausa o fluxo de trabalho para input humano antes da execução do agente.

Este executor é inserido no gráfico de workflow pelos construtores quando .with_request_info() é chamado. Interceta mensagens AgentExecutorRequest ANTES da execução do agente e pausa o fluxo de trabalho via ctx.request_info() com um AgentInputRequest.

Quando uma resposta é recebida, o responsável pela resposta injeta a entrada como uma mensagem de utilizador na conversa e encaminha o pedido para o agente.

O parâmetro opcional agent_filter permite limitar quais os agentes que acionam a pausa. Se o ID do agente alvo não estiver no conjunto de filtros, o pedido é encaminhado sem pausa.

Inicialize o executor interceptor de informação de pedido.

Role

Descreve o propósito pretendido de uma mensagem numa interação de chat.

Propriedades: SISTEMA: O papel que instrui ou define o comportamento do sistema de IA. UTILIZADOR: O papel que fornece input do utilizador para interações no chat. ASSISTENTE: O papel que fornece respostas a input instruído pelo sistema e solicitado pelo utilizador. FERRAMENTA: O papel que fornece informações adicionais e referências em resposta a pedidos de utilização de ferramentas.

Inicializar o Papel com um valor.

Runner

Uma classe para executar um fluxo de trabalho no Pregel supersteps.

Inicialize o runner com arestas, estado partilhado e contexto.

RunnerContext

Protocolo para o contexto de execução usado pelo runner.

Um único contexto que suporta mensagens, eventos e checkpoints opcionais. Se o armazenamento de checkpoint não estiver configurado, os métodos de checkpoint podem aumentar.

SequentialBuilder

Construtor de alto nível para fluxos de trabalho sequenciais de agentes/executores com contexto partilhado.

  • participantes([...]) aceita uma lista de instâncias AgentProtocol (recomendado) ou Executor

  • register_participants([...]) aceita uma lista de fábricas para o AgentProtocol (recomendado)

    ou fábricas executoras

  • Os executores devem definir um handler que consuma lista[ChatMessage] e envie uma lista[ChatMessage]

  • O fluxo de trabalho liga os participantes por ordem, passando uma lista[ChatMessage] pela cadeia

  • Os agentes acrescentam as mensagens dos seus assistentes à conversa

  • Executores personalizados podem transformar/resumir e devolver uma lista[ChatMessage]

  • O resultado final é a conversa produzida pelo último participante

Utilização:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Uma classe para gerir estado partilhado num fluxo de trabalho.

O SharedState fornece acesso seguro em thread a dados do estado do fluxo de trabalho que precisam de ser partilhados entre executores durante a execução do fluxo de trabalho.

Chaves Reservadas: As seguintes chaves são reservadas para uso interno na estrutura e não devem ser modificadas pelo código do utilizador:

  • _executor_state: Armazena o estado do executor para checkpointing (gerido pelo Runner)

Advertência

Não use teclas que começam por sublinhado (_), pois podem ser reservadas para

Operações do quadro interno.

Inicialize o estado partilhado.

SingleEdgeGroup

Wrapper de conveniência para uma aresta solitária, mantendo a API do grupo uniforme.

Crie um grupo de arestas um-para-um entre dois executores.

StandardMagenticManager

Gestor Magentic padrão que realiza chamadas reais de LLM através de um ChatAgent.

O gestor constrói prompts que espelham a orquestração original do Magentic One:

  • Recolha de factos
  • Criação do plano
  • Livro de registo de progresso em JSON
  • Atualização dos factos e atualização do plano após reinício
  • Síntese da resposta final

Inicialize o Gestor Magentic Padrão.

SubWorkflowRequestMessage

Mensagem enviada de um sub-fluxo de trabalho para um executor no fluxo de trabalho pai para solicitar informação.

Esta mensagem envolve um RequestInfoEvent emitido pelo executor no subfluxo de trabalho.

SubWorkflowResponseMessage

Mensagem enviada de um fluxo de trabalho pai para um subfluxo de trabalho via WorkflowExecutor para fornecer a informação solicitada.

Esta mensagem envolve os dados da resposta juntamente com o RequestInfoEvent original emitido pelo executor do sub-workflow.

SuperStepCompletedEvent

Evento desencadeado quando um superpasso termina.

Inicializar o evento superstep.

SuperStepStartedEvent

Evento desencadeado quando começa um superpasso.

Inicializar o evento superstep.

SwitchCaseEdgeGroup

Variante com leque que imita um fluxo tradicional de controlo de interruptores/caixas.

Cada caso inspeciona o payload da mensagem e decide se deve tratar a mensagem. Exatamente um caso — ou o ramo padrão — devolve um alvo em tempo de execução, preservando a semântica de despacho único.

Configure uma estrutura de encaminhamento switch/case para um único executor de origem.

SwitchCaseEdgeGroupCase

Descrição persistente de um único ramo condicional num caso de comutação.

Ao contrário do objeto Case em tempo de execução, esta variante serializável armazena apenas o identificador de destino e um nome descritivo para o predicado. Quando a chamada subjacente não está disponível durante a desserialização, substituimos por um substituto proxy que falha de forma ruidosa, garantindo que a dependência em falta é imediatamente visível.

Registar os metadados de encaminhamento para um desvio condicional de caso.

SwitchCaseEdgeGroupDefault

Descritor persistente para o ramo de retenção de um grupo switch-case.

A ramificação padrão é garantida e é invocada quando todos os outros predicados de caso falham em corresponder à carga útil.

Aponte o ramo padrão para o identificador do executor indicado.

TextContent

Representa o conteúdo de texto numa conversa.

Inicializa uma instância TextContent.

TextReasoningContent

Representa conteúdo de raciocínio textual num chat.

Observações: Esta aula e o TextContent são superficialmente semelhantes, mas distintos.

Inicializa uma instância de TextReasoningContent.

TextSpanRegion

Representa uma região de texto que foi anotada.

Inicializar TextSpanRegion.

ToolMode

Define se e como as ferramentas são usadas num pedido de chat.

Inicializar o ToolMode.

ToolProtocol

Representa uma ferramenta genérica.

Este protocolo define a interface que todas as ferramentas devem implementar para serem compatíveis com a framework do agente. É implementado por várias classes de ferramentas como HostedMCPTool, HostedWebSearchTool e AIFunction. Uma AIFunction é geralmente criada pelo ai_function decorador.

Como cada conector precisa de analisar ferramentas de forma diferente, os utilizadores podem passar um dict para especificar uma ferramenta específica de serviço quando não houver abstração disponível.

TypeCompatibilityError

Exceção levantada quando é detetada incompatibilidade de tipo entre executores ligados.

UriContent

Representa um conteúdo de URI.

Importante

Isto é usado para conteúdos identificados por um URI, como uma imagem ou um ficheiro.

Para URIs de dados (binários), use DataContent em vez disso.

Inicializa uma instância UriContent.

Observações: Isto é usado para conteúdos identificados por um URI, como uma imagem ou um ficheiro. Para URIs de dados (binários), use DataContent em vez disso.

UsageContent

Representa informações de utilização associadas a um pedido e resposta por chat.

Inicializa uma instância UsageContent.

UsageDetails

Fornece detalhes de utilização sobre um pedido/resposta.

Inicializa a instância UsageDetails.

Workflow

Um motor de execução baseado em grafos que orquestra executores ligados.

Visão geral

Um fluxo de trabalho executa um grafo direcionado de executores ligados por grupos de arestas usando um modelo semelhante a Pregel, executando em superpassos até o grafo ficar inativo. Os fluxos de trabalho são criados usando a classe WorkflowBuilder – não instancias esta classe diretamente.

Modelo de Execução

Os executores executam em superpassos sincronizados onde cada executor:

  • É invocado quando recebe mensagens de grupos de arestas ligados
  • Pode enviar mensagens para executores posteriores via ctx.send_message()
  • Pode gerar saídas ao nível do fluxo de trabalho via ctx.yield_output()
  • Pode emitir eventos personalizados via ctx.add_event()

As mensagens entre executores são entregues no final de cada superpasso e não são visíveis no fluxo de eventos. Apenas os eventos ao nível do fluxo de trabalho (saídas, eventos personalizados) e os eventos de estado são observáveis pelos chamadores.

Tipos de Entrada/Saída

Os tipos de fluxo de trabalho são descobertos em tempo de execução através da inspeção:

  • Tipos de entrada: Desde o início os tipos de entrada do executor
  • Tipos de saída: União de todos os tipos de fluxo de trabalho dos executores Acede a estes através das propriedades input_types e output_types.

Métodos de Execução

O fluxo de trabalho fornece duas APIs principais de execução, cada uma suportando múltiplos cenários:

  • run(): Execute to completion, devolve WorkflowRunResult com todos os eventos

  • run_stream(): Devolve o gerador assíncrono que apresenta eventos à medida que ocorrem

Ambos os métodos suportam:

  • Execuções iniciais do fluxo de trabalho: Fornecer o parâmetro da mensagem
  • Restauração de pontos de controlo: Fornecer checkpoint_id (e opcionalmente checkpoint_storage)
  • Continuação HIL: Fornecer respostas para continuar após solicitações do RequestInfoExecutor
  • Checkpointing em tempo de execução: Fornecer checkpoint_storage para ativar/sobrepor checkpointing nesta corrida

Gestão do Estado

As instâncias de workflow contêm estados e os estados são preservados entre chamadas para execução e run_stream. Para executar múltiplas execuções independentes, crie instâncias de fluxo de trabalho separadas através do WorkflowBuilder.

Pedidos de Entrada Externa

Executores dentro de um fluxo de trabalho podem solicitar entrada externa usando ctx.request_info():

  1. O executor chama ctx.request_info() para pedir entrada
  2. O executor implementa response_handler() para processar a resposta
  3. Os pedidos são emitidos como instâncias RequestInfoEvent no fluxo de eventos
  4. O fluxo de trabalho entra IDLE_WITH_PENDING_REQUESTS estado
  5. O chamador lida com pedidos e fornece respostas através dos métodos send_responses ou send_responses_streaming
  6. As respostas são encaminhadas para os executores solicitantes e os handlers de resposta são invocados

Pontos de verificação

O checkpointing pode ser configurado em tempo de compilação ou em tempo de execução:

Build-time (via WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (via parâmetros run/run_stream): result = await workflow.run(message, checkpoint_storage=runtime_storage)

Quando ativados, são criados pontos de controlo no final de cada superpasso, capturando:

  • O executor afirma
  • Mensagens em trânsito
  • Os fluxos de trabalho de estado partilhado podem ser pausados e retomados ao longo dos reinícios de processos usando armazenamento de checkpoint.

Composição

Os fluxos de trabalho podem ser aninhados usando o WorkflowExecutor, que envolve um fluxo de trabalho filho como executor. Os tipos de entrada/saída do fluxo de trabalho aninhado passam a fazer parte dos tipos do WorkflowExecutor. Quando invocado, o WorkflowExecutor executa o fluxo de trabalho aninhado até à conclusão e processa as suas saídas.

Inicialize o fluxo de trabalho com uma lista de arestas.

WorkflowAgent

Uma subclasse Agent que envolve um fluxo de trabalho e o expõe como agente.

Inicialize o WorkflowAgent.

WorkflowBuilder

Uma classe de construção para construir fluxos de trabalho.

Esta classe fornece uma API fluente para definir grafos de workflow, ligando executores com arestas e configurando parâmetros de execução. Chamada build para criar uma instância imutável Workflow .

Inicialize o WorkflowBuilder com uma lista vazia de arestas e sem executor inicial.

WorkflowCheckpoint

Representa um ponto de verificação completo do estado do fluxo de trabalho.

Os checkpoints captam o estado completo de execução de um fluxo de trabalho num ponto específico, permitindo que os fluxos de trabalho sejam pausados e retomados.

Observação

O shared_state dictado pode conter chaves reservadas geridas pelo framework.

Consulte a documentação da classe SharedState para detalhes sobre chaves reservadas.

WorkflowCheckpointSummary

Resumo legível por humanos de um ponto de verificação do fluxo de trabalho.

WorkflowContext

Contexto de execução que permite aos executores interagir com fluxos de trabalho e outros executores.

Visão geral

O WorkflowContext fornece uma interface controlada para executores enviarem mensagens, gerirem resultados, gerirem o estado e interagirem com o ecossistema mais amplo do fluxo de trabalho. Garante a segurança do tipo através de parâmetros genéricos, impedindo o acesso direto a componentes internos de execução.

Parâmetros de Tipo Genérico

O contexto é parametrizado para garantir a segurança do tipo em diferentes operações:

WorkflowContext (sem parâmetros)

Para executores que apenas realizam efeitos secundários sem enviar mensagens ou obter resultados:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContexto[T_Out]

Permite enviar mensagens do tipo T_Out para outros executores:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Permite tanto o envio de mensagens (T_Out) como a obtenção de saídas de fluxo de trabalho (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Tipos de União

Podem ser especificados vários tipos usando notação de união:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Inicialize o contexto do executor com o contexto de fluxo de trabalho indicado.

WorkflowErrorDetails

Informação estruturada de erro para emergir em eventos/resultados de erro.

WorkflowEvent

Classe base para eventos de fluxo de trabalho.

Inicialize o evento do fluxo de trabalho com dados opcionais.

WorkflowExecutor

Um executor que envolve um fluxo de trabalho para permitir a composição hierárquica de fluxos de trabalho.

Visão geral

O WorkflowExecutor faz com que um fluxo de trabalho se comporte como um único executor dentro de um fluxo de trabalho pai, permitindo arquiteturas de fluxos de trabalho aninhadas. Trata de todo o ciclo de vida da execução de subfluxos de trabalho, incluindo processamento de eventos, encaminhamento de saída e coordenação de pedidos/respostas entre os fluxos de trabalho pai e filho.

Modelo de Execução

Quando invocado, WorkflowExecutor:

  1. Inicia o fluxo de trabalho enrolado com a mensagem de entrada
  2. Executa o subfluxo de trabalho até à conclusão ou até precisar de input externo
  3. Processa o fluxo completo de eventos do subfluxo de trabalho após a execução
  4. Encaminha as saídas para o fluxo de trabalho pai como mensagens
  5. Lida com pedidos externos encaminhando-os para o fluxo de trabalho pai
  6. Acumula respostas e retoma a execução do subfluxo de trabalho

Processamento de Fluxo de Eventos

O WorkflowExecutor processa eventos após a conclusão do subfluxo de trabalho:

Encaminhamento de Saída

Todas as saídas do sub-fluxo de trabalho são automaticamente encaminhadas para o pai:

Quando allow_direct_output é Falso (padrão):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Quando allow_direct_output é Verdade:

Coordenação de Pedidos/Respostas

Quando sub-fluxos de trabalho precisam de informação externa:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Gestão do Estado

O WorkflowExecutor mantém o estado de execução ao longo dos ciclos de pedido/resposta:

  • Acompanha pedidos pendentes por request_id
  • Acumula respostas até que todas as respostas esperadas sejam recebidas
  • Retoma a execução do subfluxo de trabalho com lote completo de respostas
  • Trata de execuções simultâneas e múltiplos pedidos pendentes

Integração de Sistemas de Tipos

O WorkflowExecutor herda a sua assinatura de tipo do fluxo de trabalho wrapped:

Tipos de entrada

Corresponde aos tipos de entrada do executor de início do fluxo de trabalho encapsulado:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Tipos de Saída

Combina saídas de sub-workflow com tipos de coordenação de pedidos:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Tratamento de erros

O WorkflowExecutor propaga falhas sub-workflow:

  • Captura o WorkflowFailedEvent a partir do sub-workflow
  • Converte para WorkflowErrorEvent no contexto pai
  • Fornece informação detalhada sobre erros, incluindo ID de subfluxo de trabalho

Suporte à Execução Concorrente

O WorkflowExecutor suporta totalmente múltiplas execuções simultâneas de subfluxos de trabalho:

Per-Execution Isolamento do Estado

Cada invocação de subfluxo de trabalho cria um ExecutionContext isolado:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Coordenação de Pedidos/Respostas

As respostas são corretamente encaminhadas para a execução de origem:

  • Cada execução regista os seus próprios pedidos pendentes e respostas esperadas
  • O mapeamento de pedido para execução garante que as respostas chegam ao sub-fluxo de trabalho correto
  • A acumulação de resposta é isolada por execução
  • Limpeza automática quando a execução termina

Gerenciamento de memória

  • Execuções concorrentes ilimitadas suportadas
  • Cada execução tem uma identificação única baseada em UUID
  • Limpeza de contextos de execução concluídos
  • Gestão de estado segura em threads para acesso concorrente

Considerações importantes

Instância de Fluxo de Trabalho Partilhada: Todas as execuções concorrentes usam a mesma instância de fluxo de trabalho subjacente. Para um isolamento adequado, certifique-se de que o fluxo de trabalho wrappeado e os seus executores são stateless.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integração com os Fluxos de Trabalho Parentais

Os fluxos de trabalho pais podem intercetar sub-pedidos de workflow:

Notas de implementação

  • Os subfluxos de trabalho correm até à conclusão antes de processarem os seus resultados
  • O processamento de eventos é atómico – todas as saídas são encaminhadas antes dos pedidos
  • A acumulação de respostas garante que os subfluxos de trabalho recebam lotes completos de respostas
  • O estado de execução é mantido para uma retomada adequada após pedidos externos
  • Execuções concorrentes são totalmente isoladas e não interferem entre si

Inicialize o WorkflowExecutor.

WorkflowFailedEvent

Evento de ciclo de vida incorporado emitido quando uma execução de workflow termina com um erro.

WorkflowOutputEvent

Evento desencadeado quando um executor de workflow gera saída.

Inicialize o evento de saída do fluxo de trabalho.

WorkflowRunResult

Contentor para eventos gerados durante a execução de fluxos de trabalho não em streaming.

Visão geral

Representa os resultados completos da execução de um fluxo de trabalho, contendo todos os eventos gerados desde o início até ao estado de inatividade. Os fluxos de trabalho produzem saídas incrementalmente através de chamadas ctx.yield_output() durante a execução.

Estrutura do Evento

Mantém a separação entre eventos no plano de dados e no plano de controlo:

  • Eventos do plano de dados: Invocações, concluições, saídas e pedidos do executor (na lista principal)
  • Eventos no plano de controlo: Linha temporal de estado acessível via método status_timeline()

Métodos Chave

  • get_outputs(): Extrair todos os resultados do fluxo de trabalho da execução
  • get_request_info_events(): Recuperar pedidos de entrada externos feitos durante a execução
  • get_final_state(): Obtenha o estado final do fluxo de trabalho (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.)
  • status_timeline(): Aceda ao histórico completo de eventos de estado
WorkflowStartedEvent

Evento de ciclo de vida incorporado emitido quando uma execução de fluxo de trabalho começa.

Inicialize o evento do fluxo de trabalho com dados opcionais.

WorkflowStatusEvent

Evento de ciclo de vida incorporado emitido para transições de estado de execução de fluxos de trabalho.

Inicialize o evento de estado do fluxo de trabalho com um novo estado e dados opcionais.

WorkflowValidationError

Exceção base para erros de validação de workflow.

WorkflowViz

Uma aula para visualizar fluxos de trabalho usando graphviz e Mermaid.

Inicialize o WorkflowViz com um fluxo de trabalho.

Enums

MagenticHumanInterventionDecision

Opções de decisão para respostas de intervenção humana.

MagenticHumanInterventionKind

O tipo de intervenção humana que está a ser solicitada.

ValidationTypeEnum

Enumeração dos tipos de validação de fluxos de trabalho.

WorkflowEventSource

Identifica se um evento de workflow veio do framework ou de um executor.

Use o FRAMEWORK para eventos emitidos por caminhos de orquestração incorporados — mesmo quando o código que os gera está em módulos relacionados com o runner — e o EXECUTOR para eventos surgidos por implementações de executores fornecidos pelos desenvolvedores.

WorkflowRunState

Estado ao nível da execução de um fluxo de trabalho.

Semântica:

  • INICIADO: A execução foi iniciada e o contexto do fluxo de trabalho foi criado. Este é um estado inicial antes de qualquer trabalho significativo ser realizado. Nesta base de código emitimos um WorkflowStartedEvent dedicado para telemetria, e normalmente avançamos o estado diretamente para IN_PROGRESS. Os consumidores podem continuar a depender do START para máquinas de estados que necessitam de uma fase pré-trabalho explícita.

  • IN_PROGRESS: O fluxo de trabalho está a ser executado ativamente (por exemplo, a mensagem inicial foi entregue ao executor inicial ou está a correr um superpasso). Este estado é emitido no início de uma sequência e pode ser seguido por outros estados à medida que a sequência avança.

  • IN_PROGRESS_PENDING_REQUESTS: Execução ativa enquanto uma ou mais operações de pedido de informação estão pendentes. Novos trabalhos podem continuar a ser agendados enquanto os pedidos estiverem em andamento.

  • IDLE: O fluxo de trabalho é tranquilo, sem pedidos pendentes e sem mais trabalho para fazer. Este é o estado terminal normal para fluxos de trabalho que terminaram de ser executados, potencialmente tendo produzido resultados ao longo do caminho.

  • IDLE_WITH_PENDING_REQUESTS: O fluxo de trabalho está pausado à espera de entrada externa (por exemplo, emitido um RequestInfoEvent). Este é um estado não terminal; O fluxo de trabalho pode retomar quando as respostas forem fornecidas.

  • FALHADO: Estado terminal a indicar um erro apareceu. Acompanhado por um WorkflowFailedEvent com detalhes estruturados do erro.

  • CANCELADO: Estado terminal a indicar que a corrida foi cancelada por um chamador ou orquestrador. Atualmente não é emitido pelos caminhos de runner por defeito, mas está incluído para integradores/orquestradores que suportam cancelamento.

Funções

agent_middleware

Decorador para marcar uma função como middleware de agente.

Este decorador identifica explicitamente uma função como middleware agente, que processa objetos AgentRunContext.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Descrição
func
Necessário

O middleware funciona para marcar como middleware agente.

Regressos

Tipo Descrição

A mesma função com o marcador de middleware do agente.

Exemplos


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Decora uma função para a transformar numa AIFunction que pode ser passada para os modelos e executada automaticamente.

Este decorador cria um modelo Pydantic a partir da assinatura da função, que será usado para validar os argumentos passados à função e para gerar o esquema JSON para os parâmetros da função.

Para adicionar descrições aos parâmetros, use o Annotated tipo de typing com uma descrição em cadeia como segundo argumento. Também podes usar a classe do Field Pydantic para configurações mais avançadas.

Observação

Quando approval_mode está definido para "always_require", a função não será executada

Até que a aprovação explícita seja dada, isto aplica-se apenas ao fluxo de auto-invocação.

Também é importante notar que, se o modelo devolver múltiplas chamadas de função, algumas que requerem aprovação

e outros que não o fazem, pedirá aprovação para todos eles.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parâmetros

Nome Descrição
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

A função de decorar.

Valor padrão: None
name
Necessário
str | None
description
Necessário
str | None
approval_mode
Necessário
Literal['always_require', 'never_require'] | None
max_invocations
Necessário
int | None
max_invocation_exceptions
Necessário
int | None
additional_properties
Necessário

Keyword-Only Parâmetros

Nome Descrição
name

O nome da função. Se não for fornecido, será utilizado o atributo da __name__ função.

Valor padrão: None
description

Uma descrição da função. Se não for fornecido, será utilizado o docstring da função.

Valor padrão: None
approval_mode

Se é ou não necessária aprovação para executar esta ferramenta. O padrão é que a aprovação não é necessária.

Valor padrão: None
max_invocations

O número máximo de vezes que esta função pode ser invocada. Se não houver, não há limite, deve haver pelo menos 1.

Valor padrão: None
max_invocation_exceptions

O número máximo de exceções permitidas durante as invocações. Se não houver, não há limite, deve haver pelo menos 1.

Valor padrão: None
additional_properties

Propriedades adicionais a definir na função.

Valor padrão: None

Regressos

Tipo Descrição
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Exemplos


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Decorador para marcar uma função como middleware de chat.

Este decorador identifica explicitamente uma função como middleware de chat, que processa objetos ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Descrição
func
Necessário

O middleware funciona para marcar como middleware de chat.

Regressos

Tipo Descrição

A mesma função com o marcador middleware do chat.

Exemplos


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Função de fábrica para criar o runner de arestas apropriado para um grupo de arestas.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parâmetros

Nome Descrição
edge_group
Necessário
<xref:agent_framework._workflows._edge.EdgeGroup>

O grupo de arestas para criar um corredor.

executors
Necessário

Mapeamento dos IDs do executor para as instâncias do executor.

Regressos

Tipo Descrição
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

A instância apropriada do EdgeRunner.

executor

Decorator que converte uma função autónoma numa instância FunctionExecutor.

O @executor decorador foi concebido apenas para funções autónomas ao nível do módulo. Para executores baseados em classes, use a classe base Executor com @handler métodos on instance.

Suporta funções síncronas e assíncronas. As funções síncronas são executadas num pool de threads para evitar bloquear o ciclo de eventos.

Importante

Utilização @executor para funções autónomas (funções ao nível do módulo ou locais)

NÃO usar @executor com staticmethod ou classmethod

Para executores baseados em classes, subclasse Executor e uso @handler em métodos de instância

Utilização:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parâmetros

Nome Descrição
func
Callable[[...], Any] | None

A função de decorar (quando usada sem parênteses)

Valor padrão: None
id
Necessário
str | None

ID personalizado opcional para o executor. Se Não, usa o nome da função.

Keyword-Only Parâmetros

Nome Descrição
id
Valor padrão: None

Regressos

Tipo Descrição

Uma instância de FunctionExecutor que pode ser ligada a um Workflow.

Exceções

Tipo Descrição

Se usado com staticmethod ou classmethod (padrão não suportado)

function_middleware

Decorador para marcar uma função como middleware funcional.

Este decorador identifica explicitamente uma função como middleware de funções, que processa objetos FunctionInvocationContext.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parâmetros

Nome Descrição
func
Necessário

A função do middleware para marcar como middleware de função.

Regressos

Tipo Descrição

A mesma função com marcador de middleware de função.

Exemplos


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parâmetros

Nome Descrição
checkpoint
Necessário

Regressos

Tipo Descrição

get_logger

Arranja um logger com o nome especificado, usando por defeito 'agent_framework'.

get_logger(name: str = 'agent_framework') -> Logger

Parâmetros

Nome Descrição
name
str

O nome do lenhador. Por defeito, é 'agent_framework'.

Valor padrão: "agent_framework"

Regressos

Tipo Descrição

A instância do logger configurada.

handler

Decorador para registar um tratador para um executor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parâmetros

Nome Descrição
func
Necessário
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

A função de decorar. Pode ser Nenhum quando usado sem parâmetros.

Regressos

Tipo Descrição
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

A função decorada com metadados do handler.

Exemplos

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> Nenhum:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> Nenhum:

...

prepare_function_call_results

Prepare os valores dos resultados das chamadas de função.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parâmetros

Nome Descrição
content
Necessário

Regressos

Tipo Descrição
str

prepend_agent_framework_to_user_agent

Anteponha "agent-framework" ao User-Agent nos cabeçalhos.

Quando a telemetria do agente do utilizador é desativada através da AGENT_FRAMEWORK_USER_AGENT_DISABLED variável de ambiente, o cabeçalho User-Agent não incluirá a informação do agent-framework. Será devolvida tal como está, ou como um ditado vazio quando Nenhum for aprovado.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parâmetros

Nome Descrição
headers

O dicionário de cabeçalhos existente.

Valor padrão: None

Regressos

Tipo Descrição

Um novo dict com "User-Agent" definido para "agent-framework-python/{version}" se os cabeçalhos forem Nenhum. O dicionário de cabeçalhos modificados com "agent-framework-python/{version}" prependido ao User-Agent.

Exemplos


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Decorador para registar um handler para tratar das respostas a um pedido.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parâmetros

Nome Descrição
func
Necessário
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

A função de decorar.

Regressos

Tipo Descrição
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

A função decorada com metadados do handler.

Exemplos


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Configura a configuração de registos para o framework do agente.

setup_logging() -> None

Regressos

Tipo Descrição

use_agent_middleware

Decorador de classe que adiciona suporte de middleware a uma classe de agente.

Este decorador adiciona funcionalidade de middleware a qualquer classe de agente. Envolve os run() métodos e run_stream() para fornecer a execução de middleware.

A execução do middleware pode ser terminada a qualquer momento ao definir a context.terminate propriedade como True. Uma vez definido, o pipeline deixa de executar mais middleware assim que o controlo regressa ao pipeline.

Observação

Este decorador já está aplicado a aulas integradas para agentes. Só precisas de usar

Se estiveres a criar implementações de agentes personalizados.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parâmetros

Nome Descrição
agent_class
Necessário
type[<xref:TAgent>]

A classe agente para adicionar suporte a middleware.

Regressos

Tipo Descrição
type[~<xref:TAgent>]

A classe agente modificada com suporte a middleware.

Exemplos


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Decorador de classe que adiciona suporte de middleware a uma classe cliente de chat.

Este decorador adiciona funcionalidade de middleware a qualquer classe de cliente de chat. Envolve os get_response() métodos e get_streaming_response() para fornecer a execução de middleware.

Observação

Este decorador já é aplicado a classes de chat integradas para clientes. Só precisas de usar

É se estiveres a criar implementações personalizadas de clientes de chat.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parâmetros

Nome Descrição
chat_client_class
Necessário
type[<xref:TChatClient>]

A classe cliente de chat para adicionar suporte a middleware.

Regressos

Tipo Descrição
type[~<xref:TChatClient>]

A classe cliente de chat modificada com suporte a middleware.

Exemplos


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Decorador de classe que permite chamadas de ferramentas para um cliente de chat.

Este decorador envolve os get_response métodos e get_streaming_response para tratar automaticamente chamadas de funções do modelo, executá-las e devolver os resultados ao modelo para processamento posterior.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parâmetros

Nome Descrição
chat_client
Necessário
type[<xref:TChatClient>]

A classe do cliente de chat para decorar.

Regressos

Tipo Descrição
type[~<xref:TChatClient>]

A classe cliente de chat decorada com invocação de função ativada.

Exceções

Tipo Descrição

Se o cliente de chat não tiver os métodos necessários.

Exemplos


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Função de conveniência para validar um gráfico de fluxo de trabalho.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parâmetros

Nome Descrição
edge_groups
Necessário
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

Lista de grupos de arestas no fluxo de trabalho

executors
Necessário

Mapeamento dos IDs do executor para as instâncias do executor

start_executor
Necessário

O executor inicial (pode ser instância ou ID)

Regressos

Tipo Descrição

Exceções

Tipo Descrição

Se alguma validação falhar