Orquestaciones de flujos de trabajo de Microsoft Agent Framework: secuencial

En la orquestación secuencial, los agentes se organizan en una tubería. Cada agente procesa la tarea a su vez, pasando su salida al siguiente agente de la secuencia. Esto es ideal para los flujos de trabajo en los que cada paso se basa en el anterior, como la revisión de documentos, las canalizaciones de procesamiento de datos o el razonamiento de varias fases.

Orquestación secuencial

Importante

De forma predeterminada, cada agente de la secuencia consume la conversación completa del agente anterior, tanto los mensajes de entrada proporcionados al agente anterior como sus mensajes de respuesta. En su lugar, puede configurar agentes para que consuman solo los mensajes de respuesta del agente anterior. Consulte Control del contexto entre agentes para obtener más información.

Temas que se abordarán

  • Creación de una canalización secuencial de agentes
  • Cómo encadenar agentes en la que cada uno se basa en el resultado anterior
  • Cómo agregar la aprobación de human-in-the-loop para las llamadas a herramientas confidenciales
  • Cómo mezclar agentes con ejecutores personalizados para tareas especializadas
  • Cómo realizar el seguimiento del flujo de conversación a través de la canalización

Definir los agentes

En la orquestación secuencial, los agentes se organizan en una canalización en la que cada agente procesa la tarea a su vez, pasando la salida al siguiente agente de la secuencia.

Configuración del cliente de Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Advertencia

DefaultAzureCredential es conveniente para el desarrollo, pero requiere una consideración cuidadosa en producción. En producción, considere usar una credencial específica (por ejemplo, ManagedIdentityCredential) para evitar problemas de latencia, sondeos de credenciales no deseados y posibles riesgos de seguridad de los mecanismos de respaldo.

Cree agentes especializados que funcionen en secuencia:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurar la orquestación secuencial

Compile el flujo de trabajo mediante AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Ejecución del flujo de trabajo secuencial

Ejecute el flujo de trabajo y procese los eventos:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Resultado de muestra

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Orquestación secuencial con human-in-the-loop

Las orquestaciones secuenciales admiten interacciones humanas en bucle a través de la aprobación de herramientas. Cuando los agentes usan herramientas ajustadas con ApprovalRequiredAIFunction, el flujo de trabajo se pausa y emite un RequestInfoEvent que contiene un ToolApprovalRequestContent. Los sistemas externos (como un operador humano) pueden inspeccionar la llamada a la herramienta, aprobarla o rechazarla, y el flujo de trabajo se reanuda según corresponda.

Orquestación secuencial con human-in-the-loop

Sugerencia

Para obtener más información sobre el modelo de solicitud y respuesta, consulte Human-in-the-Loop.

Definir agentes con herramientas que requieren aprobación

Cree agentes en los que las herramientas confidenciales se encapsulan con ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Compilación y ejecución con control de aprobación

Compile normalmente el flujo de trabajo secuencial. El flujo de aprobación se controla a través del flujo de eventos:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Nota:

AgentWorkflowBuilder.BuildSequential() admite la aprobación de herramientas lista para usar; no se necesita ninguna configuración adicional. Cuando un agente llama a una herramienta ajustada con ApprovalRequiredAIFunction, el flujo de trabajo pausa y emite automáticamente un RequestInfoEvent.

Sugerencia

Para obtener un ejemplo completo de ejecución de este flujo de aprobación, consulte el GroupChatToolApproval ejemplo. El mismo RequestInfoEvent patrón de control se aplica a otras orquestaciones.

Conceptos clave

  • Procesamiento secuencial: cada agente procesa la salida del agente anterior en orden
  • AgentWorkflowBuilder.BuildSequential(): crea un flujo de trabajo de canalización a partir de una colección de agentes.
  • ChatClientAgent: representa un agente respaldado por un cliente de chat con instrucciones específicas.
  • InProcessExecution.RunStreamingAsync(): ejecuta el flujo de trabajo y devuelve un StreamingRun para el streaming de eventos en tiempo real.
  • Control de eventos: supervisar el progreso del agente a través de AgentResponseUpdateEvent y su finalización a través de WorkflowOutputEvent
  • Aprobación de herramientas confidenciales: envuelva las herramientas confidenciales con ApprovalRequiredAIFunction para requerir la aprobación humana antes de la ejecución
  • RequestInfoEvent: emitido cuando una herramienta requiere aprobación; contiene ToolApprovalRequestContent con los detalles de la llamada a la herramienta

En la orquestación secuencial, cada agente procesa la tarea a su vez, con la salida que fluye de una a la siguiente. Comience definiendo agentes para un proceso de dos fases:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Configurar la orquestación secuencial

La SequentialBuilder clase crea una canalización donde los agentes procesan las tareas en orden. Cada agente ve el historial de conversaciones completo y agrega su respuesta:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Ejecución del flujo de trabajo secuencial

