Udostępnij przez


Przepływy pracy programu Microsoft Agent Framework — używanie przepływów pracy jako agentów

Ten dokument zawiera omówienie sposobu używania przepływów pracy jako agentów w programie Microsoft Agent Framework.

Przegląd

Czasami utworzono zaawansowany przepływ pracy z wieloma agentami, niestandardowymi funkcjami wykonawczych i złożoną logiką — ale chcesz używać go tak samo jak każdy inny agent. Dokładnie to, co umożliwiają agenci przepływu pracy. Zawijając przepływ pracy jako Agentelement, możesz wchodzić z nim w interakcje za pomocą tego samego znanego interfejsu API, którego używasz dla prostego agenta czatu.

Najważniejsze korzyści

  • Ujednolicony interfejs: interakcja ze złożonymi przepływami pracy przy użyciu tego samego interfejsu API co prostych agentów
  • Zgodność interfejsu API: integrowanie przepływów pracy z istniejącymi systemami obsługującymi interfejs agenta
  • Komponowalność: używanie agentów przepływu pracy jako bloków konstrukcyjnych w większych systemach agentów lub innych przepływach pracy
  • Zarządzanie wątkami: wykorzystanie wątków agenta do obsługi stanu konwersacji, punktów kontrolnych i wznowienia
  • Obsługa przesyłania strumieniowego: Otrzymuj aktualizacje w czasie rzeczywistym podczas realizacji przepływu pracy

Jak to działa

Gdy konwertujesz przepływ pracy na agenta:

  1. Przepływ pracy jest weryfikowany, aby upewnić się, że jego komponent wykonawczy startu może akceptować komunikaty czatu.
  2. Wątek jest tworzony w celu zarządzania stanem konwersacji i punktami kontrolnymi
  3. Komunikaty wejściowe są kierowane do wykonawcy początkowego przepływu pracy
  4. Zdarzenia przepływu pracy są konwertowane na aktualizacje odpowiedzi agenta
  5. Żądania zewnętrznych danych wejściowych (z RequestInfoExecutor) są udostępniane jako wywołania funkcji

Requirements

Aby użyć przepływu pracy jako agenta, funkcja wykonawcza uruchamiania przepływu pracy musi być w stanie obsłużyć IEnumerable<ChatMessage> jako dane wejściowe. Jest to automatycznie spełnione w przypadku używania ChatClientAgent lub innych funkcji wykonawczych opartych na agencie.

Tworzenie agenta przepływu pracy

AsAgent() Użyj metody rozszerzenia, aby przekonwertować dowolny zgodny przepływ pracy na agenta:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parametry AsAgent

Parameter Typ Description
id string? Opcjonalny unikatowy identyfikator agenta. Generowane automatycznie, jeśli nie podano.
name string? Opcjonalna nazwa wyświetlana dla agenta.
description string? Opcjonalny opis przeznaczenia agenta.
checkpointManager CheckpointManager? Opcjonalny menedżer punktów kontrolnych na potrzeby trwałości między sesjami.
executionEnvironment IWorkflowExecutionEnvironment? Opcjonalne środowisko wykonywania. Domyślnie ustawiane jako InProcessExecution.OffThread lub InProcessExecution.Concurrent, w zależności od konfiguracji przepływu pracy.

Korzystanie z agentów przepływu pracy

Tworzenie wątku

Każda konwersacja z agentem przepływu pracy wymaga wątku do zarządzania stanem:

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Wykonywanie bez przesyłania strumieniowego

W przypadku prostych przypadków użycia, w których chcesz uzyskać pełną odpowiedź:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Wykonywanie przesyłania strumieniowego

Aby uzyskać aktualizacje w czasie rzeczywistym podczas wykonywania przepływu pracy:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Obsługa żądań zewnętrznych danych wejściowych

Gdy przepływ pracy zawiera funkcje wykonawcze, które żądają zewnętrznych danych wejściowych (przy użyciu RequestInfoExecutor), te żądania są udostępniane jako wywołania funkcji w odpowiedzi agenta:

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serializacja wątków i wznowienie

