Partager via


Gérer les demandes et les réponses dans les flux de travail

Ce tutoriel montre comment gérer les demandes et les réponses dans les flux de travail à l’aide de flux de travail Agent Framework. Vous allez apprendre à créer des flux de travail interactifs qui peuvent suspendre l’exécution pour demander une entrée à partir de sources externes (comme des humains ou d’autres systèmes), puis reprendre une fois qu’une réponse est fournie.

Concepts abordés

Dans .NET, les workflows human-in-the-loop utilisent RequestPort et gèrent les demandes externes pour suspendre l’exécution et collecter les entrées utilisateur. Ce modèle permet des flux de travail interactifs dans lesquels le système peut demander des informations à partir de sources externes pendant l’exécution.

Prerequisites

Installer les packages NuGet

Tout d’abord, installez les packages requis pour votre projet .NET :

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Composants clés

RequestPort et demandes externes

Un RequestPort agit comme un pont entre le flux de travail et les sources d’entrée externes. Lorsque le flux de travail a besoin d’une entrée, il génère une RequestInfoEvent opération gérée par votre application :

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Types de signaux

Définissez les types de signal pour communiquer différents types de requêtes :

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Exécuteur de flux de travail

Créez des exécuteurs qui traitent les entrées utilisateur et fournissent des commentaires :

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Génération du flux de travail

Connectez le RequestPort et l’exécuteur dans une boucle de rétroaction :

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Exécution du flux de travail interactif

Gérer les demandes externes pendant l’exécution du flux de travail :

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Gestion des demandes

Traitez différents types de demandes d’entrée :

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Concepts d’implémentation

Flux de l'événement d'information de demande

  1. Exécution du flux de travail : Le flux de travail se déroule jusqu'à ce qu'il ait besoin d'une entrée externe
  2. Génération de demande : RequestPort génère une RequestInfoEvent requête avec les détails de la demande
  3. Gestion externe : votre application intercepte l’événement et collecte les entrées utilisateur
  4. Soumission de réponse : renvoyer un ExternalResponse pour poursuivre le workflow
  5. Reprise du flux de travail : le flux de travail continue de traiter avec l’entrée fournie

Cycle de vie du flux de travail

  • Exécution de streaming : permet StreamAsync de surveiller les événements en temps réel
  • Gestion des événements : processus RequestInfoEvent pour les demandes d’entrée et WorkflowOutputEvent pour l’achèvement
  • Coordination des réponses : mettre en correspondance les réponses aux demandes à l’aide du mécanisme de gestion des réponses du flux de travail

Flux d’implémentation

  1. Initialisation du flux de travail : le flux de travail commence par envoyer un NumberSignal.Init message à RequestPort.

  2. Génération de demande : RequestPort génère une RequestInfoEvent sollicitant une estimation initiale de l'utilisateur.

  3. Pause du flux de travail : le flux de travail suspend et attend une entrée externe pendant que l’application gère la requête.

  4. Réponse humaine : l’application externe collecte les entrées utilisateur et renvoie un ExternalResponse retour au flux de travail.

  5. Traitement et retour d'information : le JudgeExecutor traite l'estimation et soit complète le workflow, soit envoie un nouveau signal (au-dessus/en dessous) pour demander une autre estimation.

  6. Continuation de la boucle : le processus se répète jusqu’à ce que le nombre correct soit deviné.

Avantages du framework

  • Sécurité du type : la saisie forte garantit que les contrats de demande-réponse sont maintenus
  • Piloté par les événements : le système d’événements enrichi offre une visibilité sur l’exécution du flux de travail
  • Exécution suspendue : les flux de travail peuvent s’interrompre indéfiniment en attendant l’entrée externe
  • Gestion de l’état : l’état du flux de travail est conservé entre les cycles de pause-reprise
  • Intégration flexible : RequestPorts peut s’intégrer à n’importe quelle source d’entrée externe (interface utilisateur, API, console, etc.)

Exemple complet

Pour obtenir l’implémentation de travail complète, consultez l’exemple Human-in-the-Loop Basic.

Ce modèle permet de créer des applications interactives sophistiquées où les utilisateurs peuvent fournir des entrées aux points de décision clés au sein de flux de travail automatisés.

Ce que vous allez construire

