Compartir a través de


agent_framework Paquete

Paquetes

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Módulos

exceptions
observability

Clases

AIFunction

Una herramienta que encapsula una función de Python para que sea invocable por los modelos de IA.

Esta clase encapsula una función de Python para que los modelos de IA puedan llamarla con validación automática de parámetros y generación de esquemas JSON.

Inicialice AIFunction.

AgentExecutor

ejecutor integrado que encapsula un agente para controlar los mensajes.

AgentExecutor adapta su comportamiento en función del modo de ejecución del flujo de trabajo:

  • run_stream(): emite eventos AgentRunUpdateEvent incrementales a medida que el agente genera tokens.
  • run(): emite un único AgentRunEvent que contiene la respuesta completa.

El ejecutor detecta automáticamente el modo a través de WorkflowContext.is_streaming().

Inicialice el ejecutor con un identificador único.

AgentExecutorRequest

Solicitud a un ejecutor de agente.

AgentExecutorResponse

Respuesta de un ejecutor de agente.

AgentInputRequest

Solicite la entrada humana antes de que un agente se ejecute en flujos de trabajo de generador de alto nivel.

Se genera a través de RequestInfoEvent cuando un flujo de trabajo se detiene antes de que se ejecute un agente. La respuesta se inserta en la conversación como mensaje de usuario para dirigir el comportamiento del agente.

Este es el tipo de solicitud estándar usado por .with_request_info() en SequentialBuilder, ConcurrentBuilder, GroupChatBuilder y HandoffBuilder.

AgentMiddleware

Clase base abstracta para middleware del agente que puede interceptar invocaciones de agente.

El middleware del agente permite interceptar y modificar las invocaciones del agente antes y después de la ejecución. Puede inspeccionar los mensajes, modificar el contexto, invalidar los resultados o finalizar la ejecución anticipadamente.

Nota:

AgentMiddleware es una clase base abstracta. Debe subclase e implementarlo

el método process() para crear middleware de agente personalizado.

AgentProtocol

Protocolo para un agente que se puede invocar.

Este protocolo define la interfaz que todos los agentes deben implementar, incluidas las propiedades para la identificación y los métodos para su ejecución.

Nota:

Los protocolos usan subtipado estructural (escritura de pato). Las clases no necesitan

para heredar explícitamente de este protocolo para que se considere compatible.

Esto le permite crear agentes completamente personalizados sin usar

cualquier clase base de Agent Framework.

AgentRunContext

Objeto de contexto para invocaciones de middleware del agente.

Este contexto se pasa a través de la canalización de middleware del agente y contiene toda la información sobre la invocación del agente.

Inicialice AgentRunContext.

AgentRunEvent

Evento desencadenado cuando se completa una ejecución del agente.

Inicialice el evento de ejecución del agente.

AgentRunResponse

Representa la respuesta a una solicitud de ejecución del agente.

Proporciona uno o varios mensajes de respuesta y metadatos sobre la respuesta. Una respuesta típica contendrá un solo mensaje, pero puede contener varios mensajes en escenarios que implican llamadas de función, recuperaciones rag o lógica compleja.

Inicialice un AgentRunResponse.

AgentRunResponseUpdate

Representa un único fragmento de respuesta de streaming de un agente.

Inicialice un AgentRunResponseUpdate.

AgentRunUpdateEvent

Evento desencadenado cuando un agente transmite mensajes.

Inicialice el evento de streaming del agente.

AgentThread

La clase de subproceso Del agente, puede representar un subproceso administrado localmente o un subproceso administrado por el servicio.

Mantiene AgentThread el estado de la conversación y el historial de mensajes para una interacción del agente. Puede usar un subproceso administrado por el servicio (a través service_thread_idde ) o un almacén de mensajes local (a través message_storede ), pero no ambos.

Inicialice un AgentThread, no use este método manualmente, siempre use: agent.get_new_thread().

Nota:

Se pueden establecer service_thread_id o message_store, pero no ambos.

AggregateContextProvider

ContextProvider que contiene varios proveedores de contexto.

Delega eventos a varios proveedores de contexto y agrega respuestas de esos eventos antes de devolverlos. Esto le permite combinar varios proveedores de contexto en un único proveedor.

Nota:

AggregateContextProvider se crea automáticamente cuando se pasa un único contexto

proveedor o una secuencia de proveedores de contexto para el constructor del agente.

Inicialice AggregateContextProvider con proveedores de contexto.

BaseAgent

Clase base para todos los agentes de Agent Framework.

Esta clase proporciona funcionalidad básica para las implementaciones del agente, incluidos los proveedores de contexto, la compatibilidad con middleware y la administración de subprocesos.

Nota:

No se puede crear una instancia de BaseAgent directamente, ya que no implementa el

run(), run_stream() y otros métodos requeridos por AgentProtocol.

Use una implementación concreta como ChatAgent o cree una subclase.

Inicialice una instancia de BaseAgent.

BaseAnnotation

Clase base para todos los tipos de anotación de IA.

Inicialice BaseAnnotation.

BaseChatClient

Clase base para clientes de chat.

Esta clase base abstracta proporciona una funcionalidad básica para las implementaciones de cliente de chat, incluida la compatibilidad con middleware, la preparación de mensajes y la normalización de herramientas.

Nota:

BaseChatClient no se puede crear una instancia directamente, ya que es una clase base abstracta.

Las subclases deben implementar _inner_get_response() y _inner_get_streaming_response().

Inicialice una instancia de BaseChatClient.

BaseContent

Representa el contenido usado por los servicios de IA.

Inicialice BaseContent.

Case

Contenedor en tiempo de ejecución que combina un predicado switch-case con su destino.

Cada caso combina un predicado booleano con el ejecutor que debe controlar el mensaje cuando el predicado se evalúa como True. El tiempo de ejecución mantiene este contenedor ligero separado del switchCaseEdgeGroupCase serializable para que la ejecución pueda funcionar con los invocables activos sin contaminar el estado persistente.

ChatAgent

Un agente de cliente de chat.

Esta es la implementación del agente principal que usa un cliente de chat para interactuar con los modelos de lenguaje. Admite herramientas, proveedores de contexto, middleware y respuestas de streaming y no de streaming.

Inicialice una instancia de ChatAgent.

Nota:

El conjunto de parámetros de frequency_penalty a request_kwargs se usan para

llame al cliente de chat. También se pueden pasar a ambos métodos de ejecución.

Cuando se establecen ambos, los que se pasan a los métodos de ejecución tienen prioridad.

ChatClientProtocol

Protocolo para un cliente de chat que puede generar respuestas.

Este protocolo define la interfaz que todos los clientes de chat deben implementar, incluidos los métodos para generar respuestas de streaming y no streaming.

Nota:

Los protocolos usan subtipado estructural (escritura de pato). Las clases no necesitan

para heredar explícitamente de este protocolo para que se considere compatible.

ChatContext

Objeto de contexto para invocaciones de middleware de chat.

Este contexto se pasa a través de la canalización de middleware de chat y contiene toda la información sobre la solicitud de chat.

Inicialice ChatContext.

ChatMessage

Representa un mensaje de chat.

Inicialice ChatMessage.

ChatMessageStore

Implementación en memoria de ChatMessageStoreProtocol que almacena mensajes en una lista.

Esta implementación proporciona un almacenamiento simple basado en listas para mensajes de chat compatibles con la serialización y deserialización. Implementa todos los métodos necesarios del ChatMessageStoreProtocol protocolo.

El almacén mantiene los mensajes en la memoria y proporciona métodos para serializar y deserializar el estado con fines de persistencia.

Cree un ChatMessageStore para usarlo en un subproceso.

ChatMessageStoreProtocol

Define métodos para almacenar y recuperar mensajes de chat asociados a un subproceso específico.

Las implementaciones de este protocolo son responsables de administrar el almacenamiento de mensajes de chat, incluido el control de grandes volúmenes de datos truncando o resumiendo los mensajes según sea necesario.

ChatMiddleware