Wątki agenta przepływu pracy można serializować w celu przechowywania i wznowienia później.

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Tworzenie punktów kontrolnych z agentami przepływu pracy

Włącz tworzenie punktów kontrolnych, aby stan przepływu pracy był zachowany po ponownym uruchomieniu procesu.

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Requirements

Aby użyć przepływu pracy jako agenta, funkcja wykonawcza uruchamiania przepływu pracy musi być w stanie obsłużyć list[ChatMessage] jako dane wejściowe. To jest automatycznie spełniane podczas korzystania z ChatAgent lub AgentExecutor.

Tworzenie agenta przepływu pracy

Wywołaj as_agent() dowolny zgodny przepływ pracy, aby przekonwertować go na agenta:

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

parametry as_agent

Parameter Typ Description
name str | None Opcjonalna wyświetlana nazwa agenta. Generowane automatycznie, jeśli nie podano.

Korzystanie z agentów przepływu pracy

Tworzenie wątku

Każda konwersacja z agentem przepływu pracy wymaga wątku do zarządzania stanem:

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Wykonywanie bez przesyłania strumieniowego

W przypadku prostych przypadków użycia, w których chcesz uzyskać pełną odpowiedź:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Wykonywanie przesyłania strumieniowego

Aby uzyskać aktualizacje w czasie rzeczywistym podczas wykonywania przepływu pracy:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Obsługa żądań zewnętrznych danych wejściowych

Gdy przepływ pracy zawiera funkcje wykonawcze, które żądają zewnętrznych danych wejściowych (przy użyciu RequestInfoExecutor), te żądania są udostępniane jako wywołania funkcji. Agent przepływu pracy monitoruje oczekujące żądania i wymaga odpowiedzi przed kontynuowaniem.

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Dostarczanie odpowiedzi na oczekujące żądania

Aby kontynuować wykonywanie przepływu pracy po zewnętrznym żądaniu danych wejściowych:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Kompletny przykład

Oto kompletny przykład demonstracyjny pokazujący agenta procesu roboczego z wyjściem strumieniowym.

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Opis konwersji zdarzeń

Gdy przepływ pracy działa jako agent, zdarzenia przepływu pracy są konwertowane na odpowiedzi agenta. Typ odpowiedzi zależy od używanej metody:

  • run(): zwraca AgentRunResponse pełny wynik po zakończeniu przepływu pracy
  • run_stream(): generuje AgentRunResponseUpdate obiekty w trakcie realizacji przepływu pracy, dostarczając aktualizacje w czasie rzeczywistym

Podczas wykonywania zdarzenia wewnętrznego przepływu pracy są mapowane na odpowiedzi agenta w następujący sposób:

Zdarzenie przepływu pracy Odpowiedź agenta
AgentRunUpdateEvent Przekazywane jako AgentRunResponseUpdate (przesyłanie strumieniowe) lub agregowane do AgentRunResponse (bez przesyłania strumieniowego)
RequestInfoEvent Przekonwertowane na FunctionCallContent i FunctionApprovalRequestContent
Inne zdarzenia Dołączone do raw_representation dla widoczności i monitorowania

Ta konwersja umożliwia korzystanie ze standardowego interfejsu agenta, jednocześnie mając dostęp do szczegółowych informacji o przepływie pracy w razie potrzeby.

Przypadki użycia

1. Złożone potoki agentów

Zawijanie przepływu pracy z wieloma agentami jako jednego agenta do użycia w aplikacjach:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Skład agenta

Użyj agentów przepływu pracy jako składników w większych systemach:

  • Agent przepływu pracy może służyć jako narzędzie przez innego agenta
  • Wiele agentów przepływu pracy można organizować razem
  • Agenci przepływu pracy mogą być zagnieżdżani w innych przepływach pracy

3. Integracja interfejsu API

Ujawnianie złożonych przepływów pracy za pośrednictwem interfejsów API oczekujących standardowego interfejsu agenta, umożliwiając:

  • Interfejsy rozmów korzystające z zaawansowanych przepływów pracy zaplecza
  • Integracja z istniejącymi systemami opartymi na agentach
  • Stopniowa migracja od prostych agentów do złożonych przepływów pracy

Dalsze kroki