Partekatu honen bidez:


Ejecutor del agente

Al agregar un agente de IA a un flujo de trabajo, debe encapsularse en un ejecutor para que el motor de flujo de trabajo pueda enrutar mensajes a él, administrar su estado de sesión y controlar su salida. Agent Executor es el ejecutor integrado que controla esta adaptación.

Visión general

El Ejecutor de Agente conecta la abstracción del agente con el modelo de ejecución del flujo de trabajo. Este:

  • Recibe mensajes tipados del gráfico de flujo de trabajo y los reenvía al agente subyacente.
  • Administra el estado de la sesión y la conversación del agente entre ejecuciones.
  • Adapta su comportamiento en función del modo de ejecución del flujo de trabajo (streaming o no streaming).
  • Emite eventos de salida (AgentResponse o AgentResponseUpdate) al llamador del flujo de trabajo para su observación.
  • Envía mensajes a los ejecutores de nivel inferior conectados para el procesamiento continuo dentro del gráfico.
  • Admite puntos de control para flujos de trabajo de larga duración.

Funcionamiento

En C#, el motor de flujo de trabajo crea internamente un AIAgentHostExecutor para cada AIAgent agregado a un flujo de trabajo. Este ejecutor especializado extiende ChatProtocolExecutor y usa un patrón de token de turno

  1. Almacenamiento en caché de mensajes: a medida que llegan mensajes de otros ejecutores, el ejecutor del agente los recopila. Si ForwardIncomingMessages está habilitado (valor predeterminado), los mensajes entrantes también se reenvieron a los ejecutores de nivel inferior.
  2. Activar el desencadenador de token : el agente procesa sus mensajes almacenados en caché solo después de recibir un TurnToken.
  3. Invocación del agente — el ejecutor llama a RunAsync (no streaming) o RunStreamingAsync (streaming) en el agente subyacente.
  4. Producción de salida: si los eventos de transmisión están habilitados, cada incremento AgentResponseUpdate se genera como salida del flujo de trabajo. Si EmitAgentResponseEvents está habilitado, el agregado AgentResponse también se genera como resultado del flujo de trabajo.
  5. Mensajería de bajada : los mensajes de respuesta del agente se envían a los ejecutores de nivel inferior conectados.
  6. Paso a través del token : después de completar su turno, el ejecutor envía un nuevo TurnToken nivel inferior para que el siguiente agente de la cadena pueda comenzar el procesamiento.

Sugerencia

Algunos escenarios pueden requerir un ejecutor de agentes más especializado; por ejemplo, las orquestaciones de traspaso usan un ejecutor dedicado HandoffAgentExecutor con lógica de enrutamiento personalizada.

Creación implícita frente a explícita

Cuando se pasa un AIAgent a WorkflowBuilder, el marco lo encapsula automáticamente en un AIAgentBinding, lo que crea el objeto subyacente AIAgentHostExecutor. No es necesario crear instancias del ejecutor del agente directamente.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

También puede usar los métodos auxiliares en AgentWorkflowBuilder para patrones comunes:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Configuración personalizada

Para personalizar el comportamiento del ejecutor del agente, use BindAsExecutor con AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Tipos de entrada

El ejecutor del agente de C# acepta varios tipos de entrada: string, ChatMessagey IEnumerable<ChatMessage>. Las entradas de texto se convierten automáticamente en instancias ChatMessage con el rol User. Todos los mensajes entrantes se acumulan hasta que se recibe un TurnToken , en cuyo momento el ejecutor procesa el lote. Cuando ReassignOtherAgentsAsUsers está habilitado (valor predeterminado), los mensajes de otros agentes se reasignan al User rol para que el modelo subyacente los trate como entradas de usuario, mientras que los mensajes del agente actual conservan el Assistant rol.

Salida y encadenación

Una vez que el agente ha completado su turno, el ejecutor:

  1. Envía los mensajes de respuesta del agente a todos los ejecutores de nivel inferior conectados.
  2. Reenvía un nuevo TurnToken para que el siguiente agente de la cadena pueda empezar a procesar.

Esto hace que encadenar agentes sea sencillo; simplemente conéctelos con aristas.

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Comportamiento de streaming

El comportamiento de streaming se controla mediante la EmitAgentUpdateEvents opción en AIAgentHostOptionso dinámicamente a través de TurnToken:

  • Cuando está habilitado, el ejecutor llama a RunStreamingAsync sobre el agente y genera cada AgentResponseUpdate como un evento de salida del flujo de trabajo. Esto proporciona actualizaciones de token por token en tiempo real.
  • Cuando está deshabilitado , el ejecutor llama RunAsync a y genera una única respuesta completa.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Sesiones compartidas

