Orchestrations de flux de travail Microsoft Agent Framework - Magentic

L’orchestration Magentic est conçue sur la base du système Magentic-One inventé par AutoGen. Il s’agit d’un modèle multi-agent à usage général flexible conçu pour les tâches complexes et ouvertes qui nécessitent une collaboration dynamique. Dans ce modèle, un responsable Magentic dédié coordonne une équipe d’agents spécialisés, en sélectionnant l’agent qui doit agir ensuite en fonction du contexte en évolution, de la progression des tâches et des fonctionnalités de l’agent.

Le gestionnaire Magentic gère un contexte partagé, suit la progression et adapte le flux de travail en temps réel. Cela permet au système de décomposer les problèmes complexes, de déléguer des tâches subordonnées et d’affiner de manière itérative des solutions via la collaboration de l’agent. L’orchestration est particulièrement adaptée aux scénarios où le chemin de la solution n’est pas connu à l’avance et peut nécessiter plusieurs séries de raisonnement, de recherche et de calcul.

Orchestration magentique

Conseil / Astuce

L’orchestration Magentic a la même architecture que le modèle d'orchestration Group Chat, avec un gestionnaire très puissant qui utilise la planification pour coordonner la collaboration des agents. Si votre scénario nécessite une coordination plus simple sans planification complexe, envisagez plutôt d’utiliser le modèle de conversation de groupe.

Note

Dans le document Magentic-One , 4 agents hautement spécialisés sont conçus pour résoudre un ensemble très spécifique de tâches. Dans l’orchestration Magentic dans Agent Framework, vous pouvez définir vos propres agents spécialisés pour répondre à vos besoins d’application spécifiques. Toutefois, il n’a pas été testé à quel point l’orchestration Magentic fonctionnera en dehors de la conception d’origine de Magentic-One.

Ce que vous allez apprendre

  • Comment configurer un responsable Magentic pour coordonner plusieurs agents spécialisés
  • Comment gérer les événements de diffusion en continu avec WorkflowEvent
  • Comment implémenter la révision du plan human-in-the-loop
  • Comment suivre la collaboration et la progression de l’agent via des tâches complexes

Définir vos agents spécialisés

Dans l’orchestration Magentic, vous définissez des agents spécialisés que le gestionnaire peut sélectionner dynamiquement en fonction des exigences de tâche :

#pragma warning disable MAAIW001  // Magentic types are experimental
#pragma warning disable OPENAI001 // HostedCodeInterpreterTool is experimental

using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Specialized.Magentic;
using Microsoft.Extensions.AI;

string endpoint = Environment.GetEnvironmentVariable("AZURE_AI_PROJECT_ENDPOINT")
    ?? throw new InvalidOperationException("AZURE_AI_PROJECT_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_AI_MODEL_DEPLOYMENT_NAME") ?? "gpt-5.4-mini";

AIProjectClient projectClient = new(new Uri(endpoint), new DefaultAzureCredential());

AIAgent researcherAgent = projectClient.AsAIAgent(
    deploymentName,
    name: "ResearcherAgent",
    description: "Specialist in research and information gathering.",
    instructions: "You are a researcher. Find relevant information without doing additional computation or quantitative analysis.");

AIAgent coderAgent = projectClient.AsAIAgent(
    deploymentName,
    name: "CoderAgent",
    description: "A helpful assistant that writes and executes code to analyze data.",
    instructions: "You solve quantitative questions by writing and running code. Show the analysis and the computation process clearly.",
    tools: [new HostedCodeInterpreterTool()]);

AIAgent managerAgent = projectClient.AsAIAgent(
    deploymentName,
    name: "MagenticManager",
    description: "Orchestrator that coordinates the research and coding workflow.",
    instructions: "You coordinate the team to complete complex tasks efficiently.");
import os

from agent_framework import Agent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher_agent = Agent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    client=client,
)

coder_agent = Agent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    client=client,
    tools=client.get_code_interpreter_tool(),
)

# Create a manager agent for orchestration
manager_agent = Agent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    client=client,
)

Générer le flux de travail Magentic

Utilisez le générateur de flux de travail Magentic pour configurer le flux de travail avec un responsable et un ensemble de participants. Le constructeur expose également les limites de la boucle interne (nombre maximal de cycles de coordination, nombre maximal de décrochages consécutifs avant la replanification, nombre maximal de réinitialisations du plan) ainsi qu’un indicateur booléen pour l’examen du plan avec intervention humaine.