Vous allez créer un flux de travail interactif pour un jeu de devinettes de nombres qui illustre les schémas de demande-réponse :

  • Agent IA qui fait des estimations intelligentes
  • Exécuteurs qui peuvent envoyer directement des requêtes à l’aide de l’API request_info
  • Un gestionnaire de tours qui coordonne les interactions entre l'agent et les interactions humaines à l’aide de @response_handler
  • Entrée/sortie de console interactive pour les commentaires en temps réel

Prerequisites

  • Python 3.10 ou version ultérieure
  • Le déploiement d'Azure OpenAI est configuré
  • Authentification Azure CLI configurée (az login)
  • Compréhension de base de la programmation asynchrone Python

Concepts clés

Les fonctionnalités de demandes et de réponses

Les exécuteurs ont des fonctionnalités de requêtes et réponses intégrées qui permettent des interactions human-in-the-loop :

  • Appelez ctx.request_info(request_data=request_data, response_type=response_type) pour envoyer des demandes
  • Pour gérer les réponses, utilisez le décorateur @response_handler
  • Définir des types de demande/réponse personnalisés sans exigences d’héritage

flux de requête-réponse

Les exécuteurs peuvent envoyer des requêtes directement à l’aide de ctx.request_info() et gérer les réponses à l’aide du décorateur @response_handler.

  1. Appels d’exécuteur ctx.request_info(request_data=request_data, response_type=response_type)
  2. Le workflow émet une RequestInfoEvent avec les données de la requête
  3. Le système externe (humain, API, etc.) traite la demande
  4. La réponse est renvoyée via send_responses_streaming()
  5. Le flux de travail reprend et remet la réponse à la méthode de l’exécuteur @response_handler

Configuration de l’environnement

Tout d’abord, installez les packages requis :

pip install agent-framework-core --pre
pip install azure-identity

Définir des modèles de demande et de réponse

Commencez par définir les structures de données pour la communication de demande-réponse :

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

La HumanFeedbackRequest est une classe de données simple pour les charges utiles de requêtes structurées :

  • Typage fort pour le contenu de la requête
  • Validation compatible avec les versions futures
  • Clarifier la sémantique de corrélation avec les réponses
  • Champs contextuels (comme la prédiction précédente) pour les prompts d'interface utilisateur riches

Créer le Gestionnaire de tour

Le gestionnaire de tour coordonne le flux entre l’agent IA et l’humain :

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Générer le workflow

Créez le flux de travail principal qui connecte tous les composants :

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Exécution de l’exemple

Pour voir l'implémentation complète en fonctionnement, consultez l'exemple Human-in-the-Loop Guessing Game.

Fonctionnement

  1. Initialisation du flux de travail : le flux de travail commence par la TurnManager demande d’une estimation initiale de l’agent IA.

  2. Réponse de l’agent : l’agent IA fait une estimation et retourne un JSON structuré, qui revient au TurnManager.

  3. Demande humaine : le TurnManager traite l'estimation de l'agent et appelle ctx.request_info() avec un HumanFeedbackRequest.

  4. Pause du flux de travail : le flux de travail émet un et continue jusqu’à ce qu’aucune action supplémentaire ne puisse être effectuée, puis attend l’entrée humaine.

  5. Réponse humaine : l’application externe collecte les entrées humaines et renvoie des réponses à l’aide send_responses_streaming()de .

  6. Reprendre et continuer : le flux de travail reprend, la méthode TurnManager@response_handler traite le retour d'informations humaines et met fin au jeu ou envoie une autre requête à l’agent.

Principaux avantages

  • Communication structurée : Les modèles de demande et de réponse de type sécurisé empêchent les erreurs d’exécution
  • Corrélation : les ID de requête garantissent que les réponses sont mises en correspondance avec les demandes correctes
  • Exécution suspendue : les flux de travail peuvent s’interrompre indéfiniment en attendant l’entrée externe
  • Conservation de l’état : l’état du flux de travail est conservé pendant les cycles de pause-reprise
  • Piloté par les événements : le système d’événements enrichi offre une visibilité sur l’état et les transitions du flux de travail

Ce modèle permet de créer des applications interactives sophistiquées où les agents IA et les humains collaborent en toute transparence dans des workflows structurés.

Étapes suivantes