Compartir a través de


Flujos de trabajo de Microsoft Agent Framework: trabajar con agentes

En esta página se proporciona información general sobre cómo usar agentes en los flujos de trabajo de Microsoft Agent Framework.

Información general

Para agregar inteligencia a los flujos de trabajo, puede aprovechar los agentes de inteligencia artificial como parte de la ejecución del flujo de trabajo. Los agentes de inteligencia artificial se pueden integrar fácilmente en flujos de trabajo, lo que le permite crear soluciones complejas e inteligentes que anteriormente eran difíciles de lograr.

Agregar un agente directamente a un flujo de trabajo

Puede agregar agentes al flujo de trabajo a través de bordes:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);

// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();

Ejecución del flujo de trabajo

Dentro del flujo de trabajo creado anteriormente, los agentes se encapsulan realmente dentro de un ejecutor que controla la comunicación del agente con otras partes del flujo de trabajo. El ejecutor puede controlar tres tipos de mensajes:

  • ChatMessage: un único mensaje de chat
  • List<ChatMessage>: una lista de mensajes de chat
  • TurnToken: un token de turno que indica el inicio de un nuevo turno.

El ejecutor no activa la respuesta del agente hasta que recibe un TurnToken. Los mensajes recibidos antes del TurnToken se almacenan en búfer y se envían al agente cuando se recibe el TurnToken.

StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    // The agents will run in streaming mode and an AgentRunUpdateEvent
    // will be emitted as new chunks are generated.
    if (evt is AgentRunUpdateEvent agentRunUpdate)
    {
        Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
    }
}

Uso del ejecutor del agente integrado

Puede agregar agentes al flujo de trabajo a través de bordes:

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

Ejecución del flujo de trabajo

Dentro del flujo de trabajo creado anteriormente, los agentes se encapsulan realmente dentro de un ejecutor que controla la comunicación del agente con otras partes del flujo de trabajo. El ejecutor puede controlar tres tipos de mensajes:

  • str: un único mensaje de chat en formato de cadena
  • ChatMessage: un único mensaje de chat
  • List<ChatMessage>: una lista de mensajes de chat

Cada vez que el ejecutor recibe un mensaje de uno de estos tipos, desencadenará el agente para responder y el tipo de respuesta será un AgentExecutorResponse objeto. Esta clase contiene información útil sobre la respuesta del agente, entre las que se incluyen:

  • executor_id: el identificador del ejecutor que generó esta respuesta.
  • agent_run_response: la respuesta completa del agente
  • full_conversation: el historial completo de conversación hasta este punto

Se pueden emitir dos tipos de evento posibles relacionados con las respuestas de los agentes al ejecutar el flujo de trabajo:

  • AgentRunUpdateEvent que contiene fragmentos de la respuesta del agente a medida que se generan en modo de streaming.
  • AgentRunEvent que contiene la respuesta completa del agente en modo no streaming.

De forma predeterminada, los agentes se encapsulan en ejecutores que se ejecutan en modo de streaming. Puede personalizar este comportamiento mediante la creación de un ejecutor personalizado. Consulte la sección siguiente para obtener más detalles.

last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
    if isinstance(event, AgentRunUpdateEvent):
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"{event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        print(event.data, end="", flush=True)

Uso de un ejecutor de agente personalizado

A veces, es posible que quiera personalizar cómo se integran los agentes de inteligencia artificial en un flujo de trabajo. Para lograrlo, puede crear un ejecutor personalizado. Esto le permite controlar:

  • Invocación del agente: streaming o no streaming
  • Los tipos de mensaje que controlará el agente, incluidos los tipos de mensaje personalizados.
  • El ciclo de vida del agente, incluida la inicialización y la limpieza
  • Uso de subprocesos de agente y otros recursos
  • Eventos adicionales emitidos durante la ejecución del agente, incluidos los eventos personalizados
  • Integración con otras características de flujo de trabajo, como estados compartidos y solicitudes o respuestas
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
    private readonly AIAgent _agent;

    /// <summary>
    /// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
    /// </summary>
    /// <param name="agent">The AI agent used for custom processing</param>
    public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
    {
        this._agent = agent;
    }

    public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
    {
        // Retrieve any shared states if needed
        var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");

        // Render the input for the agent
        var agentInput = RenderInput(message, sharedState);

        // Invoke the agent
        // Assume the agent is configured with structured outputs with type `CustomOutput`
        var response = await this._agent.RunAsync(agentInput);
        var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);

        return customOutput;
    }
}
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # Create a domain specific agent using your configured AzureChatClient.
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate the agent with this executor node. The base Executor stores it on self.agent.
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
        # Invoke the agent with the incoming message and get the response
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # Accumulate messages and send them to the next executor in the workflow.
        messages.extend(response.messages)
        await ctx.send_message(messages)

Pasos siguientes