Workflow workflow = new MagenticWorkflowBuilder(managerAgent)
    .AddParticipants([researcherAgent, coderAgent])
    .WithName("Magentic Orchestration Workflow")
    .WithDescription("Coordinates a researcher and coder to solve a complex analytical task.")
    .RequirePlanSignoff(false)
    .WithMaxRounds(10)
    .WithMaxStalls(3)
    .WithMaxResets(2)
    .Build();
from agent_framework.orchestrations import MagenticBuilder

workflow = MagenticBuilder(
    participants=[researcher_agent, coder_agent],
    intermediate_output_from=[researcher_agent, coder_agent],
    manager_agent=manager_agent,
    max_round_count=10,
    max_stall_count=3,
    max_reset_count=2,
).build()

Conseil / Astuce

Un gestionnaire standard est implémenté basé sur la conception MagneticOne, avec des invites fixes extraites de l'article d'origine. Vous pouvez personnaliser le comportement du responsable en passant vos propres incitations via les paramètres du MagenticBuilder constructeur. Pour personnaliser davantage le gestionnaire, vous pouvez également implémenter votre propre responsable en subclassant la MagenticManagerBase classe.

Sorties intermédiaires

Note

Cette section s’applique actuellement uniquement au pivot Python.

Le passage de intermediate_output_from=[...] à MagenticBuilder désigne des participants spécifiques comme sources de sortie intermédiaire. Leurs appels à yield_output génèrent des événements "intermediate", tandis que la réponse synthétisée finale du gestionnaire reste un événement "output" (terminal). Sans ce paramètre (valeur par défaut), seules les surfaces du terminal du gestionnaire AgentResponse sont exposées.

Cela est particulièrement utile pour les flux de travail Magentic, car :

  • Les tâches sont souvent longues avec de nombreux cycles de collaboration entre agents.
  • Vous pouvez afficher la contribution de chaque agent en temps réel à mesure que le flux de travail progresse en mode streaming
  • Il fournit une visibilité sur les étapes de raisonnement intermédiaires du flux de travail

Exécuter le flux de travail avec le streaming d’événements

Exécutez une tâche complexe et gérez les événements pour les mises à jour de sortie et d’orchestration de streaming. La sortie du flux de travail terminal contient la réponse finale synthétisée du gestionnaire.

const string TaskPrompt =
    "I am preparing a report on the energy efficiency of different machine learning model architectures. " +
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 " +
    "on standard datasets (for example, ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). " +
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 " +
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model " +
    "per task type (image classification, text classification, and text generation).";

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(
    workflow,
    new List<ChatMessage> { new(ChatRole.User, TaskPrompt) });

await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastResponseId = null;
WorkflowOutputEvent? finalOutput = null;

await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync())
{
    switch (workflowEvent)
    {
        case AgentResponseUpdateEvent updateEvent:
            // Stream per-participant deltas. Group by ResponseId / MessageId / ExecutorId so
            // each new contiguous response prints its executor header once.
            string responseId = updateEvent.Update.ResponseId
                ?? updateEvent.Update.MessageId
                ?? updateEvent.ExecutorId;
            if (!string.Equals(responseId, lastResponseId, StringComparison.Ordinal))
            {
                if (lastResponseId is not null)
                {
                    Console.WriteLine();
                }
                Console.Write($"- {updateEvent.ExecutorId}: ");
                lastResponseId = responseId;
            }
            Console.Write(updateEvent.Update.Text);
            break;

        case MagenticPlanCreatedEvent planCreated:
            Console.WriteLine($"\n[Magentic Initial Plan]\n{planCreated.FullTaskLedger.Text}");
            break;

        case MagenticReplannedEvent replanned:
            Console.WriteLine($"\n[Magentic Replanned]\n{replanned.FullTaskLedger.Text}");
            break;

        case MagenticProgressLedgerUpdatedEvent progressUpdated:
            MagenticProgressLedger ledger = progressUpdated.ProgressLedger;
            Console.WriteLine(
                $"\n[Magentic Progress Ledger] satisfied={ledger.IsRequestSatisfied}, " +
                $"inLoop={ledger.IsInLoop}, progressing={ledger.IsProgressBeingMade}, " +
                $"nextSpeaker={ledger.NextSpeaker}, instruction={ledger.InstructionOrQuestion}");
            break;

        case WorkflowOutputEvent outputEvent when outputEvent.Is<List<ChatMessage>>():
            finalOutput = outputEvent;
            break;

        case WorkflowErrorEvent workflowError:
            Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error.");
            break;

        case ExecutorFailedEvent executorFailed:
            Console.Error.WriteLine(
                $"Executor '{executorFailed.ExecutorId}' failed: " +
                (executorFailed.Data?.ToString() ?? "unknown error"));
            break;
    }
}

