Partager via


Flux de travail Microsoft Agent Framework - Utilisation des agents

Cette page fournit une vue d’ensemble de l’utilisation des agents dans les flux de travail Microsoft Agent Framework.

Aperçu

Pour ajouter de l’intelligence à vos flux de travail, vous pouvez tirer parti des agents IA dans le cadre de l’exécution de votre flux de travail. Les agents IA peuvent être facilement intégrés aux flux de travail, ce qui vous permet de créer des solutions complexes et intelligentes qui étaient auparavant difficiles à réaliser.

Ajouter un agent directement à un flux de travail

Vous pouvez ajouter des agents à votre flux de travail via des périphéries :

using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);

// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();

Exécution du flux de travail

Dans le flux de travail créé ci-dessus, les agents sont encapsulés à l’intérieur d’un exécuteur qui gère la communication de l’agent avec d’autres parties du flux de travail. L’exécuteur peut gérer trois types de messages :

  • ChatMessage: message de conversation unique
  • List<ChatMessage>: Liste des messages de conversation
  • TurnToken: jeton de tour qui signale le début d’un nouveau tour

L’exécuteur ne déclenche pas la réponse de l’agent tant qu’il ne reçoit pas TurnToken. Tous les messages reçus avant le TurnToken sont mis en mémoire tampon et envoyés à l’agent lorsque le TurnToken est reçu.

StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    // The agents will run in streaming mode and an AgentRunUpdateEvent
    // will be emitted as new chunks are generated.
    if (evt is AgentRunUpdateEvent agentRunUpdate)
    {
        Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
    }
}

Utilisation de l’exécuteur d’agent intégré

Vous pouvez ajouter des agents à votre flux de travail via des périphéries :

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

Exécution du flux de travail

Dans le flux de travail créé ci-dessus, les agents sont encapsulés à l’intérieur d’un exécuteur qui gère la communication de l’agent avec d’autres parties du flux de travail. L’exécuteur peut gérer trois types de messages :

  • str: message de conversation unique au format chaîne
  • ChatMessage: message de conversation unique
  • List<ChatMessage>: Liste des messages de conversation

Chaque fois que l’exécuteur reçoit un message d’un de ces types, il déclenche la réponse de l’agent et le type de réponse est un AgentExecutorResponse objet. Cette classe contient des informations utiles sur la réponse de l’agent, notamment :

  • executor_id: ID de l’exécuteur qui a produit cette réponse
  • agent_run_response: réponse complète de l’agent
  • full_conversation: Historique complet des conversations jusqu’à ce stade

Deux types d’événements possibles liés aux réponses des agents peuvent être émis lors de l’exécution du flux de travail :

  • AgentRunUpdateEvent contenant des blocs de la réponse de l’agent à mesure qu’ils sont générés en mode streaming.
  • AgentRunEvent contenant la réponse complète de l’agent en mode non streaming.

Par défaut, les agents sont encapsulés dans des exécuteurs qui s’exécutent en mode streaming. Vous pouvez personnaliser ce comportement en créant un exécuteur personnalisé. Pour plus d’informations, consultez la section suivante.

last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
    if isinstance(event, AgentRunUpdateEvent):
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"{event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        print(event.data, end="", flush=True)

Utilisation d’un exécuteur d’agent personnalisé

Parfois, vous souhaiterez peut-être personnaliser la façon dont les agents IA sont intégrés à un flux de travail. Pour ce faire, vous pouvez créer un exécuteur personnalisé. Cela vous permet de contrôler :

  • Appel de l’agent : diffusion en continu ou non-diffusion en continu
  • Les types de messages gérés par l’agent, y compris les types de messages personnalisés
  • Cycle de vie de l’agent, y compris l’initialisation et le nettoyage
  • Utilisation des threads d’agent et d’autres ressources
  • Événements supplémentaires émis pendant l’exécution de l’agent, y compris les événements personnalisés
  • Intégration à d’autres fonctionnalités de flux de travail, telles que les états partagés et les demandes/réponses
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
    private readonly AIAgent _agent;

    /// <summary>
    /// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
    /// </summary>
    /// <param name="agent">The AI agent used for custom processing</param>
    public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
    {
        this._agent = agent;
    }

    public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
    {
        // Retrieve any shared states if needed
        var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");

        // Render the input for the agent
        var agentInput = RenderInput(message, sharedState);

        // Invoke the agent
        // Assume the agent is configured with structured outputs with type `CustomOutput`
        var response = await this._agent.RunAsync(agentInput);
        var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);

        return customOutput;
    }
}
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # Create a domain specific agent using your configured AzureChatClient.
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate the agent with this executor node. The base Executor stores it on self.agent.
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
        # Invoke the agent with the incoming message and get the response
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # Accumulate messages and send them to the next executor in the workflow.
        messages.extend(response.messages)
        await ctx.send_message(messages)

Étapes suivantes