Cada ejecutor de agente mantiene su propia sesión de forma predeterminada. Para compartir una sesión entre agentes, configure los agentes con un proveedor de sesión común antes de agregarlos al flujo de trabajo.

Opciones de configuración

AIAgentHostOptions controla el comportamiento del ejecutor del agente:

Opción Predeterminado Descripción
EmitAgentUpdateEvents null Emita eventos de actualización de streaming durante la ejecución. TurnToken tiene prioridad si está configurado. Si ambos son null, el streaming está deshabilitado.
EmitAgentResponseEvents false Emitir la respuesta agregada del agente como evento de salida del flujo de trabajo.
InterceptUserInputRequests false Intercepte UserInputRequestContent y enrutelo como un mensaje de flujo de trabajo para controlarlo.
InterceptUnterminatedFunctionCalls false Intercepte FunctionCallContent sin un resultado correspondiente, y enrútelo como mensaje de flujo de trabajo.
ReassignOtherAgentsAsUsers true Reasignar mensajes de otros agentes al User rol para que el modelo los trate como entradas de usuario.
ForwardIncomingMessages true Reenvíe los mensajes entrantes a los ejecutores de nivel inferior antes de los mensajes generados por el agente.

Creación de punto de comprobación

El ejecutor del agente admite el uso de puntos de control para workflows de larga duración. Cuando se toma un punto de control, el ejecutor serializa:

  • El estado de sesión del agente (a través de SerializeSessionAsync).
  • La configuración de emisión de eventos del turno actual (solo está presente mientras las solicitudes están pendientes y el ejecutor aún no ha producido su entrada TurnToken).
  • Las solicitudes de entrada de usuario pendientes y las solicitudes de llamada de función.

Al restaurar, el ejecutor deserializa la sesión y el estado de solicitud pendiente, lo que permite que el flujo de trabajo se reanude desde donde se dejó.

Funcionamiento

La AgentExecutor clase encapsula un agente que implementa el SupportsAgentRun protocolo. Cuando el ejecutor recibe un mensaje:

  1. Normalización de mensajes: la entrada se normaliza en una lista de Message objetos y se agrega a la memoria caché interna del ejecutor. El ejecutor acepta varios tipos de entrada ( str, Messagelist[str | Message], AgentExecutorRequest, y AgentExecutorResponse ) cada uno enrutado a un controlador dedicado que normaliza la entrada antes del almacenamiento en caché.
  2. Invocación del agente : el ejecutor llama agent.run() a con los mensajes almacenados en caché y selecciona automáticamente el modo de streaming o no streaming en función del modo de ejecución del flujo de trabajo.
  3. Emisión de salida — en modo streaming, cada AgentResponseUpdate se genera como un evento de flujo de trabajo de salida. En el modo no de transmisión, se produce un solo AgentResponse.
  4. Despacho descendente: una vez completado el agente, el ejecutor envía un AgentExecutorResponse a todos los ejecutores descendentes conectados. Esta respuesta incluye el historial de conversaciones completo, lo que permite un encadenamiento fluido.
  5. Restablecimiento de caché : la caché de mensajes interna del ejecutor se borra después de invocar al agente, lo que garantiza que cada invocación del agente procesa solo los mensajes nuevos recibidos desde la última invocación.

Sugerencia

Algunos escenarios pueden requerir un ejecutor de agente más especializado; Por ejemplo, las orquestaciones de entrega usan un ejecutor dedicado con lógica de enrutamiento personalizada.

Creación implícita frente a explícita

El WorkflowBuilder encapsula automáticamente los agentes en una AgentExecutor instancia cuando pasa un agente directamente. Para la mayoría de los flujos de trabajo, la creación implícita es suficiente:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Creación explícita

Cree un objeto AgentExecutor explícitamente cuando necesite:

  • Comparta una sesión entre varios agentes.
  • Proporcione un identificador de ejecutor personalizado para el enrutamiento y los kwargs en tiempo de ejecución de destino.
  • Haga referencia a la misma instancia del ejecutor en varios bordes.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Parámetros del constructor:

Parámetro Tipo Descripción
agent SupportsAgentRun Agente que se va a encapsular.
session AgentSession \| None Sesión que se va a usar para las ejecuciones del agente. Si None, se crea una nueva sesión desde el agente.
id str \| None Identificador único del ejecutor. El valor predeterminado es el nombre del agente si está disponible.

