Partager via


Flux de travail Microsoft Agent Framework - Utilisation de flux de travail en tant qu’agents

Ce document fournit une vue d’ensemble de l’utilisation des flux de travail en tant qu’agents dans Microsoft Agent Framework.

Aperçu

Parfois, vous avez créé un workflow sophistiqué avec plusieurs agents, exécuteurs personnalisés et logique complexe, mais vous souhaitez l’utiliser comme n’importe quel autre agent. C’est exactement ce que les agents de workflow vous permettent de faire. En emballant votre flux de travail en tant que Agent, vous pouvez interagir avec lui via la même API familière que vous utiliseriez pour un agent de conversation simple.

Principaux avantages

  • Interface unifiée : interagir avec des flux de travail complexes à l’aide de la même API que des agents simples
  • Compatibilité de l’API : intégrer des flux de travail à des systèmes existants qui prennent en charge l’interface de l’agent
  • Composabilité : Utilisez des agents de workflow comme éléments constitutifs dans des systèmes d'agents plus complexes ou dans d'autres workflows
  • Gestion des threads : exploiter les threads d’agent pour l’état de conversation, le point de contrôle et la reprise
  • Prise en charge de la diffusion en continu : obtenir des mises à jour en temps réel à mesure que le flux de travail s’exécute

Fonctionnement

Lorsque vous convertissez un flux de travail en agent :

  1. Le flux de travail est validé pour s’assurer que son exécuteur de démarrage peut accepter les messages de conversation
  2. Un thread est créé pour gérer l’état de conversation et les points de contrôle
  3. Les messages d’entrée sont routés vers l’exécuteur de démarrage du flux de travail
  4. Les événements de flux de travail sont convertis en mises à jour de réponse de l’agent
  5. Les demandes d’entrée externes (de RequestInfoExecutor) sont présentées en tant qu’appels de fonction.

Spécifications

Pour utiliser un flux de travail en tant qu’agent, l’exécuteur de démarrage du flux de travail doit être en mesure de gérer IEnumerable<ChatMessage> comme entrée. Cela est automatiquement satisfait lors de l'utilisation de ChatClientAgent ou d'autres exécuteurs basés sur un agent.

Création d’un agent de flux de travail

Utilisez la méthode d’extension AsAgent() pour convertir n’importe quel flux de travail compatible en agent :

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Paramètres AsAgent

Paramètre Type Descriptif
id string? Identificateur unique facultatif pour l’agent. Généré automatiquement s’il n’est pas fourni.
name string? Nom d'affichage facultatif de l’agent.
description string? Description facultative de l’objectif de l’agent.
checkpointManager CheckpointManager? Gestionnaire de points de contrôle facultatif pour la persistance entre les sessions.
executionEnvironment IWorkflowExecutionEnvironment? Environnement d’exécution facultatif. Par défaut, InProcessExecution.OffThread ou InProcessExecution.Concurrent, en fonction de la configuration du flux de travail.

Utilisation d’agents de flux de travail

Création d’un thread

Chaque conversation avec un agent de workflow nécessite un thread pour gérer l’état :

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Exécution sans diffusion en continu

Pour les cas d’usage simples où vous souhaitez obtenir la réponse complète :

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Exécution de streaming

Pour les mises à jour en temps réel à mesure que le flux de travail s’exécute :

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Gestion des demandes d’entrée externes

Lorsqu’un flux de travail contient des exécuteurs qui demandent une entrée externe (à l’aide RequestInfoExecutor), ces requêtes sont exposées en tant qu’appels de fonction dans la réponse de l’agent :

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Sérialisation et reprise des threads

Les threads de l’agent de flux de travail peuvent être sérialisés pour la persistance et repris ultérieurement :

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Point de contrôle avec les agents de flux de travail

Activez le point de contrôle pour conserver l’état du flux de travail entre les redémarrages de processus :

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Spécifications

Pour utiliser un flux de travail en tant qu’agent, l’exécuteur de démarrage du flux de travail doit être en mesure de gérer list[ChatMessage] comme entrée. Cela est automatiquement satisfait lors de l’utilisation de ChatAgent ou AgentExecutor.

Création d’un agent de flux de travail

Appelez as_agent() tout workflow compatible pour le convertir en agent :

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

Paramètres as_agent

Paramètre Type Descriptif
name str | None Nom d’affichage facultatif de l’agent. Généré automatiquement s’il n’est pas fourni.

Utilisation d’agents de flux de travail

Création d’un thread

Chaque conversation avec un agent de workflow nécessite un thread pour gérer l’état :

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Exécution sans diffusion en continu

Pour les cas d’usage simples où vous souhaitez obtenir la réponse complète :

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Exécution de streaming

Pour les mises à jour en temps réel à mesure que le flux de travail s’exécute :

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Gestion des demandes d’entrée externes

Lorsqu’un flux de travail contient des exécuteurs qui demandent une entrée externe (à l’aide RequestInfoExecutor), ces requêtes sont exposées en tant qu’appels de fonction. L’agent de flux de travail suit les demandes en attente et attend des réponses avant de continuer :

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Fourniture de réponses aux demandes en attente

Pour poursuivre l’exécution du flux de travail après une demande d’entrée externe :

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Exemple complet

Voici un exemple complet illustrant un agent de flux de travail avec une sortie de streaming :

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Présentation de la conversion d’événements

Lorsqu’un flux de travail s’exécute en tant qu’agent, les événements de flux de travail sont convertis en réponses de l’agent. Le type de réponse dépend de la méthode que vous utilisez :

  • run(): retourne un AgentRunResponse résultat contenant le résultat complet une fois le flux de travail terminé
  • run_stream(): génère des AgentRunResponseUpdate objets à mesure que le flux de travail s’exécute, fournissant des mises à jour en temps réel

Pendant l’exécution, les événements de flux de travail internes sont mappés aux réponses de l’agent comme suit :

Événement de flux de travail Réponse de l’agent
AgentRunUpdateEvent Passé à travers comme AgentRunResponseUpdate (streaming) ou agrégé dans AgentRunResponse (non-streaming)
RequestInfoEvent Converti en FunctionCallContent et FunctionApprovalRequestContent
Autres événements Inclus dans raw_representation pour l'observabilité

Cette conversion vous permet d’utiliser l’interface de l’agent standard tout en ayant accès aux informations détaillées du flux de travail si nécessaire.

Cas d’usage

1. Pipelines d’agents complexes

Encapsulez un flux de travail multi-agent en tant qu’agent unique pour une utilisation dans les applications :

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composition de l’agent

Utilisez des agents de flux de travail en tant que composants dans des systèmes plus volumineux :

  • Un agent de flux de travail peut être utilisé comme outil par un autre agent
  • Plusieurs agents de flux de travail peuvent être orchestrés ensemble
  • Les agents de flux de travail peuvent être imbriqués dans d’autres flux de travail

3. Intégration de l’API

Exposez des flux de travail complexes via des API qui attendent l’interface de l’agent standard, en activant :

  • Interfaces de conversation qui utilisent des flux de travail back-end sophistiqués
  • Intégration à des systèmes basés sur des agents existants
  • Migration progressive d’agents simples vers des flux de travail complexes

Étapes suivantes