Compartilhar via


Fluxos de trabalho do Microsoft Agent Framework – Usando fluxos de trabalho como agentes

Este documento fornece uma visão geral de como usar fluxos de trabalho como agentes no Microsoft Agent Framework.

Visão geral

Às vezes, você criou um fluxo de trabalho sofisticado com vários agentes, executores personalizados e lógica complexa , mas você deseja usá-lo como qualquer outro agente. É exatamente isso que os agentes de fluxo de trabalho permitem que você faça. Ao encapsular seu fluxo de trabalho como um Agent, você pode interagir com ele por meio da mesma API familiar que usaria para um agente de chat simples.

Principais benefícios

  • Interface Unificada: interagir com fluxos de trabalho complexos usando a mesma API que agentes simples
  • Compatibilidade da API: integrar fluxos de trabalho a sistemas existentes que dão suporte à interface do Agente
  • Composabilidade: usar agentes de fluxo de trabalho como blocos de construção em sistemas de agentes maiores ou outros fluxos de trabalho
  • Gerenciamento de Sessão: utilizar as sessões do agente para o estado e a continuação da conversa
  • Suporte de streaming: obter atualizações em tempo real à medida que o fluxo de trabalho é executado

Como funciona

Quando você converte um fluxo de trabalho em um agente:

  1. O fluxo de trabalho é validado para garantir que seu executor inicial possa aceitar os tipos de entrada necessários
  2. Uma sessão é criada para gerenciar o estado da conversa
  3. As mensagens de entrada são roteadas para o executor inicial do fluxo de trabalho
  4. Eventos de fluxo de trabalho são convertidos em atualizações de resposta do agente
  5. As solicitações de entrada externas (de RequestInfoExecutor) são exibidas como chamadas de função

Requirements

Para usar um fluxo de trabalho como um agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar IEnumerable<ChatMessage> como entrada. Isso é atendido automaticamente ao usar executores baseados em agente criados com AsAIAgent.

Criar um agente de fluxo de trabalho

Use o AsAIAgent() método de extensão para converter qualquer fluxo de trabalho compatível em um agente:

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");

// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
    .AddEdge(researchAgent, writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parâmetros AsAIAgent

Parâmetro Tipo DESCRIÇÃO
id string? Identificador exclusivo opcional para o agente. Gerado automaticamente se não for fornecido.
name string? Nome de exibição opcional para o agente.
description string? Descrição opcional da finalidade do agente.
executionEnvironment IWorkflowExecutionEnvironment? Ambiente de execução opcional. Define-se como padrão InProcessExecution.OffThread ou InProcessExecution.Concurrent com base na configuração do fluxo de trabalho.
includeExceptionDetails bool Se true, inclui mensagens de exceção no conteúdo de erro. Usa false como padrão.
includeWorkflowOutputsInResponse bool Se true, transforma as saídas do fluxo de trabalho em conteúdo nas respostas do agente. Usa false como padrão.

Usando agentes de fluxo de trabalho

Criando uma sessão

Cada conversa com um agente de fluxo de trabalho requer uma sessão para gerenciar o estado:

// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();

Execução sem streaming

Para casos de uso simples em que você deseja a resposta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentResponse response = await workflowAgent.RunAsync(messages, session);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Execução de streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Lidando com solicitações de entrada externas

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), essas solicitações são exibidas como chamadas de função na resposta do agente:

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialização e retomada de sessão

As sessões do agente de fluxo de trabalho podem ser serializadas para persistência e retomadas posteriormente:

// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);

// Store serializedSession to your persistence layer...

// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
    Console.Write(update.Text);
}

Requirements

Para usar um fluxo de trabalho como agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar com a entrada da mensagem. Isso é atendido automaticamente ao usar Agent ou executores baseados em agente.

Criando um agente de fluxo de trabalho

Chame as_agent() em qualquer fluxo de trabalho compatível para convertê-lo em um agente:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential

# Create your chat client and agents
client = AzureOpenAIResponsesClient(
    project_endpoint="<your-endpoint>",
    deployment_name="<your-deployment>",
    credential=AzureCliCredential(),
)

researcher = client.as_agent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
)

writer = client.as_agent(
    name="Writer",
    instructions="Write clear, engaging content based on research.",
)

# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

Parâmetros de as_agent