Sugerencia

El identificador del ejecutor también es la clave que se usa al dirigirse workflow.run(function_invocation_kwargs=...) o client_kwargs= a agentes individuales. Si omite id, el flujo de trabajo usa el nombre del agente integrado.

Tipos de entrada

AgentExecutor define varios métodos de controlador, cada uno acepta un tipo de entrada diferente. El motor de flujo de trabajo envía automáticamente el controlador correcto en función del tipo de mensaje. Todos los tipos de entrada hacen que el agente se ejecute inmediatamente, excepto en el caso de AgentExecutorRequest, donde la marca should_respond controla si el agente se ejecuta o simplemente almacena en caché los mensajes.

Tipo de entrada Controlador Agente de desencadenadores Descripción
AgentExecutorRequest run Condicional Tipo de entrada canónico. Contiene una lista de mensajes y una should_respond marca que controla si se ejecuta el agente.
str from_str Siempre Acepta una cadena de texto sin formatear.
Message from_message Siempre Acepta un único Message objeto.
list[str \| Message] from_messages Siempre Acepta una lista de cadenas u Message objetos como contexto de conversación.
AgentExecutorResponse from_response Siempre Acepta la respuesta de un ejecutor de agente anterior, lo que permite el encadenamiento directo.

Uso de AgentExecutorRequest

AgentExecutorRequest es el tipo de entrada canónico y proporciona el mayor control:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

La should_respond marca controla si el agente procesa los mensajes inmediatamente o simplemente los almacena en caché para más adelante:

  • True (valor predeterminado): el agente se ejecuta y genera una respuesta.
  • False : los mensajes se agregan a la memoria caché, pero el agente no se ejecuta. Esto es útil para cargar previamente el contexto de conversación antes de desencadenar una respuesta.

Salida y encadenación

Una vez que el agente completa, el ejecutor envía un AgentExecutorResponse descendente. Esta clase de datos contiene:

Campo Tipo Descripción
executor_id str Identificador del ejecutor que generó la respuesta.
agent_response AgentResponse Respuesta subyacente del agente (sin modificar por parte del cliente).
full_conversation list[Message] \| None Contexto completo de la conversación (entradas anteriores + salidas del agente) para su encadenamiento.

Al encadenar ejecutores del agente, el ejecutor descendente recibe el AgentExecutorResponse a través del controlador from_response. Usa el full_conversation campo para conservar el historial de conversaciones completo, lo que impide que los agentes posteriores pierdan el contexto anterior.

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Comportamiento de streaming

El AgentExecutor se adapta automáticamente al modo de ejecución del flujo de trabajo:

  • stream=True — llama a agent.run(stream=True) y devuelve cada AgentResponseUpdate como un evento de salida del flujo de trabajo. Una vez completada la transmisión, las actualizaciones se agregan a un completo AgentResponse para su distribución a continuación.
  • stream=False (valor predeterminado) — llama a agent.run(stream=False) y genera un único AgentResponse como evento de salida del flujo de trabajo.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Sesiones compartidas

De forma predeterminada, cada uno AgentExecutor crea su propia sesión. Para compartir una sesión entre varios agentes (por ejemplo, para mantener un subproceso de conversación común), cree una sesión explícitamente y pásela a cada ejecutor:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Nota:

No todos los agentes admiten sesiones compartidas. Normalmente, solo los agentes del mismo tipo de proveedor pueden compartir una sesión.

Creación de punto de comprobación

AgentExecutor admite puntos de control para guardar y restaurar el estado en flujos de trabajo de ejecución prolongada. Cuando se toma un punto de control, el ejecutor serializa:

  • Caché interna de mensajes.
  • El historial completo de la conversación.
  • Estado de la sesión del agente.
  • Todas las solicitudes y respuestas de entrada de usuario pendientes.

Al restaurar, el ejecutor deserializa este estado, lo que permite que el flujo de trabajo se reanude desde donde se dejó.

Advertencia

Los puntos de control con agentes que usan sesiones del lado servidor (como FoundryAgent) tienen limitaciones. El estado de sesión del lado servidor no se captura en los puntos de control y se puede modificar mediante ejecuciones posteriores. Considere la posibilidad de implementar un ejecutor personalizado si necesita puntos de comprobación confiables con sesiones del lado servidor.

Pasos siguientes