Clase base abstracta para middleware de chat que puede interceptar las solicitudes de cliente de chat.

El middleware de chat permite interceptar y modificar las solicitudes de cliente de chat antes y después de la ejecución. Puede modificar mensajes, agregar avisos del sistema, solicitudes de registro o invalidar respuestas de chat.

Nota:

ChatMiddleware es una clase base abstracta. Debe subclase e implementarlo

el método process() para crear middleware de chat personalizado.

ChatOptions

Configuración de solicitud común para los servicios de IA.

Inicialice ChatOptions.

ChatResponse

Representa la respuesta a una solicitud de chat.

Inicializa un ChatResponse con los parámetros proporcionados.

ChatResponseUpdate

Representa un único fragmento de respuesta de streaming de un chatClient.

Inicializa una clase ChatResponseUpdate con los parámetros proporcionados.

CheckpointStorage

Protocolo para back-end de almacenamiento de puntos de control.

CitationAnnotation

Representa una anotación de cita.

Inicialice CitationAnnotation.

ConcurrentBuilder

Generador de alto nivel para flujos de trabajo simultáneos del agente.

  • participantes([...]) acepta una lista de AgentProtocol (recomendado) o Executor.

  • register_participants([...]) acepta una lista de generadores para AgentProtocol (recomendado)

    o generadores de Ejecutor

  • build() wires: distribuidor -> fan-out - participantes ->> fan-in -> agregador.

  • with_aggregator(...) invalida el agregador predeterminado con un ejecutor o una devolución de llamada.

  • register_aggregator(...) acepta un generador de un ejecutor como agregador personalizado.

Uso:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Clase que contiene cualquier contexto que se debe proporcionar al modelo de IA, tal como lo proporciona contextProvider.

Cada ContextProvider tiene la capacidad de proporcionar su propio contexto para cada invocación. La clase Context contiene el contexto adicional proporcionado por ContextProvider. Este contexto se combinará con el contexto proporcionado por otros proveedores antes de pasarse al modelo de IA. Este contexto es por invocación y no se almacenará como parte del historial de chat.

Cree un nuevo objeto Context.

ContextProvider

Clase base para todos los proveedores de contexto.

Un proveedor de contexto es un componente que se puede usar para mejorar la administración de contextos de la inteligencia artificial. Puede escuchar los cambios en la conversación y proporcionar contexto adicional al modelo de IA justo antes de la invocación.

Nota:

ContextProvider es una clase base abstracta. Debe subclase e implementarlo

el método invocación() para crear un proveedor de contexto personalizado. Lo ideal es que debas

implemente también los métodos invoked() y thread_created() para realizar un seguimiento de la conversación.

state, pero son opcionales.

DataContent

Representa el contenido de datos binarios con un tipo de medio asociado (también conocido como tipo MIME).

Importante

Esto es para los datos binarios que se representan como un URI de datos, no para los recursos en línea.

Use UriContent para recursos en línea.

Inicializa una instancia de DataContent.

Importante

Esto es para los datos binarios que se representan como un URI de datos, no para los recursos en línea.

Use UriContent para recursos en línea.

Default

Representación en tiempo de ejecución de la rama predeterminada en un grupo de casos switch.

La rama predeterminada solo se invoca cuando no coinciden otros predicados de caso. En la práctica, se garantiza que existe para que el enrutamiento nunca genere un destino vacío.

Edge

Modele una entrega condicional dirigida, opcionalmente, entre dos ejecutores.

Cada edge captura los metadatos mínimos necesarios para mover un mensaje de un ejecutor a otro dentro del gráfico de flujo de trabajo. Opcionalmente, inserta un predicado booleano que decide si el borde se debe tomar en tiempo de ejecución. Al serializar el borde hasta primitivos, podemos reconstruir la topología de un flujo de trabajo independientemente del proceso de Python original.

Inicialice un borde totalmente especificado entre dos ejecutores de flujo de trabajo.

EdgeDuplicationError

Excepción generada cuando se detectan bordes duplicados en el flujo de trabajo.

ErrorContent

Representa un error.

Observaciones: normalmente se usa para errores no irrecuperables, donde algo salió mal como parte de la operación, pero la operación todavía era capaz de continuar.

Inicializa una instancia de ErrorContent.

Executor

Clase base para todos los ejecutores de flujo de trabajo que procesan mensajes y realizan cálculos.

Información general

Los ejecutores son los bloques de creación fundamentales de los flujos de trabajo, que representan unidades de procesamiento individuales que reciben mensajes, realizan operaciones y generan salidas. Cada ejecutor se identifica de forma única y puede controlar tipos de mensajes específicos a través de métodos de controlador decorados.

Sistema de tipos

Los ejecutores tienen un sistema de tipos enriquecido que define sus funcionalidades:

Tipos de entrada

Los tipos de mensajes que un ejecutor puede procesar, detectados desde firmas de método de controlador:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Acceso a través de la propiedad input_types .

Tipos de salida

Los tipos de mensajes que un ejecutor puede enviar a otros ejecutores a través de ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Acceso a través de la propiedad output_types .

Tipos de salida de flujo de trabajo

Los tipos de datos que un ejecutor puede emitir como salidas de nivel de flujo de trabajo a través de ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Acceso a través de la propiedad workflow_output_types .

Detección del controlador

Los ejecutores detectan sus capacidades mediante métodos decorados:

@handler Decorador

Marca los métodos que procesan los mensajes entrantes:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Interceptación de solicitudes de subproceso de flujo de trabajo

Use @handler métodos para interceptar solicitudes de sub-flujo de trabajo:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Tipos de contexto

Los métodos de controlador reciben diferentes variantes workflowContext en función de sus anotaciones de tipo:

WorkflowContext (sin parámetros de tipo)

