Поделиться через


Рабочие процессы Microsoft Agent Framework — использование рабочих процессов в качестве агентов

В этом документе представлен обзор использования рабочих процессов в качестве агентов в Microsoft Agent Framework.

Обзор

Иногда вы создали сложный рабочий процесс с несколькими агентами, пользовательскими исполнителями и сложной логикой, но вы хотите использовать его так же, как и любой другой агент. Это именно то, что позволяют вам делать агенты рабочих процессов. Упаковав рабочий процесс в качестве Agentобъекта, вы можете взаимодействовать с ним с помощью того же знакомого API, который вы будете использовать для простого агента чата.

Ключевые преимущества

  • Унифицированный интерфейс. Взаимодействие со сложными рабочими процессами с помощью того же API, что и простые агенты
  • Совместимость API: интеграция рабочих процессов с существующими системами, поддерживающими интерфейс агента
  • Компоновка: Использование агентов рабочих процессов в качестве строительных блоков в более крупных системах агентов или других рабочих процессах.
  • Управление потоками: взаимодействие с потоками агента для управления состоянием беседы, контрольными точками и их возобновлением.
  • Поддержка потоковой передачи: получение обновлений в режиме реального времени при выполнении рабочего процесса

Принцип работы

При преобразовании рабочего процесса в агент:

  1. Рабочий процесс проверяется, чтобы его исполнительный блок для запуска мог принимать сообщения чата.
  2. Тред создается для управления состоянием беседы и контрольными точками
  3. Входные сообщения направляются в начальный исполнитель рабочего процесса
  4. События рабочего процесса преобразуются в обновления ответа агента
  5. Внешние входные запросы (из RequestInfoExecutor) отображаются в виде вызовов функций

Требования

Чтобы использовать рабочий процесс в качестве агента, начальный исполнитель рабочего процесса должен иметь возможность обрабатывать IEnumerable<ChatMessage> как входные данные. Это автоматически выполняется при использовании ChatClientAgent или других исполнителей на основе агентов.

Создание агента рабочего процесса

AsAgent() Используйте метод расширения для преобразования любого совместимого рабочего процесса в агент:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Параметры AsAgent

Параметр Тип Description
id string? Необязательный уникальный идентификатор агента. Автоматически генерируется, если не указано.
name string? Необязательное отображаемое имя агента.
description string? Необязательное описание назначения агента.
checkpointManager CheckpointManager? Необязательный менеджер контрольных точек для сохранения между сеансами.
executionEnvironment IWorkflowExecutionEnvironment? Необязательная среда выполнения кода. По умолчанию используется InProcessExecution.OffThread или InProcessExecution.Concurrent в зависимости от конфигурации рабочего процесса.

Использование агентов рабочих процессов

Создание потока

Для каждой беседы с агентом рабочего процесса требуется поток для управления состоянием:

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Операция без потокового режима

Для простых вариантов использования, когда требуется полный ответ:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Выполнение потоковой передачи

Для обновлений в режиме реального времени при выполнении рабочего процесса:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Обработка внешних запросов ввода

Если рабочий процесс содержит исполнителей, запрашивающих внешние входные данные (используя RequestInfoExecutor), эти запросы отображаются как вызовы функций в ответе агента:

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Серилизация потоков и возобновление выполнения

Потоки агента рабочего процесса можно сериализовать для сохраняемости и возобновить позже:

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Создание контрольных точек с использованием агентов рабочего процесса

Включите контрольную точку для сохранения состояния рабочего процесса во время перезапуска процесса:

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Требования

Чтобы использовать рабочий процесс в качестве агента, начальный исполнитель рабочего процесса должен иметь возможность обрабатывать list[ChatMessage] как входные данные. Это автоматически удовлетворяется при использовании ChatAgent или AgentExecutor.

Создание агента рабочего процесса

Вызвать as_agent() на любом совместимом рабочем процессе для его преобразования в агента.

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

параметры as_agent

Параметр Тип Description
name str | None Необязательное отображаемое имя агента. Автоматическое создание, если не предусмотрено.

Использование агентов рабочих процессов

Создание потока

Для каждой беседы с агентом рабочего процесса требуется поток для управления состоянием:

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Выполнение без потоковой обработки

Для простых вариантов использования, когда требуется полный ответ:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Выполнение потоковой передачи

Для обновлений в режиме реального времени при выполнении рабочего процесса:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Обработка внешних запросов ввода

Если рабочий процесс содержит исполнителей, запрашивающих внешние входные данные (используя RequestInfoExecutor), эти запросы отображаются в виде вызовов функций. Агент рабочего процесса отслеживает ожидающие запросы и ожидает ответы перед продолжением:

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Предоставление ответов на ожидающие запросы

Чтобы продолжить выполнение рабочего процесса после внешнего запроса ввода:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Полный пример

Ниже приведен полный пример демонстрации рабочего агента с потоковым выводом:

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Понимание преобразования событий

При запуске рабочего процесса в качестве агента события рабочего процесса преобразуются в ответы агента. Тип ответа зависит от используемого метода:

  • run(): возвращает полный AgentRunResponse результат после завершения рабочего процесса.
  • run_stream(): возвращает AgentRunResponseUpdate объекты по мере выполнения рабочего процесса, предоставляя обновления в режиме реального времени.

Во время выполнения внутренние события рабочего процесса сопоставляются с ответами агента следующим образом:

Событие рабочего процесса Ответ агента
AgentRunUpdateEvent Проводится как AgentRunResponseUpdate (стриминг) или агрегируется в AgentRunResponse (нестриминг)
RequestInfoEvent Преобразовано в FunctionCallContent и FunctionApprovalRequestContent
Другие события Включено в raw_representation для наблюдаемости

Это преобразование позволяет использовать стандартный интерфейс агента, но при необходимости иметь доступ к подробным сведениям о рабочем процессе.

Варианты использования

1. Сложные потоки агента

Объедините рабочий процесс с несколькими агентами в виде одного агента для применения в приложениях.

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Состав агента

Используйте агенты рабочего процесса в качестве компонентов в крупных системах:

  • Агент рабочего процесса можно использовать в качестве средства другим агентом.
  • Несколько агентов рабочих процессов можно синхронизировать вместе
  • Агенты рабочих процессов могут быть вложены в другие рабочие процессы

3. Интеграция API

Обеспечьте доступ к сложным рабочим процессам через API, которые ожидают стандартный интерфейс агента, предоставляя:

  • Интерфейсы чата, использующие сложные внутренние рабочие процессы
  • Интеграция с существующими системами на основе агента
  • Постепенная миграция из простых агентов в сложные рабочие процессы

Дальнейшие шаги