Delen via


Microsoft Agent Framework-werkstromen - Werken met agents

Op deze pagina vindt u een overzicht van het gebruik van agents in Microsoft Agent Framework-werkstromen.

Overzicht

Als u intelligentie wilt toevoegen aan uw werkstromen, kunt u AI-agents gebruiken als onderdeel van de uitvoering van uw werkstroom. AI-agents kunnen eenvoudig worden geïntegreerd in werkstromen, zodat u complexe, intelligente oplossingen kunt maken die voorheen moeilijk te bereiken waren.

Een agent rechtstreeks toevoegen aan een werkstroom

U kunt agents toevoegen aan uw werkstroom via randen:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);

// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();

De werkstroom uitvoeren

Binnen de hierboven gemaakte werkstroom worden de agents in feite verpakt in een uitvoerder die de communicatie van de agents met andere onderdelen van de werkstroom afhandelt. De uitvoerder kan drie berichttypen verwerken:

  • ChatMessage: Één chatbericht.
  • List<ChatMessage>: Een lijst met chatberichten.
  • TurnToken: Een draaitoken dat het begin van een nieuwe draai aangeeft.

De uitvoerder triggert de agent niet om te reageren totdat deze een TurnToken ontvangt. Berichten die worden ontvangen voordat de TurnToken worden gebufferd en naar de agent verzonden zodra de TurnToken is ontvangen.

StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    // The agents will run in streaming mode and an AgentRunUpdateEvent
    // will be emitted as new chunks are generated.
    if (evt is AgentRunUpdateEvent agentRunUpdate)
    {
        Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
    }
}

De ingebouwde agentexecutor gebruiken

U kunt agents toevoegen aan uw werkstroom via randen:

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

De werkstroom uitvoeren

Binnen de hierboven gemaakte werkstroom worden de agents in feite verpakt in een uitvoerder die de communicatie van de agents met andere onderdelen van de werkstroom afhandelt. De uitvoerder kan drie berichttypen verwerken:

  • str: Eén chatbericht in tekenreeksindeling
  • ChatMessage: Één chatbericht
  • List<ChatMessage>: Een lijst met chatberichten

Wanneer de uitvoerder een bericht van een van deze typen ontvangt, wordt de agent geactiveerd om te reageren en is het antwoordtype een AgentExecutorResponse object. Deze klasse bevat nuttige informatie over het antwoord van de agent, waaronder:

  • executor_id: De id van de uitvoerder die dit antwoord heeft geproduceerd
  • agent_run_response: Het volledige antwoord van de agent
  • full_conversation: De volledige gespreksgeschiedenis tot nu toe

Er kunnen twee mogelijke gebeurtenistypen met betrekking tot de antwoorden van de agents worden uitgezonden wanneer de workflow wordt uitgevoerd.

  • AgentRunUpdateEvent bevat segmenten van het antwoord van de agent wanneer deze worden gegenereerd in de streamingmodus.
  • AgentRunEvent met het volledige antwoord van de agent in de niet-streamingmodus.

Agents worden standaard ingebed in uitvoerders die in streamingmodus worden uitgevoerd. U kunt dit gedrag aanpassen door een aangepaste executor te maken. Zie de volgende sectie voor meer informatie.

last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
    if isinstance(event, AgentRunUpdateEvent):
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"{event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        print(event.data, end="", flush=True)

Een aangepaste agentexecutor gebruiken

Soms wilt u misschien aanpassen hoe AI-agents worden geïntegreerd in een werkstroom. U kunt dit bereiken door een aangepaste uitvoerder te maken. Hiermee kunt u het volgende beheren:

  • De aanroep van de agent: streaming of niet-streamend
  • De berichttypen die door de agent worden verwerkt, inclusief aangepaste berichttypen
  • De levenscyclus van de agent, inclusief initialisatie en opschoning
  • Het gebruik van agentthreads en andere resources
  • Aanvullende gebeurtenissen die worden verzonden tijdens de uitvoering van de agent, inclusief aangepaste gebeurtenissen
  • Integratie met andere werkstroomfuncties, zoals gedeelde statussen en aanvragen/antwoorden
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
    private readonly AIAgent _agent;

    /// <summary>
    /// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
    /// </summary>
    /// <param name="agent">The AI agent used for custom processing</param>
    public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
    {
        this._agent = agent;
    }

    public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
    {
        // Retrieve any shared states if needed
        var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");

        // Render the input for the agent
        var agentInput = RenderInput(message, sharedState);

        // Invoke the agent
        // Assume the agent is configured with structured outputs with type `CustomOutput`
        var response = await this._agent.RunAsync(agentInput);
        var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);

        return customOutput;
    }
}
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # Create a domain specific agent using your configured AzureChatClient.
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate the agent with this executor node. The base Executor stores it on self.agent.
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
        # Invoke the agent with the incoming message and get the response
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # Accumulate messages and send them to the next executor in the workflow.
        messages.extend(response.messages)
        await ctx.send_message(messages)

Volgende stappen