if (finalOutput?.As<List<ChatMessage>>() is { } transcript)
{
    Console.WriteLine("\n\n=== Final Conversation Transcript ===\n");
    foreach (ChatMessage message in transcript)
    {
        Console.WriteLine($"{message.AuthorName ?? message.Role.ToString()}: {message.Text}");
    }
}
import json
import asyncio
from typing import cast

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    Message,
    WorkflowEvent,
)
from agent_framework.orchestrations import MagenticProgressLedger

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (for example, ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# Keep track of the last executor to format output nicely in streaming mode
last_message_id: str | None = None
final_response: AgentResponse | None = None
async for event in workflow.run(task, stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        message_id = event.data.message_id
        if message_id != last_message_id:
            if last_message_id is not None:
                print("\n")
            print(f"- {event.executor_id}:", end=" ", flush=True)
            last_message_id = message_id
        print(event.data, end="", flush=True)

    elif event.type == "magentic_orchestrator":
        print(f"\n[Magentic Orchestrator Event] Type: {event.data.event_type.name}")
        if isinstance(event.data.content, Message):
            print(f"Please review the plan:\n{event.data.content.text}")
        elif isinstance(event.data.content, MagenticProgressLedger):
            print(f"Please review progress ledger:\n{json.dumps(event.data.content.to_dict(), indent=2)}")
        else:
            print(f"Unknown data type in MagenticOrchestratorEvent: {type(event.data.content)}")

        # Block to allow user to read the plan/progress before continuing
        # Note: this is for demonstration only and is not the recommended way to handle human interaction.
        # Please refer to `with_plan_review` for proper human interaction during planning phases.
        await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to continue...")

    elif event.type == "output" and isinstance(event.data, AgentResponse):
        final_response = event.data

# The output of the Magentic workflow is an AgentResponse with the manager's final answer
if final_response:
    output = final_response.messages[-1].text if final_response.messages else ""
    print(output)

Magentic met en évidence trois événements de l’orchestrateur qui marquent les jalons de planification et de progression :

  • Plan initial créé : le responsable a produit le plan de tâche initial.
  • Replanifié — un nouveau plan a été produit, soit en raison d’une détection de blocage, soit parce qu’un humain a révisé le plan lors de son examen.
  • Registre de progression mis à jour : émis une fois par cycle de coordination ; porte le registre de progression actuel (si la demande est satisfaite, si l’équipe se trouve dans une boucle, si la progression est effectuée, le prochain orateur et l’instruction à envoyer).

Dans Python celles-ci sont portées à l’intérieur d’un seul MagenticOrchestratorEvent dont event_type enum distingue PLAN_CREATED, REPLANNED et PROGRESS_LEDGER_UPDATED. Dans .NET ils sont émis sous la forme de trois types distincts ( MagenticPlanCreatedEvent, MagenticReplannedEvent et MagenticProgressLedgerUpdatedEvent), tous dérivés de MagenticOrchestratorEvent.

Révision de plan avancée avec intervention humaine

Activez human-in-the-loop (HITL) pour permettre aux utilisateurs d’examiner et d’approuver le plan proposé par le responsable avant l’exécution. Cela est utile pour s’assurer que le plan s’aligne sur les attentes et les exigences des utilisateurs.

Il existe deux options pour la révision du plan :

  1. Révision : l’utilisateur fournit des commentaires pour réviser le plan, ce qui déclenche le replan du responsable en fonction des commentaires.
  2. Approuver : l’utilisateur approuve le plan as-is, ce qui permet au flux de travail de continuer.

Activez la révision du plan lors de la création du flux de travail Magentic. Les valeurs par défaut diffèrent entre les langues : dans Python, la révision du plan est off par défaut (enable_plan_review=False) et vous optez explicitement ; dans .NET, la révision du plan est on par défaut (RequirePlanSignoff par défaut est true), et l’exemple de base précédent dans cette page a été désactivé afin qu’il puisse s’exécuter de bout en bout sans interaction. Le code ci-dessous montre comment s’inscrire et gérer les demandes d’avis qui en résultent.

Les pauses lors de la révision du plan apparaissent via le mécanisme de requête/réponse du flux de travail avec les données MagenticPlanReviewRequest. Vous gérez ces éléments dans le flux d’événements et reprenez le flux de travail avec un MagenticPlanReviewResponse une fois qu’une personne a approuvé ou révisé le plan.

Conseil / Astuce

En savoir plus sur les demandes et les réponses dans le guide demandes et réponses .

Workflow workflow = new MagenticWorkflowBuilder(managerAgent)
    .AddParticipants([researcherAgent, coderAgent])
    .RequirePlanSignoff(true)
    .WithMaxRounds(10)
    .WithMaxStalls(1)
    .WithMaxResets(2)
    .Build();

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep
    .ToWorkflowExecutionEnvironment()
    .WithCheckpointing(checkpointManager);

await using StreamingRun run = await environment.OpenStreamingAsync(workflow);
await run.TrySendMessageAsync(new List<ChatMessage> { new(ChatRole.User, TaskPrompt) });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

ExternalRequest? pendingRequest = null;
CheckpointInfo? lastCheckpoint = null;
WorkflowOutputEvent? finalOutput = null;

async Task<WorkflowOutputEvent?> DrainAsync(StreamingRun activeRun)
{
    WorkflowOutputEvent? output = null;
    await foreach (WorkflowEvent evt in activeRun.WatchStreamAsync(blockOnPendingRequest: false))
    {
        switch (evt)
        {
            case AgentResponseUpdateEvent updateEvent:
                Console.Write(updateEvent.Update.Text);
                break;
            case RequestInfoEvent requestInfo
                when requestInfo.Request.Data.As<MagenticPlanReviewRequest>() is not null:
                pendingRequest = requestInfo.Request;
                break;
            case SuperStepCompletedEvent stepCompleted:
                lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint ?? lastCheckpoint;
                break;
            case WorkflowOutputEvent outputEvent when outputEvent.Is<List<ChatMessage>>():
                output = outputEvent;
                break;
        }
    }
    return output;
}

finalOutput = await DrainAsync(run);

// Loop until the workflow finishes or the user accepts a plan that runs to completion.
while (finalOutput is null && pendingRequest is not null)
{
    MagenticPlanReviewRequest reviewRequest = pendingRequest.Data.As<MagenticPlanReviewRequest>()!;

    Console.WriteLine("\n\n[Magentic Plan Review Request]");
    if (reviewRequest.CurrentProgress is { } progress)
    {
        Console.WriteLine(
            $"Current progress: satisfied={progress.IsRequestSatisfied}, " +
            $"inLoop={progress.IsInLoop}, progressing={progress.IsProgressBeingMade}");
    }
    if (reviewRequest.IsStalled)
    {
        Console.WriteLine("(Replan triggered by stall detection.)");
    }
    Console.WriteLine($"Proposed plan:\n{reviewRequest.Plan.Text}\n");
    Console.Write("Press Enter to approve, or type feedback to request a revision: ");

    string reply = Console.ReadLine() ?? string.Empty;
    MagenticPlanReviewResponse reviewResponse = string.IsNullOrWhiteSpace(reply)
        ? reviewRequest.Approve()
        : reviewRequest.Revise(reply);

    ExternalResponse response = pendingRequest.CreateResponse(reviewResponse);
    pendingRequest = null;

    await using StreamingRun resumed = await environment.ResumeStreamingAsync(workflow, lastCheckpoint!);
    await resumed.SendResponseAsync(response);
    finalOutput = await DrainAsync(resumed);
}

if (finalOutput?.As<List<ChatMessage>>() is { } transcript)
{
    Console.WriteLine("\n\n=== Final Conversation Transcript ===\n");
    foreach (ChatMessage message in transcript)
    {
        Console.WriteLine($"{message.AuthorName ?? message.Role.ToString()}: {message.Text}");
    }
}
import json
import asyncio
from typing import cast

from agent_framework import (
    AgentResponseUpdate,
    Agent,
    Message,
    WorkflowEvent,
)
from agent_framework.orchestrations import (
    MagenticBuilder,
    MagenticPlanReviewRequest,
    MagenticPlanReviewResponse,
)

workflow = MagenticBuilder(
    participants=[researcher_agent, coder_agent],
    intermediate_output_from=[researcher_agent, coder_agent],
    enable_plan_review=True,
    manager_agent=manager_agent,
    max_round_count=10,
    max_stall_count=1,
    max_reset_count=2,
).build()

pending_request: WorkflowEvent | None = None
pending_responses: dict[str, MagenticPlanReviewResponse] | None = None
final_response: AgentResponse | None = None

while not final_response:
    if pending_responses is not None:
        stream = workflow.run(stream=True, responses=pending_responses)
    else:
        stream = workflow.run(task, stream=True)

    last_message_id: str | None = None
    async for event in stream:
        if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
            message_id = event.data.message_id
            if message_id != last_message_id:
                if last_message_id is not None:
                    print("\n")
                print(f"- {event.executor_id}:", end=" ", flush=True)
                last_message_id = message_id
            print(event.data, end="", flush=True)

        elif event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
            pending_request = event

        elif event.type == "output" and isinstance(event.data, AgentResponse):
            final_response = event.data

    pending_responses = None

    # Handle plan review request if any
    if pending_request is not None:
        event_data = cast(MagenticPlanReviewRequest, pending_request.data)

        print("\n\n[Magentic Plan Review Request]")
        if event_data.current_progress is not None:
            print("Current Progress Ledger:")
            print(json.dumps(event_data.current_progress.to_dict(), indent=2))
            print()
        print(f"Proposed Plan:\n{event_data.plan.text}\n")
        print("Please provide your feedback (press Enter to approve):")

        reply = await asyncio.get_event_loop().run_in_executor(None, input, "> ")
        if reply.strip() == "":
            print("Plan approved.\n")
            pending_responses = {pending_request.request_id: event_data.approve()}
        else:
            print("Plan revised by human.\n")
            pending_responses = {pending_request.request_id: event_data.revise(reply)}
        pending_request = None

Un MagenticPlanReviewRequest contient le plan proposé, le journal de progression actuel (null / None lors de l’examen initial et renseigné lors des replanifications déclenchées par un blocage), ainsi qu’un indicateur précisant si la replanification a été déclenchée par la détection d’un blocage. Générez la réponse en appelant approve() soit pour accepter le plan as-is, soit revise(...) avec des commentaires pour demander au responsable de planifier à nouveau.

Concepts clés

  • Coordination dynamique : le gestionnaire Magentic sélectionne dynamiquement l’agent qui doit agir ensuite en fonction du contexte en constante évolution.
  • Terminal Output : la sortie du flux de travail terminal porte la réponse finale synthétisée du responsable (un AgentResponse dans Python ; un WorkflowOutputEvent avec une charge utile List<ChatMessage> dans .NET).
  • Orchestrator Events : Les jalons créés par le plan, replanifiés et mis à jour dans le registre de progression sont exposés via MagenticOrchestratorEvent (un événement avec un enum event_type en Python ; trois types dérivés en .NET). Les deltas de streaming par participant sont transmis via les événements standard de mise à jour de la réponse de l’agent du framework.
  • Affinement itératif : le système peut décomposer des problèmes complexes et affiner de manière itérative des solutions par le biais de plusieurs arrondis.
  • Suivi de la progression et détection des blocages : le journal de progression indique si la demande est satisfaite, si l’équipe tourne en rond et si des progrès sont réalisés. Des itérations consécutives sans progression incrémentent un compteur de blocage, et le dépassement du maximum configuré déclenche une réinitialisation automatique et une replanification.
  • Collaboration flexible : les agents peuvent être appelés plusieurs fois dans n’importe quel ordre déterminé par le responsable.
  • Supervision humaine : révision facultative du plan avec intervention humaine via MagenticPlanReviewRequest / MagenticPlanReviewResponse.
  • Intermediate Outputs (Python uniquement, pour l'instant) : désigner les participants dont les appels yield_output doivent apparaître comme des événements "intermediate" en même temps que la sortie terminale du responsable.

Flux d'exécution du processus de travail

L’orchestration Magentic suit ce modèle d’exécution :

  1. Phase de planification : le responsable analyse la tâche et crée un plan initial
  2. Révision facultative du plan : si elle est activée, les humains peuvent examiner et approuver/modifier le plan
  3. Sélection de l’agent : le responsable sélectionne l’agent le plus approprié pour chaque sous-tâche
  4. Exécution : l’agent sélectionné exécute sa partie de la tâche
  5. Évaluation de la progression : le responsable évalue la progression et met à jour le plan
  6. Détection du blocage : si la progression se bloque, replanification automatique avec option de révision humaine
  7. Itération : les étapes 3 à 6 se répètent jusqu’à ce que la tâche soit terminée ou que les limites soient atteintes
  8. Synthèse finale : le gestionnaire synthétise toutes les sorties de l’agent dans un résultat final

Exemple complet

Consultez des exemples complets dans le référentiel d’exemples Agent Framework.

Consultez des exemples complets dans le référentiel d’exemples Agent Framework.

Prochaines étapes