Parâmetro Tipo DESCRIÇÃO
name str | None Nome de exibição opcional para o agente. Gerado automaticamente se não for fornecido.

Usando agentes de fluxo de trabalho

Criando uma sessão

Opcionalmente, você pode criar uma sessão para gerenciar o estado da conversa em várias voltas:

# Create a new session for the conversation
session = await workflow_agent.create_session()

Observação

As sessões são opcionais. Se você não passar um session para run(), o agente manipulará o estado internamente. Se workflow.as_agent() for criado sem context_providers, a estrutura adicionará um InMemoryHistoryProvider() por padrão para que o histórico de vários turnos funcione pronto para uso. Se você passar context_providers explicitamente, essa lista será usada como está.

Execução sem streaming

Para casos de uso simples em que você deseja a resposta completa:

# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Execução de streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

async for update in workflow_agent.run(
    "Write an article about AI trends",
    stream=True,
):
    if update.text:
        print(update.text, end="", flush=True)

Lidando com solicitações de entrada externas

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando request_info), essas solicitações são exibidas como chamadas de função na resposta do agente. A chamada de função usa o nome WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:

from agent_framework import Content, Message, WorkflowAgent

response = await workflow_agent.run("Process my request")

# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
    for content in message.contents:
        if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
            human_review_function_call = content

Fornecendo respostas a solicitações pendentes

Para continuar a execução do fluxo de trabalho após uma solicitação de entrada externa, crie um resultado de função e envie-o de volta:

if human_review_function_call:
    # Parse the request arguments
    request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
        human_review_function_call.arguments
    )

    # Create a response (your custom response type)
    result_data = MyResponseType(approved=True, feedback="Looks good")

    # Create the function call result
    function_result = Content.from_function_result(
        call_id=human_review_function_call.call_id,
        result=result_data,
    )

    # Send the response back to continue the workflow
    response = await workflow_agent.run(Message("tool", [function_result]))

Exemplo completo

Aqui está um exemplo completo demonstrando um agente de fluxo de trabalho com saída de streaming:

import asyncio
import os

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    client = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    )

    # Create specialized agents
    researcher = client.as_agent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
    )

    writer = client.as_agent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
    )

    reviewer = client.as_agent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
    )

    # Build a sequential workflow
    workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Run the workflow
    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run(
        "Write about quantum computing",
        stream=True,
    ):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Noções básicas sobre a conversão de eventos

Quando um fluxo de trabalho é executado como um agente, os eventos de fluxo de trabalho são convertidos em respostas do agente. O tipo de resposta depende de como você chama run():

  • run(): retorna um AgentResponse que contém o resultado completo após a conclusão do fluxo de trabalho
  • run(..., stream=True): retorna uma iterável assíncrona de AgentResponseUpdate objetos à medida que o fluxo de trabalho é executado, fornecendo atualizações em tempo real

Durante a execução, os eventos de fluxo de trabalho internos são mapeados para respostas do agente da seguinte maneira:

Evento de fluxo de trabalho Resposta do agente
event.type == "output" Passado como AgentResponseUpdate (streaming) ou agregado em AgentResponse (não streaming)
event.type == "request_info" Convertido em conteúdo de chamada de função usando WorkflowAgent.REQUEST_INFO_FUNCTION_NAME
Outros eventos Ignorado (somente no fluxo de trabalho)

Essa conversão permite que você use a interface do agente padrão enquanto ainda tem acesso a informações detalhadas de fluxo de trabalho quando necessário.

Casos de uso

Pipelines de Agente Complexos

Encapsular um fluxo de trabalho de vários agentes como um único agente para uso em aplicativos:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composição do agente

Use agentes de fluxo de trabalho como componentes em sistemas maiores:

  • Um agente de fluxo de trabalho pode ser usado como uma ferramenta por outro agente
  • Vários agentes de fluxo de trabalho podem ser orquestrados juntos
  • Os agentes de fluxo de trabalho podem ser aninhados em outros fluxos de trabalho

3. Integração de API

Exponha fluxos de trabalho complexos por meio de APIs que esperam a interface padrão do Agente, permitindo:

  • Interfaces de chat que usam fluxos de trabalho de back-end sofisticados
  • Integração com sistemas existentes baseados em agente
  • Migração gradual de agentes simples para fluxos de trabalho complexos

Próximas etapas