Ejecute el flujo de trabajo y recopile la salida final. La salida del terminal es una AgentResponse que contiene los mensajes de respuesta del último agente:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Resultado de muestra

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Avanzado: Mezcla de agentes con ejecutores personalizados

La orquestación secuencial admite la combinación de agentes con ejecutores personalizados para el procesamiento especializado. Esto resulta útil cuando se necesita lógica personalizada que no requiera un LLM:

Definir un ejecutor personalizado

Nota:

Cuando un ejecutor personalizado sigue a un agente en la secuencia, su controlador recibe un AgentExecutorResponse (porque los agentes se encapsulan internamente mediante AgentExecutor). Use agent_response.full_conversation para acceder al historial de conversaciones completo. Un ejecutor personalizado que se usa como último participante (terminador) debe llamar a ctx.yield_output(AgentResponse(...)) para que su salida se convierta en la salida del terminal del flujo de trabajo.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Creación de un flujo de trabajo secuencial mixto

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Salida de ejemplo con Ejecutor personalizado

===== Final Summary =====
Summary -> users:1 assistants:1

Control del contexto entre agentes

De forma predeterminada, cada agente de un SequentialBuilder flujo de trabajo consume la conversación completa del agente anterior (entrada y mensajes de respuesta). Al establecer chain_only_agent_responses=True se configura a todos los agentes de la secuencia para que consuman únicamente los mensajes de respuesta del agente anterior:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Esto resulta útil para las canalizaciones de traducción, el refinamiento progresivo y otros escenarios donde cada agente debe centrarse únicamente en transformar el resultado del agente anterior sin estar influenciado por turnos de conversación anteriores.

Para obtener un ejemplo completo, consulte sequential_chain_only_agent_responses.py en el repositorio de Agent Framework.

Sugerencia

Para obtener un control más específico sobre el flujo de contexto , incluidas las funciones de filtro personalizadas, consulte Modos de contexto en la referencia del ejecutor del agente.

Salidas intermedias

De forma predeterminada, solo la salida del último participante se muestra como un evento de flujo de trabajo output . Configure intermediate_outputs=True para mostrar la salida de cada participante junto con la salida final.

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

Puede controlar estos eventos en tiempo real en modo de streaming:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Orquestación secuencial con human-in-the-loop

Las orquestaciones secuenciales admiten interacciones humanas en el bucle de dos maneras: la aprobación de herramientas para controlar las llamadas a herramientas confidenciales y solicitar información para pausar después de cada respuesta del agente para recopilar comentarios.

Orquestación secuencial con human-in-the-loop

Sugerencia

Para obtener más información sobre el modelo de solicitud y respuesta, consulte Human-in-the-Loop.

Aprobación de herramientas en flujos de trabajo secuenciales

Use @tool(approval_mode="always_require") para marcar las herramientas que necesitan aprobación humana antes de la ejecución. El flujo de trabajo pausa y emite un request_info evento cuando el agente intenta llamar a la herramienta.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Procese el flujo de eventos y controle las solicitudes de aprobación:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Sugerencia

Para obtener un ejemplo completo de ejecución, vea sequential_builder_tool_approval.py. La aprobación de herramientas funciona con SequentialBuilder sin ninguna configuración adicional del constructor.

Solicitar información sobre la retroalimentación del agente

Use .with_request_info() para pausar después de que los agentes específicos respondan, lo que permite la entrada externa (como la revisión humana) antes de que comience el siguiente agente:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Conceptos clave

  • Contexto compartido: de forma predeterminada, cada agente consume la conversación completa del agente anterior, incluidos los mensajes de entrada y respuesta.
  • Control de contexto: use chain_only_agent_responses=True para configurar agentes para consumir solo los mensajes de respuesta del agente anterior.
  • Salida de AgentResponse: la salida del terminal del flujo de trabajo es una AgentResponse que contiene la respuesta del último agente (no la conversación completa).
  • Order Matters: los agentes se ejecutan estrictamente en el orden especificado en la participants lista
  • Participantes flexibles: puede mezclar agentes y ejecutores personalizados en cualquier orden
  • Contrato de terminador personalizado: un ejecutor personalizado que se usa como último participante debe llamar ctx.yield_output(AgentResponse(...)) para generar la salida del terminal.
  • Salidas intermedias: configurar intermediate_outputs=True para mostrar la salida de cada participante como un evento de flujo de trabajo output, y no solo el del último participante.
  • Aprobación de herramientas: uso @tool(approval_mode="always_require") para operaciones confidenciales que necesitan revisión humana
  • Solicitar información: Use .with_request_info(agents=[...]) para pausar después de ciertos agentes para recibir comentarios externos

Pasos siguientes