Para controladores que solo realizan efectos secundarios sin enviar mensajes ni producir salidas:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Habilita el envío de mensajes de tipo T_Out a través de ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Habilita tanto el envío de mensajes (T_Out) como la producción de salidas de flujo de trabajo (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Ejecutores de función

Las funciones simples se pueden convertir en ejecutores mediante el decorador de @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Composición de sub-flujo de trabajo

Los ejecutores pueden contener subprocesos mediante WorkflowExecutor. Los subprocesos pueden realizar solicitudes que los flujos de trabajo primarios pueden interceptar. Consulte la documentación de WorkflowExecutor para obtener más información sobre los patrones de composición de flujo de trabajo y el control de solicitudes y respuestas.

Administración de estados

Los ejecutores pueden contener estados que se conservan en ejecuciones de flujo de trabajo y puntos de control. Invalide los métodos on_checkpoint_save y on_checkpoint_restore para implementar la lógica de restauración y serialización de estado personalizada.

Notas de implementación

  • No llame directamente a execute(): se invoca mediante el motor de flujo de trabajo.
  • No invalidar execute(): defina controladores mediante decoradores en su lugar.
  • Cada ejecutor debe tener al menos un método @handler
  • Las firmas de método de controlador se validan en el momento de la inicialización

Inicialice el ejecutor con un identificador único.

ExecutorCompletedEvent

Evento desencadenado cuando se completa un controlador de ejecutor.

Inicialice el evento executor con un identificador de ejecutor y datos opcionales.

ExecutorEvent

Clase base para eventos executor.

Inicialice el evento executor con un identificador de ejecutor y datos opcionales.

ExecutorFailedEvent

Evento desencadenado cuando un controlador de ejecutor genera un error.

ExecutorInvokedEvent

Evento desencadenado cuando se invoca un controlador de ejecutor.

Inicialice el evento executor con un identificador de ejecutor y datos opcionales.

FanInEdgeGroup

Representa un conjunto convergente de bordes que alimentan un único ejecutor de bajada.

Los grupos de ventiladores se suelen usar cuando varias fases ascendentes generan de forma independiente mensajes que deben llegar al mismo procesador de bajada.

Cree una asignación de ventilador que combine varios orígenes en un destino.

FanOutEdgeGroup

Representa un grupo perimetral de estilo de difusión con lógica de selección opcional.

Un distribución ramificada reenvía un mensaje generado por un único ejecutor de origen a uno o varios ejecutores de bajada. En tiempo de ejecución, podemos restringir aún más los destinos ejecutando una selection_func que inspecciona la carga y devuelve el subconjunto de identificadores que deben recibir el mensaje.

Cree una asignación de distribución ramificada desde un único origen a muchos destinos.

FileCheckpointStorage

Almacenamiento de punto de comprobación basado en archivos para la persistencia.

Inicialice el almacenamiento de archivos.

FinishReason

Representa el motivo por el que se completó una respuesta de chat.

Inicialice FinishReason con un valor.

FunctionApprovalRequestContent

Representa una solicitud de aprobación del usuario de una llamada de función.

Inicializa una instancia functionApprovalRequestContent.

FunctionApprovalResponseContent

Representa una respuesta para la aprobación del usuario de una llamada de función.

Inicializa una instancia functionApprovalResponseContent.

FunctionCallContent

Representa una solicitud de llamada de función.

Inicializa una instancia de FunctionCallContent.

FunctionExecutor

Ejecutor que ajusta una función definida por el usuario.

Este ejecutor permite a los usuarios definir funciones sencillas (sincronización y asincrónica) y usarlas como ejecutores de flujo de trabajo sin necesidad de crear clases completas del ejecutor.

Las funciones sincrónicas se ejecutan en un grupo de subprocesos mediante asyncio.to_thread() para evitar el bloqueo del bucle de eventos.

Inicialice FunctionExecutor con una función definida por el usuario.

FunctionInvocationConfiguration

Configuración de la invocación de funciones en clientes de chat.

Esta clase se crea automáticamente en todos los clientes de chat que admiten la invocación de funciones. Esto significa que, en la mayoría de los casos, solo puede modificar los atributos de la instancia, en lugar de crear uno nuevo.

Inicializar FunctionInvocationConfiguration.

FunctionInvocationContext

Objeto de contexto para invocaciones de middleware de función.

Este contexto se pasa a través de la canalización de middleware de función y contiene toda la información sobre la invocación de función.

Inicialice FunctionInvocationContext.

FunctionMiddleware

Clase base abstracta para middleware de función que puede interceptar invocaciones de función.

El middleware de función permite interceptar y modificar invocaciones de función o herramienta antes y después de la ejecución. Puede validar argumentos, resultados de caché, invocaciones de registro o invalidar la ejecución de funciones.

Nota:

FunctionMiddleware es una clase base abstracta. Debe subclase e implementarlo

el método process() para crear middleware de función personalizada.

FunctionResultContent

Representa el resultado de una llamada de función.

Inicializa una instancia functionResultContent.

GraphConnectivityError

Excepción generada cuando se detectan problemas de conectividad de grafos.

GroupChatBuilder

Generador de alto nivel para flujos de trabajo de chat de grupo dirigidos por el administrador con orquestación dinámica.

GroupChat coordina las conversaciones multiagente mediante un administrador que selecciona qué participante habla a continuación. El administrador puede ser una función de Python simple (set_select_speakers_func) o un selector basado en agente a través set_managerde . Estos dos enfoques son mutuamente excluyentes.

Flujo de trabajo principal:

  1. Definir participantes: lista de agentes (usa su .name) o nombres de asignación dict a agentes

  2. Configurar la selección del hablante: set_select_speakers_func OR

    set_manager (no ambos)

  3. Opcional: establecer límites redondos, puntos de control, condiciones de terminación

  4. Compilación y ejecución del flujo de trabajo

Patrones de selección de hablantes:

Patrón 1: Selección simple basada en funciones (recomendado)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Patrón 2: selección basada en LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Patrón 3: Solicitar información para los comentarios de la conversación media


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Especificación del participante:

Dos maneras de especificar participantes:

  • Formulario de lista: [agent1, agent2] : usa agent.name atributo para los nombres de participantes
  • Formulario de dict: {name1: agent1, name2: agent2} - control de nombre explícito
  • Formulario de palabra clave: participants(name1=agent1, name2=agent2): control de nombre explícito

Estructura de instantáneas de estado:

GroupChatStateSnapshot pasado a set_select_speakers_func contiene:

  • task: ChatMessage - Tarea de usuario original
  • participantes: dict[str, str] - Asignación de nombres de participantes a descripciones
  • conversation: tupla[ChatMessage, ...] - Historial completo de conversaciones
  • historial: tupla[GroupChatTurn, ...] - Registro turn-by-turn con atribución del hablante
  • round_index: int - Número de rondas de selección de administrador hasta ahora
  • pending_agent: str | Ninguno: nombre del procesamiento del agente (si existe)

Restricciones importantes:

  • No se puede combinar set_select_speakers_func y set_manager
  • Los nombres de los participantes deben ser únicos
  • Al usar el formulario de lista, los agentes deben tener un atributo de nombre no vacío.

Inicialice GroupChatBuilder.

GroupChatDirective

Instrucción emitida por una implementación del administrador de chat en grupo.

HandoffBuilder

Fluent Builder para flujos de trabajo de entrega conversacionales con agentes de coordinación y especialistas.

El patrón de entrega permite a un agente de coordinación enrutar solicitudes a agentes especialistas. El modo de interacción controla si el flujo de trabajo solicita la entrada del usuario después de cada respuesta del agente o se completa de forma autónoma una vez que los agentes terminen de responder. Una condición de terminación determina cuándo el flujo de trabajo debe dejar de solicitar la entrada y completarse.

Patrones de enrutamiento:

Single-Tier (valor predeterminado): Solo el coordinador puede entregar a especialistas. De forma predeterminada, después de que cualquier especialista responda, el control vuelve al usuario para obtener más entradas. Esto crea un flujo cíclico: usuario -> coordinador -> [especialista opcional] -> usuario -> coordinador -> ... Use with_interaction_mode("autónomo") para omitir la solicitud de entrada de usuario adicional y producir la conversación final cuando un agente responde sin delegar.

Nivel múltiple (avanzado): Los especialistas pueden entregar a otros especialistas mediante .add_handoff(). Esto proporciona más flexibilidad para flujos de trabajo complejos, pero es menos controlable que el patrón de un solo nivel. Los usuarios pierden visibilidad en tiempo real de los pasos intermedios durante las entregas especializadas a especialistas (aunque se conserva el historial completo de conversaciones, incluidas todas las entregas y se pueden inspeccionar después).

Características principales:

  • Detección automática de entrega: el coordinador invoca una herramienta de entrega cuya

    argumentos (por ejemplo , {"handoff_to": "shipping_agent"}) identifican al especialista para recibir el control.

  • Herramientas generadas automáticamente: de forma predeterminada, el generador sintetiza las herramientas handoff_to_<agent> para el coordinador, por lo que no define manualmente las funciones de marcador de posición.

  • Historial de conversaciones completo: toda la conversación (incluida cualquier ChatMessage.additional_properties) se conserva y se pasa a cada agente.

  • Control de terminación: de forma predeterminada, finaliza después de 10 mensajes de usuario. Invalide con .with_termination_condition(lambda conv: ...) para la lógica personalizada (por ejemplo, detectar "adiós").

  • Modos de interacción: elija human_in_loop (valor predeterminado) para solicitar a los usuarios entre turnos del agente o autónomos que continúen enrutando a agentes sin solicitar la entrada del usuario hasta que se produzca una entrega o se alcance un límite de terminación/turno (límite de turno autónomo predeterminado: 50).

  • Punto de control: persistencia opcional para flujos de trabajo reanudables.

Uso (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Enrutamiento de varios niveles con .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Usar factorías de participantes para el aislamiento de estado:

Condición de terminación personalizada:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Punto de control:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Inicialice un HandoffBuilder para crear flujos de trabajo de entrega conversacionales.

El generador se inicia en un estado no configurado y requiere que llame a:

  1. .participants([...]) - Registrar agentes
  2. o .participant_factories({...}): registrar factorías de agente o ejecutor
  3. .set_coordinator(...) - Designar qué agente recibe la entrada inicial del usuario.
  4. .build(): construir el flujo de trabajo final

Los métodos de configuración opcionales permiten personalizar la administración de contextos, la lógica de terminación y la persistencia.

Nota:

Los participantes deben tener nombres o identificadores estables porque el flujo de trabajo asigna el

argumentos de la herramienta handoff para estos identificadores. Los nombres de agente deben coincidir

las cadenas emitidas por la herramienta de entrega del coordinador (por ejemplo, una herramienta que

outputs {"handoff_to": "billing"} requiere un agente denominado billing).

HandoffUserInputRequest

Mensaje de solicitud emitido cuando el flujo de trabajo necesita una entrada de usuario nueva.

Nota: El campo de conversación se excluye intencionadamente de la serialización de puntos de control para evitar la duplicación. La conversación se conserva en el estado del coordinador y se reconstruirá en la restauración. Consulte el problema n.º 2667.

HostedCodeInterpreterTool

Representa una herramienta hospedada que se puede especificar en un servicio de inteligencia artificial para habilitarla para ejecutar código generado.

Esta herramienta no implementa la propia interpretación del código. Sirve como marcador para informar a un servicio de que se permite ejecutar código generado si el servicio es capaz de hacerlo.

Inicialice HostedCodeInterpreterTool.

HostedFileContent

Representa un contenido de archivo hospedado.

Inicializa una instancia hostedFileContent.

HostedFileSearchTool

Representa una herramienta de búsqueda de archivos que se puede especificar en un servicio de inteligencia artificial para habilitarla para realizar búsquedas de archivos.

Inicialice un fileSearchTool.

HostedMCPSpecificApproval

Representa el modo específico de una herramienta hospedada.

Al usar este modo, el usuario debe especificar qué herramientas siempre o nunca requieren aprobación. Esto se representa como un diccionario con dos claves opcionales:

HostedMCPTool

Representa una herramienta MCP administrada y ejecutada por el servicio.

Cree una herramienta MCP hospedada.

HostedVectorStoreContent

Representa un contenido de almacén de vectores hospedado.

Inicializa una instancia hostedVectorStoreContent.

HostedWebSearchTool

Representa una herramienta de búsqueda web que se puede especificar en un servicio de inteligencia artificial para habilitarla para realizar búsquedas web.

Inicialice hostedWebSearchTool.

InMemoryCheckpointStorage

Almacenamiento de puntos de comprobación en memoria para pruebas y desarrollo.

Inicialice el almacenamiento de memoria.

InProcRunnerContext

Contexto de ejecución en proceso para la ejecución local y puntos de comprobación opcionales.

Inicialice el contexto de ejecución en proceso.

MCPStdioTool

Herramienta MCP para conectarse a servidores MCP basados en stdio.

Esta clase se conecta a los servidores MCP que se comunican a través de la entrada y salida estándar, que normalmente se usan para los procesos locales.

Inicialice la herramienta stdio de MCP.

Nota:

Los argumentos se usan para crear un objeto StdioServerParameters,

que se usa para crear un cliente stdio. Consulte mcp.client.stdio.stdio_client

y mcp.client.stdio.stdio_server_parameters para obtener más detalles.

MCPStreamableHTTPTool

Herramienta MCP para conectarse a servidores MCP basados en HTTP.

Esta clase se conecta a servidores MCP que se comunican a través de HTTP/SSE que se puede transmitir.

Inicialice la herramienta HTTP que se puede transmitir con MCP.

Nota:

Los argumentos se usan para crear un cliente HTTP que se pueda transmitir.

Consulte mcp.client.streamable_http.streamablehttp_client para obtener más información.

Todos los argumentos adicionales pasados al constructor se pasarán a .

constructor de cliente HTTP que se puede transmitir.

MCPWebsocketTool

Herramienta MCP para conectarse a servidores MCP basados en WebSocket.

Esta clase se conecta a servidores MCP que se comunican a través de WebSocket.

Inicialice la herramienta WebSocket de MCP.

Nota:

Los argumentos se usan para crear un cliente webSocket.

Consulte mcp.client.websocket.websocket_client para obtener más información.

Todos los argumentos adicionales pasados al constructor se pasarán a .

Constructor de cliente de WebSocket.

MagenticBuilder

Fluent Builder para crear flujos de trabajo de orquestación multiagente de Magentic One.

Los flujos de trabajo de Magentic One usan un administrador con tecnología LLM para coordinar varios agentes mediante el planeamiento dinámico de tareas, el seguimiento de progreso y el replanado adaptable. El administrador crea planes, selecciona agentes, supervisa el progreso y determina cuándo se va a replanar o completar.

El generador proporciona una API fluida para configurar participantes, el administrador, la revisión de planes opcionales, los puntos de control y las devoluciones de llamada de eventos.

Soporte humano en bucle: Magentic proporciona mecanismos HITL especializados a través de:

  • .with_plan_review(): revisar y aprobar o revisar planes antes de la ejecución

  • .with_human_input_on_stall(): intervención cuando se detiene el flujo de trabajo

  • Aprobación de herramientas a través de FunctionApprovalRequestContent : aprobación de llamadas a herramientas individuales

Estos emiten eventos MagenticHumanInterventionRequest que proporcionan opciones de decisión estructuradas (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) adecuadas para la orquestación basada en la planificación de Magentic.

Uso:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Con el administrador personalizado:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Contexto del administrador magentic.

MagenticManagerBase

Clase base para el administrador de Magentic One.

ManagerDirectiveModel

Modelo pydantic para la salida de la directiva de administrador estructurado.

Cree un nuevo modelo mediante el análisis y la validación de datos de entrada a partir de argumentos de palabra clave.

Genera [ValidationError][pydantic_core. ValidationError] si los datos de entrada no se pueden validar para formar un modelo válido.

self es explícitamente solo posicional para permitirse como un nombre de campo.

ManagerSelectionRequest

Solicitud enviada al agente de administrador para la siguiente selección del hablante.

Esta clase de datos empaqueta el estado completo de la conversación y el contexto de la tarea para que el agente del administrador analice y tome una decisión de selección del hablante.

ManagerSelectionResponse

Respuesta del agente del administrador con decisión de selección del hablante.

El agente de administrador debe generar esta estructura (o dict/JSON compatible) para comunicar su decisión al orquestador.

Cree un nuevo modelo mediante el análisis y la validación de datos de entrada a partir de argumentos de palabra clave.

Genera [ValidationError][pydantic_core. ValidationError] si los datos de entrada no se pueden validar para formar un modelo válido.

self es explícitamente solo posicional para permitirse como un nombre de campo.

Message

Clase que representa un mensaje en el flujo de trabajo.

OrchestrationState

Contenedor de estado unificado para el control de puntos de control del orquestador.

Esta clase de datos normaliza la serialización de puntos de control en los tres patrones de chat de grupo, al tiempo que permite extensiones específicas de patrones a través de metadatos.

Los atributos comunes cubren los problemas de orquestación compartida (tarea, conversación, seguimiento de redondeo). El estado específico del patrón pasa en la diferencia de metadatos.

RequestInfoEvent

Evento desencadenado cuando un ejecutor de flujo de trabajo solicita información externa.

Inicialice el evento de información de solicitud.

RequestInfoInterceptor

Ejecutor interno que pausa el flujo de trabajo para la entrada humana antes de que se ejecute el agente.

Este ejecutor se inserta en el gráfico de flujo de trabajo mediante generadores cuando se llama a .with_request_info(). Intercepta los mensajes AgentExecutorRequest ANTES de que el agente se ejecute y pausa el flujo de trabajo a través de ctx.request_info() con agentInputRequest.

Cuando se recibe una respuesta, el controlador de respuesta inserta la entrada como mensaje de usuario en la conversación y reenvía la solicitud al agente.

El parámetro opcional agent_filter permite limitar qué agentes desencadenan la pausa. Si el identificador del agente de destino no está en el conjunto de filtros, la solicitud se reenvía sin pausar.

Inicialice el ejecutor del interceptor de información de solicitud.

Role

Describe el propósito previsto de un mensaje dentro de una interacción de chat.

Propiedades: SYSTEM: rol que indica o establece el comportamiento del sistema de IA. USUARIO: el rol que proporciona la entrada del usuario para las interacciones de chat. ASSISTANT: el rol que proporciona respuestas a la entrada instruida por el sistema y preguntada por el usuario. TOOL: rol que proporciona información adicional y referencias en respuesta a solicitudes de uso de herramientas.

Inicialice el rol con un valor.

Runner

Clase para ejecutar un flujo de trabajo en supersteps de Pregel.

Inicialice el ejecutor con bordes, estado compartido y contexto.

RunnerContext

Protocolo para el contexto de ejecución utilizado por el ejecutor.

Un único contexto que admite mensajería, eventos y puntos de control opcionales. Si el almacenamiento de puntos de control no está configurado, los métodos de punto de control pueden generarse.

SequentialBuilder

Generador de alto nivel para flujos de trabajo de agente o ejecutor secuenciales con contexto compartido.

  • participantes([...]) acepta una lista de instancias de AgentProtocol (recomendado) o Executor

  • register_participants([...]) acepta una lista de generadores para AgentProtocol (recomendado)

    o generadores de Ejecutor

  • Los ejecutores deben definir un controlador que consume list[ChatMessage] y enviar una lista[ChatMessage]

  • Los participantes del flujo de trabajo se conectan en orden, pasando una lista[ChatMessage] por la cadena.

  • Los agentes anexan sus mensajes de asistente a la conversación

  • Los ejecutores personalizados pueden transformar o resumir y devolver una lista[ChatMessage]

  • La salida final es la conversación producida por el último participante.

Uso:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Clase para administrar el estado compartido en un flujo de trabajo.

SharedState proporciona acceso seguro para subprocesos a los datos de estado de flujo de trabajo que deben compartirse entre ejecutores durante la ejecución del flujo de trabajo.

Claves reservadas: las claves siguientes están reservadas para uso interno del marco de trabajo y no deben modificarse mediante código de usuario:

  • _executor_state: almacena el estado del ejecutor para el punto de comprobación (administrado por Runner)

Advertencia

No use claves a partir del carácter de subrayado (_) ya que se pueden reservar para

Operaciones internas del marco de trabajo.

Inicialice el estado compartido.

SingleEdgeGroup

Contenedor de conveniencia para un borde solitario, manteniendo el uniforme de api de grupo.

Cree un grupo perimetral uno a uno entre dos ejecutores.

StandardMagenticManager

Administrador de Magentic estándar que realiza llamadas LLM reales a través de un ChatAgent.

El administrador construye mensajes que reflejan la orquestación original de Magentic One:

  • Recopilación de hechos
  • Creación de planes
  • Libro de contabilidad de progreso en JSON
  • Actualización de hechos y actualización del plan sobre el restablecimiento
  • Síntesis de respuesta final

Inicialice el Administrador magentic estándar.

SubWorkflowRequestMessage

Mensaje enviado desde un sub-flujo de trabajo a un ejecutor en el flujo de trabajo primario para solicitar información.

Este mensaje encapsula un RequestInfoEvent emitido por el ejecutor en el sub-flujo de trabajo.

SubWorkflowResponseMessage

Mensaje enviado desde un flujo de trabajo primario a un sub-flujo de trabajo a través de WorkflowExecutor para proporcionar información solicitada.

Este mensaje encapsula los datos de respuesta junto con el requestInfoEvent original emitido por el ejecutor del sub-flujo de trabajo.

SuperStepCompletedEvent

Evento desencadenado cuando finaliza un superpaso.

Inicialice el evento superstep.

SuperStepStartedEvent

Evento desencadenado cuando se inicia un superpaso.

Inicialice el evento superstep.

SwitchCaseEdgeGroup

Variante de distribución ramificada que imita un flujo de control de mayúsculas y minúsculas tradicional.

Cada caso inspecciona la carga del mensaje y decide si debe controlar el mensaje. Exactamente un caso o la rama predeterminada devuelve un destino en tiempo de ejecución, conservando la semántica de envío único.

Configure una estructura de enrutamiento switch/case para un único ejecutor de origen.

SwitchCaseEdgeGroupCase

Descripción persistente de una sola rama condicional en un switch-case.

A diferencia del objeto Case en tiempo de ejecución, esta variante serializable almacena solo el identificador de destino y un nombre descriptivo para el predicado. Cuando el invocable subyacente no está disponible durante la deserialización, sustituyemos un marcador de posición de proxy que produce un error en voz alta, lo que garantiza que la dependencia que falta sea visible inmediatamente.

Registre los metadatos de enrutamiento de una rama de casos condicionales.

SwitchCaseEdgeGroupDefault

Descriptor persistente para la rama de reserva de un grupo switch-case.

Se garantiza que la rama predeterminada existe y se invoca cuando todos los demás predicados de caso no coinciden con la carga.

Apunte la rama predeterminada hacia el identificador del ejecutor especificado.

TextContent

Representa contenido de texto en un chat.

Inicializa una instancia de TextContent.

TextReasoningContent

Representa contenido de razonamiento de texto en un chat.

Observaciones: Esta clase y TextContent son superficialmente similares, pero distintas.

Inicializa una instancia de TextReasoningContent.

TextSpanRegion

Representa una región de texto anotada.

Inicialice TextSpanRegion.

ToolMode

Define si y cómo se usan las herramientas en una solicitud de chat.

Inicialice ToolMode.

ToolProtocol

Representa una herramienta genérica.

Este protocolo define la interfaz que todas las herramientas deben implementar para que sean compatibles con el marco del agente. Se implementa mediante diversas clases de herramientas, como HostedMCPTool, HostedWebSearchTool y AIFunction. El decorador de ai_function crea una instancia de AIFunction.

Dado que cada conector necesita analizar las herramientas de forma diferente, los usuarios pueden pasar un dict para especificar una herramienta específica del servicio cuando no haya ninguna abstracción disponible.

TypeCompatibilityError

Excepción generada cuando se detecta la incompatibilidad de tipos entre ejecutores conectados.

UriContent

Representa un contenido de URI.

Importante

Se usa para el contenido identificado por un URI, como una imagen o un archivo.

En el caso de los URI de datos (binarios), use DataContent en su lugar.

Inicializa una instancia de UriContent.

Comentarios: se usa para el contenido identificado por un URI, como una imagen o un archivo. En el caso de los URI de datos (binarios), use DataContent en su lugar.

UsageContent

Representa la información de uso asociada a una solicitud y respuesta de chat.

Inicializa una instancia usageContent.

UsageDetails

Proporciona detalles de uso sobre una solicitud o respuesta.

Inicializa la instancia UsageDetails.

Workflow

Un motor de ejecución basado en grafos que organiza ejecutores conectados.

Información general

Un flujo de trabajo ejecuta un grafo dirigido de ejecutores conectados a través de grupos perimetrales mediante un modelo similar a Pregel, que se ejecuta en supersteps hasta que el grafo se vuelve inactivo. Los flujos de trabajo se crean mediante la clase WorkflowBuilder: no crean instancias de esta clase directamente.

Modelo de ejecución

Los ejecutores se ejecutan en superpasos sincronizados donde cada ejecutor:

  • Se invoca cuando recibe mensajes de grupos perimetrales conectados.
  • Puede enviar mensajes a los ejecutores de nivel inferior a través de ctx.send_message()
  • Puede producir salidas de nivel de flujo de trabajo a través de ctx.yield_output()
  • Puede emitir eventos personalizados a través de ctx.add_event()

Los mensajes entre ejecutores se entregan al final de cada superstep y no están visibles en la secuencia de eventos. Solo los eventos de nivel de flujo de trabajo (salidas, eventos personalizados) y eventos de estado son observables para los autores de llamadas.

Tipos de entrada y salida

Los tipos de flujo de trabajo se detectan en tiempo de ejecución inspeccionando:

  • Tipos de entrada: desde los tipos de entrada del ejecutor inicial
  • Tipos de salida: unión de todos los tipos de salida de flujo de trabajo de los ejecutores Acceda a ellos a través de las propiedades input_types y output_types.

Métodos de ejecución

El flujo de trabajo proporciona dos API de ejecución principales, cada una de las cuales admite varios escenarios:

  • run(): Ejecutar hasta finalizar, devuelve WorkflowRunResult con todos los eventos.

  • run_stream(): devuelve un generador asincrónico que produce eventos a medida que se producen.

Ambos métodos admiten:

  • Ejecuciones de flujo de trabajo iniciales: proporcione el parámetro message .
  • Restauración del punto de comprobación: proporcione checkpoint_id (y, opcionalmente , checkpoint_storage)
  • Continuación de HIL: proporcionar respuestas para continuar después de solicitudes RequestInfoExecutor
  • Punto de comprobación en tiempo de ejecución: proporcione checkpoint_storage para habilitar o invalidar los puntos de control de esta ejecución

Administración de estados

Las instancias de flujo de trabajo contienen estados y estados se conservan entre las llamadas para ejecutar y run_stream. Para ejecutar varias ejecuciones independientes, cree instancias de flujo de trabajo independientes a través de WorkflowBuilder.

Solicitudes de entrada externas

Los ejecutores dentro de un flujo de trabajo pueden solicitar entradas externas mediante ctx.request_info():

  1. Executor llama a ctx.request_info() para solicitar la entrada
  2. Executor implementa response_handler() para procesar la respuesta
  3. Las solicitudes se emiten como instancias de RequestInfoEvent en el flujo de eventos.
  4. El flujo de trabajo entra en estado de IDLE_WITH_PENDING_REQUESTS
  5. El autor de la llamada controla las solicitudes y proporciona respuestas a través de los métodos send_responses o send_responses_streaming
  6. Las respuestas se enrutan a los ejecutores de solicitud y a los controladores de respuesta se invocan.

Puntos de control

Los puntos de comprobación se pueden configurar en tiempo de compilación o en tiempo de ejecución:

Tiempo de compilación (a través de WorkflowBuilder): workflow = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (a través de parámetros run/run_stream): result = await workflow.run(message, checkpoint_storage=runtime_storage)

Cuando se habilita, los puntos de control se crean al final de cada superpaso, capturando:

  • Estados del ejecutor
  • Mensajes en tránsito
  • Los flujos de trabajo de estado compartido se pueden pausar y reanudar en los reinicios del proceso mediante el almacenamiento de puntos de control.

Composición

Los flujos de trabajo se pueden anidar mediante WorkflowExecutor, que encapsula un flujo de trabajo secundario como ejecutor. Los tipos de entrada y salida del flujo de trabajo anidados forman parte de los tipos de WorkflowExecutor. Cuando se invoca, WorkflowExecutor ejecuta el flujo de trabajo anidado para completar y procesa sus salidas.

Inicialice el flujo de trabajo con una lista de bordes.

WorkflowAgent

Una subclase Agent que encapsula un flujo de trabajo y la expone como agente.

Inicialice WorkflowAgent.

WorkflowBuilder

Clase de generador para construir flujos de trabajo.

Esta clase proporciona una API fluida para definir gráficos de flujo de trabajo mediante la conexión de ejecutores con bordes y la configuración de parámetros de ejecución. Llame build a para crear una instancia inmutable Workflow .

Inicialice WorkflowBuilder con una lista vacía de bordes y ningún ejecutor inicial.

WorkflowCheckpoint

Representa un punto de control completo del estado del flujo de trabajo.

Los puntos de control capturan el estado de ejecución completo de un flujo de trabajo en un punto específico, lo que permite pausar y reanudar los flujos de trabajo.

Nota:

El shared_state dict puede contener claves reservadas administradas por el marco.

Consulte la documentación de la clase SharedState para obtener más información sobre las claves reservadas.

WorkflowCheckpointSummary

Resumen legible de un punto de control de flujo de trabajo.

WorkflowContext

Contexto de ejecución que permite a los ejecutores interactuar con flujos de trabajo y otros ejecutores.

Información general

WorkflowContext proporciona una interfaz controlada para que los ejecutores envíen mensajes, generen salidas, administren el estado e interactúen con el ecosistema de flujo de trabajo más amplio. Aplica la seguridad de tipos a través de parámetros genéricos, a la vez que impide el acceso directo a los componentes internos del entorno de ejecución.

Parámetros de tipo

El contexto se parametriza para aplicar la seguridad de tipos para distintas operaciones:

WorkflowContext (sin parámetros)

Para ejecutores que solo realizan efectos secundarios sin enviar mensajes ni producir salidas:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Habilita el envío de mensajes de tipo T_Out a otros ejecutores:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Habilita tanto el envío de mensajes (T_Out) como la producción de salidas de flujo de trabajo (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Tipos de unión

Se pueden especificar varios tipos mediante la notación de unión:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Inicialice el contexto del ejecutor con el contexto de flujo de trabajo especificado.

WorkflowErrorDetails

Información de error estructurada que se va a mostrar en eventos o resultados de error.

WorkflowEvent

Clase base para eventos de flujo de trabajo.

Inicialice el evento de flujo de trabajo con datos opcionales.

WorkflowExecutor

Ejecutor que encapsula un flujo de trabajo para habilitar la composición jerárquica del flujo de trabajo.

Información general

WorkflowExecutor hace que un flujo de trabajo se comporte como un único ejecutor dentro de un flujo de trabajo primario, lo que permite arquitecturas de flujo de trabajo anidadas. Controla el ciclo de vida completo de la ejecución del sub-flujo de trabajo, incluido el procesamiento de eventos, el reenvío de salida y la coordinación de solicitudes y respuestas entre los flujos de trabajo primarios y secundarios.

Modelo de ejecución

Cuando se invoca, WorkflowExecutor:

  1. Inicia el flujo de trabajo ajustado con el mensaje de entrada.
  2. Ejecuta el sub-flujo de trabajo hasta que finalice o hasta que necesite una entrada externa.
  3. Procesa el flujo de eventos completo del sub-flujo de trabajo después de la ejecución
  4. Reenvía las salidas al flujo de trabajo primario como mensajes.
  5. Controla las solicitudes externas enrutandolas al flujo de trabajo primario.
  6. Acumula respuestas y reanuda la ejecución del subproceso de flujo de trabajo.

Procesamiento de flujos de eventos

WorkflowExecutor procesa eventos después de la finalización del subproceso del flujo de trabajo:

Reenvío de salida

Todas las salidas del sub-flujo de trabajo se reenvía automáticamente al elemento primario:

Cuando allow_direct_output es False (valor predeterminado):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Cuando allow_direct_output es True:

Coordinación de solicitudes y respuestas

Cuando los subprocesos necesitan información externa:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Administración de estados

WorkflowExecutor mantiene el estado de ejecución en ciclos de solicitud y respuesta:

  • Realiza un seguimiento de las solicitudes pendientes por request_id
  • Acumula las respuestas hasta que se reciben todas las respuestas esperadas.
  • Reanuda la ejecución del subproceso de flujo de trabajo con un lote de respuesta completo
  • Controla las ejecuciones simultáneas y varias solicitudes pendientes

Integración del sistema de tipos

WorkflowExecutor hereda su firma de tipo del flujo de trabajo ajustado:

Tipos de entrada

Coincide con los tipos de entrada iniciales del ejecutor del flujo de trabajo ajustado:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Tipos de salida

Combina salidas de sub-flujo de trabajo con tipos de coordinación de solicitudes:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Tratamiento de errores

WorkflowExecutor propaga errores de sub-flujo de trabajo:

  • Captura WorkflowFailedEvent del sub-flujo de trabajo
  • Convierte a WorkflowErrorEvent en el contexto primario
  • Proporciona información detallada del error, incluido el identificador de subproceso de flujo de trabajo.

Compatibilidad con la ejecución simultánea

WorkflowExecutor admite totalmente varias ejecuciones simultáneas de sub-flujo de trabajo:

Aislamiento de estado de Per-Execution

Cada invocación de sub-flujo de trabajo crea un executionContext aislado:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Coordinación de solicitudes y respuestas

Las respuestas se enrutan correctamente a la ejecución de origen:

  • Cada ejecución realiza un seguimiento de sus propias solicitudes pendientes y respuestas esperadas
  • La asignación de solicitud a ejecución garantiza que las respuestas lleguen al sub-flujo de trabajo correcto.
  • La acumulación de respuestas está aislada por ejecución
  • Limpieza automática cuando se completa la ejecución

Administración de memoria

  • Ejecuciones simultáneas ilimitadas admitidas
  • Cada ejecución tiene una identificación única basada en UUID
  • Limpieza de contextos de ejecución completados
  • Administración de estado seguro para subprocesos para el acceso simultáneo

Consideraciones importantes

Instancia de flujo de trabajo compartido: todas las ejecuciones simultáneas usan la misma instancia de flujo de trabajo subyacente. Para un aislamiento adecuado, asegúrese de que el flujo de trabajo ajustado y sus ejecutores no tienen estado.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integración con flujos de trabajo primarios

Los flujos de trabajo primarios pueden interceptar solicitudes de subproceso de flujo de trabajo:

Notas de implementación

  • Los subprocesos se ejecutan hasta la finalización antes de procesar sus resultados
  • El procesamiento de eventos es atómico: todas las salidas se reenvieron antes de las solicitudes.
  • La acumulación de respuesta garantiza que los sub-flujos de trabajo reciban lotes de respuesta completos
  • El estado de ejecución se mantiene para la reanudación adecuada después de las solicitudes externas
  • Las ejecuciones simultáneas están totalmente aisladas y no interfieren entre sí

Inicialice WorkflowExecutor.

WorkflowFailedEvent

Evento de ciclo de vida integrado emitido cuando una ejecución de flujo de trabajo finaliza con un error.

WorkflowOutputEvent

Evento desencadenado cuando un ejecutor de flujo de trabajo produce la salida.

Inicialice el evento de salida del flujo de trabajo.

WorkflowRunResult

Contenedor para eventos generados durante la ejecución de flujos de trabajo que no son de streaming.

Información general

Representa los resultados de ejecución completos de una ejecución de flujo de trabajo, que contiene todos los eventos generados desde el inicio hasta el estado inactivo. Los flujos de trabajo generan salidas incrementalmente a través de llamadas ctx.yield_output() durante la ejecución.

Estructura de eventos

Mantiene la separación entre los eventos de plano de datos y de plano de control:

  • Eventos del plano de datos: invocaciones, finalizaciones, salidas y solicitudes del ejecutor (en la lista principal)
  • Eventos del plano de control: escala de tiempo de estado accesible a través del método status_timeline()

Métodos clave

  • get_outputs(): extracción de todas las salidas de flujo de trabajo de la ejecución
  • get_request_info_events(): recuperar solicitudes de entrada externas realizadas durante la ejecución
  • get_final_state(): obtener el estado final del flujo de trabajo (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.)
  • status_timeline(): acceso al historial de eventos de estado completo
WorkflowStartedEvent

Evento de ciclo de vida integrado emitido cuando comienza una ejecución de flujo de trabajo.

Inicialice el evento de flujo de trabajo con datos opcionales.

WorkflowStatusEvent

Evento de ciclo de vida integrado emitido para las transiciones de estado de ejecución del flujo de trabajo.

Inicialice el evento de estado de flujo de trabajo con un nuevo estado y datos opcionales.

WorkflowValidationError

Excepción base para errores de validación de flujo de trabajo.

WorkflowViz

Clase para visualizar flujos de trabajo mediante graphviz y Mermaid.

Inicialice WorkflowViz con un flujo de trabajo.

Enumeraciones

MagenticHumanInterventionDecision

Opciones de decisión para las respuestas de intervención humana.

MagenticHumanInterventionKind

El tipo de intervención humana que se solicita.

ValidationTypeEnum

Enumeración de tipos de validación de flujo de trabajo.

WorkflowEventSource

Identifica si un evento de flujo de trabajo procede del marco o de un ejecutor.

Use FRAMEWORK para eventos emitidos por rutas de acceso de orquestación integradas( incluso cuando el código que los genera reside en módulos relacionados con el ejecutor) y EXECUTOR para eventos expuestos por las implementaciones del ejecutor proporcionadas por el desarrollador.

WorkflowRunState

Estado de nivel de ejecución de una ejecución de flujo de trabajo.

Semántica:

  • STARTED: se ha iniciado la ejecución y se ha creado el contexto de flujo de trabajo. Se trata de un estado inicial antes de realizar cualquier trabajo significativo. En este código base emitimos un workflowStartedEvent dedicado para la telemetría y normalmente avanzamos el estado directamente a IN_PROGRESS. Los consumidores pueden seguir confiando en STARTED para las máquinas de estado que necesitan una fase previa explícita al trabajo.

  • IN_PROGRESS: el flujo de trabajo se está ejecutando activamente (por ejemplo, el mensaje inicial se ha entregado al ejecutor inicial o se está ejecutando un superpaso). Este estado se emite al principio de una ejecución y puede ir seguido de otros estados a medida que avanza la ejecución.

  • IN_PROGRESS_PENDING_REQUESTS: ejecución activa mientras una o varias operaciones de solicitud de información están pendientes. Es posible que se programe un nuevo trabajo mientras las solicitudes están en curso.

  • IDLE: el flujo de trabajo está inactivo sin solicitudes pendientes y no hay más trabajo que hacer. Este es el estado terminal normal para los flujos de trabajo que han terminado de ejecutarse, lo que podría haber generado salidas a lo largo del camino.

  • IDLE_WITH_PENDING_REQUESTS: el flujo de trabajo se pausa en espera de entrada externa (por ejemplo, se emite un RequestInfoEvent). Se trata de un estado no terminal; el flujo de trabajo puede reanudarse cuando se proporcionan respuestas.

  • FAILED: estado del terminal que indica un error expuesto. Acompañado de workflowFailedEvent con detalles de error estructurados.

  • CANCELED: estado del terminal que indica que una llamada o orquestador canceló la ejecución. Actualmente no se emiten por rutas de acceso de ejecutor predeterminadas, pero se incluyen para integradores o orquestadores que admiten la cancelación.

Funciones

agent_middleware

Decorador para marcar una función como middleware de agente.

Este decorador identifica explícitamente una función como middleware del agente, que procesa objetos AgentRunContext.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parámetros

Nombre Description
func
Requerido

Función de middleware que se va a marcar como middleware del agente.

Devoluciones

Tipo Description

La misma función con el marcador de middleware del agente.

Ejemplos


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Decora una función para convertirlo en una instancia de AIFunction que se puede pasar a los modelos y ejecutarse automáticamente.

Este decorador crea un modelo Pydantic a partir de la firma de la función, que se usará para validar los argumentos pasados a la función y para generar el esquema JSON para los parámetros de la función.

Para agregar descripciones a parámetros, use el Annotated tipo de con una descripción de typing cadena como segundo argumento. También puede usar la clase de Field Pydantic para una configuración más avanzada.

Nota:

Cuando approval_mode se establece en "always_require", la función no se ejecutará.

hasta que se proporcione aprobación explícita, esto solo se aplica al flujo de invocación automática.

También es importante tener en cuenta que si el modelo devuelve varias llamadas de función, algunas que requieren aprobación

y otros que no lo hacen, solicitará aprobación para todos ellos.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parámetros

Nombre Description
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

Función que se va a decorar.

Valor predeterminado: None
name
Requerido
str | None
description
Requerido
str | None
approval_mode
Requerido
Literal['always_require', 'never_require'] | None
max_invocations
Requerido
int | None
max_invocation_exceptions
Requerido
int | None
additional_properties
Requerido

Parámetros de palabra clave únicamente

Nombre Description
name

El nombre de la función. Si no se proporciona, se usará el atributo de __name__ la función.

Valor predeterminado: None
description

Descripción de la función. Si no se proporciona, se usará docstring de la función.

Valor predeterminado: None
approval_mode

Indica si se requiere o no aprobación para ejecutar esta herramienta. El valor predeterminado es que no se necesita la aprobación.

Valor predeterminado: None
max_invocations

Número máximo de veces que se puede invocar esta función. Si Ninguno, no hay límite, debe ser al menos 1.

Valor predeterminado: None
max_invocation_exceptions

Número máximo de excepciones permitidas durante las invocaciones. Si Ninguno, no hay límite, debe ser al menos 1.

Valor predeterminado: None
additional_properties

Propiedades adicionales que se van a establecer en la función.

Valor predeterminado: None

Devoluciones

Tipo Description
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Ejemplos


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Decorador para marcar una función como middleware de chat.

Este decorador identifica explícitamente una función como middleware de chat, que procesa objetos ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parámetros

Nombre Description
func
Requerido

Función de middleware que se va a marcar como middleware de chat.

Devoluciones

Tipo Description

La misma función con el marcador de middleware de chat.

Ejemplos


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Función factory para crear el ejecutor perimetral adecuado para un grupo perimetral.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parámetros

Nombre Description
edge_group
Requerido
<xref:agent_framework._workflows._edge.EdgeGroup>

Grupo perimetral para el que se va a crear un ejecutor.

executors
Requerido

Asignación de identificadores de ejecutor a instancias del ejecutor.

Devoluciones

Tipo Description
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Instancia de EdgeRunner adecuada.

executor

Decorador que convierte una función independiente en una instancia de FunctionExecutor.

El @executor decorador está diseñado solo para funciones independientes de nivel de módulo. En el caso de los ejecutores basados en clases, use la clase base Executor con @handler en métodos de instancia.

Admite funciones sincrónicas y asincrónicas. Las funciones sincrónicas se ejecutan en un grupo de subprocesos para evitar el bloqueo del bucle de eventos.

Importante

Uso @executor para funciones independientes (funciones locales o de nivel de módulo)

No usar @executor con staticmethod o classmethod

Para ejecutores basados en clases, subclase Executor y uso @handler en métodos de instancia

Uso:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parámetros

Nombre Description
func
Callable[[...], Any] | None

Función que se va a decorar (cuando se usa sin paréntesis)

Valor predeterminado: None
id
Requerido
str | None

Identificador personalizado opcional para el ejecutor. Si No, usa el nombre de la función.

Parámetros de palabra clave únicamente

Nombre Description
id
Valor predeterminado: None

Devoluciones

Tipo Description

Instancia de FunctionExecutor que se puede cablear a un flujo de trabajo.

Excepciones

Tipo Description

Si se usa con staticmethod o classmethod (patrón no admitido)

function_middleware

Decorador para marcar una función como middleware de función.

Este decorador identifica explícitamente una función como middleware de función, que procesa objetos FunctionInvocationContext.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parámetros

Nombre Description
func
Requerido

Función de middleware que se va a marcar como middleware de función.

Devoluciones

Tipo Description

La misma función con el marcador de middleware de función.

Ejemplos


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parámetros

Nombre Description
checkpoint
Requerido

Devoluciones

Tipo Description

get_logger

Obtenga un registrador con el nombre especificado y, de forma predeterminada, "agent_framework".

get_logger(name: str = 'agent_framework') -> Logger

Parámetros

Nombre Description
name
str

Nombre del registrador. El valor predeterminado es "agent_framework".

Valor predeterminado: "agent_framework"

Devoluciones

Tipo Description

Instancia de registrador configurada.

handler

Decorador para registrar un controlador para un ejecutor.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parámetros

Nombre Description
func
Requerido
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Función que se va a decorar. Puede ser None cuando se usa sin parámetros.

Devoluciones

Tipo Description
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Función decorada con metadatos del controlador.

Ejemplos

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) -> None:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

Prepare los valores de los resultados de la llamada de función.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parámetros

Nombre Description
content
Requerido

Devoluciones

Tipo Description
str

prepend_agent_framework_to_user_agent

Anteponga "agent-framework" al User-Agent en los encabezados.

Cuando la telemetría del agente de usuario está deshabilitada a través de la AGENT_FRAMEWORK_USER_AGENT_DISABLED variable de entorno, el encabezado User-Agent no incluirá la información del marco de trabajo del agente. Se devolverá tal como está o como un dict vacío cuando se pase Ninguno.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parámetros

Nombre Description
headers

Diccionario de encabezados existentes.

Valor predeterminado: None

Devoluciones

Tipo Description

Un nuevo dict con "User-Agent" establecido en "agent-framework-python/{version}" si los encabezados son Ninguno. El diccionario de encabezados modificados con "agent-framework-python/{version}" antepuesto al Agente de usuario.

Ejemplos


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Decorador para registrar un controlador para controlar las respuestas de una solicitud.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parámetros

Nombre Description
func
Requerido
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Función que se va a decorar.

Devoluciones

Tipo Description
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Función decorada con metadatos del controlador.

Ejemplos


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Configure la configuración de registro para el marco del agente.

setup_logging() -> None

Devoluciones

Tipo Description

use_agent_middleware

Decorador de clases que agrega compatibilidad con middleware a una clase de agente.

Este decorador agrega funcionalidad de middleware a cualquier clase de agente. Encapsula los run() métodos y run_stream() para proporcionar ejecución de middleware.

La ejecución del middleware se puede finalizar en cualquier momento estableciendo la context.terminate propiedad en True. Una vez establecida, la canalización dejará de ejecutarse más middleware en cuanto el control vuelva a la canalización.

Nota:

Este decorador ya se aplica a clases de agente integradas. Solo tiene que usar

si va a crear implementaciones de agente personalizadas.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parámetros

Nombre Description
agent_class
Requerido
type[<xref:TAgent>]

Clase de agente a la que se va a agregar compatibilidad con middleware.

Devoluciones

Tipo Description
type[~<xref:TAgent>]

Clase de agente modificada con compatibilidad con middleware.

Ejemplos


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Decorador de clases que agrega compatibilidad con middleware a una clase de cliente de chat.

Este decorador agrega funcionalidad de middleware a cualquier clase de cliente de chat. Encapsula los get_response() métodos y get_streaming_response() para proporcionar ejecución de middleware.

Nota:

Este decorador ya se aplica a las clases de cliente de chat integradas. Solo tiene que usar

si va a crear implementaciones de cliente de chat personalizadas.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parámetros

Nombre Description
chat_client_class
Requerido
type[<xref:TChatClient>]

Clase cliente de chat a la que se va a agregar compatibilidad con middleware.

Devoluciones

Tipo Description
type[~<xref:TChatClient>]

Clase cliente de chat modificada con compatibilidad con middleware.

Ejemplos


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Decorador de clases que habilita la llamada de herramientas para un cliente de chat.

Este decorador ajusta los get_response métodos y get_streaming_response para controlar automáticamente las llamadas de función del modelo, ejecutarlas y devolver los resultados al modelo para su posterior procesamiento.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parámetros

Nombre Description
chat_client
Requerido
type[<xref:TChatClient>]

Clase cliente de chat que se va a decorar.

Devoluciones

Tipo Description
type[~<xref:TChatClient>]

Clase cliente de chat decorada con la invocación de función habilitada.

Excepciones

Tipo Description

Si el cliente de chat no tiene los métodos necesarios.

Ejemplos


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Función de conveniencia para validar un gráfico de flujo de trabajo.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parámetros

Nombre Description
edge_groups
Requerido
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

lista de grupos perimetrales en el flujo de trabajo

executors
Requerido

Asignación de identificadores de ejecutor a instancias del ejecutor

start_executor
Requerido

Ejecutor inicial (puede ser instancia o identificador)

Devoluciones

Tipo Description

Excepciones

Tipo Description

Si se produce un error de validación