Dela via


Skapa ett enkelt parallellt arbetsflöde

Den här självstudien visar hur du skapar ett samtidigt arbetsflöde med Agent Framework. Du lär dig att implementera fan-out och fan-in-mönster som möjliggör parallell bearbetning, vilket gör att flera exekutorer eller agenter kan arbeta samtidigt och sedan sammanställa sina resultat.

Vad du kommer att bygga

Du skapar ett arbetsflöde som:

  • Tar en fråga som indata (till exempel "Vad är temperatur?")
  • Skickar samma fråga till två expert-AI-agenter samtidigt (fysiker och kemist)
  • Samlar in och kombinerar svar från båda agenterna till en enda utdata
  • Demonstrerar samtidig körning med AI-agenter med hjälp av fan-out och fan-in-mönster

Begrepp som omfattas

Förutsättningar

Steg 1: Installera NuGet-paket

Installera först de nödvändiga paketen för .NET-projektet:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Steg 2: Konfigurera beroenden och Azure OpenAI

Börja med att konfigurera projektet med de nödvändiga NuGet-paketen och Azure OpenAI-klienten:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

Steg 3: Skapa expert-AI-agenter

Skapa två specialiserade AI-agenter som ger expertperspektiv:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

Steg 4: Skapa startexekutor

Skapa en köre som initierar samtidig bearbetning genom att skicka indata till flera agenter:

        var startExecutor = new ConcurrentStartExecutor();

Implementeringen ConcurrentStartExecutor :

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

Steg 5: Skapa Aggregation Executor

Skapa en exekutor som samlar in och kombinerar svar från flera agenter.

        var aggregationExecutor = new ConcurrentAggregationExecutor();

Implementeringen ConcurrentAggregationExecutor :

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

Steg 6: Skapa arbetsflödet

Anslut körarna och agenterna med hjälp av fläkt och fläkt-in kantsammanfogningar.

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

Steg 7: Kör arbetsflödet

Kör arbetsflödet och samla in strömmande utdata:

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

Så här fungerar det

  1. Fan-Out: ConcurrentStartExecutor tar emot frågan och fan-out gränsen skickar den till både fysikagenten och kemistagenten samtidigt.
  2. Parallell bearbetning: Båda AI-agenterna bearbetar samma fråga samtidigt, var och en ger sitt expertperspektiv.
  3. Fan-In: ConcurrentAggregationExecutor samlar in ChatMessage svar från båda agenterna.
  4. Sammansättning: När båda svaren har tagits emot kombinerar aggregatorn dem till en formaterad utdata.

Viktiga begrepp

  • Fan-Out-kopplingar: Använd AddFanOutEdge() för att distribuera samma indata till flera exekutörer eller agenter.
  • Fan-In-kanter: Använd AddFanInEdge() för att samla in resultat från flera källexekutorer.
  • AI-agentintegrering: AI-agenter kan användas direkt som köre i arbetsflöden.
  • Körbasklass: Anpassade utförare ärver från Executor<TInput> och åsidosätter HandleAsync metoden.
  • Aktivera token: Använd TurnToken för att signalera agenter för att börja bearbeta köade meddelanden.
  • Körning av direktuppspelning: Använd StreamAsync() för att hämta realtidsuppdateringar när arbetsflödet fortskrider.

Fullständig implementering

Fullständig implementering av det här samtidiga arbetsflödet med AI-agenter finns i exemplet Concurrent/Program.cs i Agent Framework-lagringsplatsen.

I Python-implementeringen skapar du ett samtidigt arbetsflöde som bearbetar data via flera parallella utförare och aggregerar resultat av olika typer. Det här exemplet visar hur ramverket hanterar blandade resultattyper från samtidig bearbetning.

Vad du kommer att bygga

Du skapar ett arbetsflöde som:

  • Tar en lista med tal som indata
  • Distribuerar listan till två parallella utförare (en beräknande medelvärde, en beräkningssumma)
  • Aggregerar de olika resultattyperna (flyttal och int) till ett slutligt utdata
  • Visar hur ramverket hanterar olika resultattyper från samtidiga exekutorer

Begrepp som omfattas

Förutsättningar

  • Python 3.10 eller senare
  • Agent Framework Core har installerats: pip install agent-framework-core --pre

Steg 1: Importera nödvändiga beroenden

Börja med att importera nödvändiga komponenter från Agent Framework:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

Steg 2: Skapa Dispatcher Exekveraren

Den dispatcher ansvarar för att distribuera de initiala indata till flera parallella utförare.

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

Steg 3: Skapa exekutorer för parallell bearbetning

Skapa två köre som bearbetar data samtidigt:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

Steg 4: Skapa Aggregator-exekutorn

Aggregatorn samlar in resultat från parallella utförare och ger de slutliga utdata:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

Steg 5: Skapa arbetsflödet

Anslut exekverare med hjälp av fan-out och fan-in kantmönster.

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

Steg 6: Kör arbetsflödet

Kör arbetsflödet med exempeldata och samla in utdata:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

Så här fungerar det

  1. Utdatagren: Den Dispatcher tar emot indatalistan och skickar den samtidigt till både Average- och Sum-exekverare
  2. Parallell bearbetning: Båda körarna bearbetar samma indata samtidigt och producerar olika resultattyper:
    • Average executor ger ett float resultat
    • Sum executor genererar ett int resultat
  3. Fläkt-In: Aggregator tar emot resultat från båda verkställarna som en lista som innehåller båda typerna
  4. Typhantering: Ramverket hanterar automatiskt de olika resultattyperna med hjälp av unionstyper (int | float)

Viktiga begrepp

  • Fan-Out Ändpunkter: Använd add_fan_out_edges() för att skicka samma indata till flera exekutorer
  • Fan-In Anslutningar: Använd add_fan_in_edges() för att samla in resultat från flera källexekutorer
  • Union-typer: Hantera olika resultattyper med hjälp av typanteckningar som list[int | float]
  • Samtidig körning: Flera köre bearbetar data samtidigt, vilket förbättrar prestandan

Fullständig implementering

Den fullständiga fungerande implementeringen av det här samtidiga arbetsflödet finns i exempelprogrammet aggregate_results_of_different_types.py i Agent Framework-repositoriet.

Nästa steg