Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Ce didacticiel montre comment créer un flux de travail simultané à l’aide d’Agent Framework. Vous apprendrez à implémenter des modèles de fan-out et fan-in qui permettent le traitement parallèle, permettant à plusieurs exécuteurs ou agents de fonctionner simultanément puis d’agréger leurs résultats.
Ce que vous allez construire
Vous allez créer un flux de travail qui :
- Prend une question comme entrée (par exemple, « Qu’est-ce que la température ? »)
- Envoie la même question à deux agents d’IA experts simultanément (physicien et chimiste)
- Collecte et combine les réponses des deux agents dans une seule sortie
- Montre l’exécution simultanée avec des agents IA à l’aide de modèles fan-out/fan-in
Concepts abordés
Prerequisites
- Sdk .NET 8.0 ou version ultérieure
- Point de terminaison et déploiement du service Azure OpenAI configurés
- Azure CLI installé et authentifié (pour l’authentification des informations d’identification Azure)
- Une nouvelle application console
Étape 1 : Installer des packages NuGet
Tout d’abord, installez les packages requis pour votre projet .NET :
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Étape 2 : Configurer les dépendances et Azure OpenAI
Commencez par configurer votre projet avec les packages NuGet requis et le client Azure OpenAI :
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Étape 3 : Créer des agents IA experts
Créez deux agents IA spécialisés qui fourniront des perspectives d’experts :
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Étape 4 : Créer l’exécuteur de démarrage
Créez un exécuteur qui lance le traitement simultané en envoyant une entrée à plusieurs agents :
var startExecutor = new ConcurrentStartExecutor();
Implémentation ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Étape 5 : Créer l’exécuteur d’agrégation
Créez un exécuteur qui collecte et combine les réponses de plusieurs agents :
var aggregationExecutor = new ConcurrentAggregationExecutor();
Implémentation ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Étape 6 : Générer le flux de travail
Connectez les exécuteurs et les agents à l’aide de modèles de périphérie fan-out et fan-in :
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Étape 7 : Exécuter le flux de travail
Exécutez le flux de travail et capturez la sortie de streaming :
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Fonctionnement
-
Fan-Out : Le
ConcurrentStartExecutorreçoit la question d’entrée et le fan-out l’envoie simultanément à l'agent-physicien et à l'agent-chimiste. - Traitement parallèle : les deux agents IA traitent simultanément la même question, chacun fournissant leur perspective d’expert.
-
Fan-In : Le système
ConcurrentAggregationExecutorcollecte lesChatMessageréponses des deux agents. - Agrégation : une fois les deux réponses reçues, l’agrégateur les combine dans une sortie mise en forme.
Concepts clés
-
Fan-Out Edges : utilisez
AddFanOutEdge()pour distribuer la même entrée à plusieurs exécuteurs ou agents. -
Fan-In Edges : Utilisez
AddFanInEdge()pour collecter les résultats de plusieurs exécuteurs sources. - Intégration de l’agent IA : les agents IA peuvent être utilisés directement en tant qu’exécuteurs dans les flux de travail.
-
Classe de base d’exécuteur : les exécuteurs personnalisés héritent de
Executor<TInput>et remplacent la méthodeHandleAsync. -
Activer les jetons : Utilisez
TurnTokenpour signaler aux agents de commencer à traiter les messages mis en file d’attente. -
Exécution de streaming : permet
StreamAsync()d’obtenir des mises à jour en temps réel à mesure que le flux de travail progresse.
Implémentation complète
Pour obtenir l’implémentation complète de ce flux de travail simultané avec des agents IA, consultez l’exemple concurrent/Program.cs dans le référentiel Agent Framework.
Dans l’implémentation de Python, vous allez créer un flux de travail simultané qui traite les données via plusieurs exécuteurs parallèles et agrège les résultats de différents types. Cet exemple montre comment l’infrastructure gère les types de résultats mixtes à partir du traitement simultané.
Ce que vous allez construire
Vous allez créer un flux de travail qui :
- Prend une liste de nombres comme entrée
- Distribue la liste à deux exécuteurs parallèles (un calcul moyen, une somme de calcul)
- Agrège les différents types de résultats (float et int) dans une sortie finale
- Montre comment l’infrastructure gère différents types de résultats des exécuteurs simultanés
Concepts abordés
Prerequisites
- Python 3.10 ou version ultérieure
- Agent Framework Core installé :
pip install agent-framework-core --pre
Étape 1 : Importer les dépendances requises
Commencez par importer les composants nécessaires à partir d’Agent Framework :
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Étape 2 : Créer le Dispatcher Executor
Le répartiteur est chargé de distribuer l’entrée initiale à plusieurs exécuteurs parallèles :
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Étape 3 : Créer des exécuteurs de traitement parallèle
Créez deux exécuteurs qui traiteront les données simultanément :
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Étape 4 : Créer l’exécuteur Aggregator
L’agrégateur collecte les résultats des exécuteurs parallèles et génère la sortie finale :
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Étape 5 : Générer le flux de travail
Connectez les exécuteurs à l’aide de modèles de périphérie fan-out et fan-in :
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Étape 6 : Exécuter le flux de travail
Exécutez le workflow avec des exemples de données et capturez la sortie :
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Fonctionnement
-
Fan-Out :
Dispatcherreçoit la liste d’entrée et l’envoie simultanément aux exécuteursAverageetSum -
Traitement parallèle : les deux exécuteurs traitent simultanément la même entrée, produisant différents types de résultats :
-
Averageexécuteur produit unfloatrésultat -
Sumexécuteur produit unintrésultat
-
-
Fan-In : Les
Aggregatorrésultats des deux exécuteurs sont reçus sous la forme d’une liste contenant les deux types -
Gestion des types : l’infrastructure gère automatiquement les différents types de résultats à l’aide de types union (
int | float)
Concepts clés
-
Fan-Out Edges : utilisez
add_fan_out_edges()pour envoyer la même entrée à plusieurs exécuteurs -
Fan-In Edges : Utilisez
add_fan_in_edges()pour collecter des résultats à partir de plusieurs exécuteurs sources -
Types d’union : gérer différents types de résultats à l’aide d’annotations de type telles que
list[int | float] - Exécution simultanée : plusieurs exécuteurs traitent simultanément les données, ce qui améliore les performances
Implémentation complète
Pour obtenir l’implémentation de travail complète de ce flux de travail simultané, consultez l’exemple aggregate_results_of_different_types.py dans le référentiel Agent Framework.