Condividi tramite


Gestire richieste e risposte nei flussi di lavoro

Questa esercitazione illustra come gestire le richieste e le risposte nei flussi di lavoro usando i flussi di lavoro di Agent Framework. Si apprenderà come creare flussi di lavoro interattivi in grado di sospendere l'esecuzione per richiedere l'input da origini esterne (ad esempio esseri umani o altri sistemi) e quindi riprendere una volta fornita una risposta.

Concetti trattati

In .NET i flussi di lavoro human-in-the-loop usano RequestPort e la gestione delle richieste esterne per sospendere l'esecuzione e raccogliere l'input dell'utente. Questo modello consente flussi di lavoro interattivi in cui il sistema può richiedere informazioni da origini esterne durante l'esecuzione.

Prerequisiti

Installare i pacchetti NuGet

Prima di tutto, installare i pacchetti necessari per il progetto .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Componenti chiave

RequestPort e richieste esterne

Un RequestPort funge da ponte tra il flusso di lavoro e le origini di input esterne. Quando il flusso di lavoro richiede l'input, viene generato un RequestInfoEvent che l'applicazione gestisce:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Tipi di segnale

Definire i tipi di segnale per comunicare tipi di richiesta diversi:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Esecutore del flusso di lavoro

Creare executor che elaborano l'input dell'utente e forniscono commenti e suggerimenti:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Compilazione del flusso di lavoro

Collegare il RequestPort e l'executor in un ciclo di feedback:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Esecuzione del flusso di lavoro interattivo

Gestire le richieste esterne durante l'esecuzione del flusso di lavoro:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Gestione delle richieste

Elaborare tipi diversi di richieste di input:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Concetti di implementazione

Flusso RequestInfoEvent

  1. Esecuzione del flusso di lavoro: Il flusso di lavoro viene elaborato fino a quando non necessita di input esterno.
  2. Generazione di richieste: RequestPort genera un oggetto RequestInfoEvent con i dettagli della richiesta
  3. Gestione esterna: l'applicazione intercetta l'evento e raccoglie l'input dell'utente
  4. Invio di risposta: invia un ExternalResponse indietro per continuare il flusso di lavoro
  5. Ripresa del flusso di lavoro: il flusso di lavoro continua l'elaborazione con l'input fornito

Ciclo di vita del flusso di lavoro

  • Esecuzione di streaming: usare StreamAsync per monitorare gli eventi in tempo reale
  • Gestione eventi: processo RequestInfoEvent per le richieste di input e WorkflowOutputEvent per il completamento
  • Coordinamento delle risposte: associare le risposte alle richieste usando il meccanismo di gestione delle risposte del flusso di lavoro

Flusso di implementazione

  1. Inizializzazione del flusso di lavoro: il flusso di lavoro inizia inviando un oggetto NumberSignal.Init a RequestPort.

  2. Generazione di richieste: RequestPort genera una RequestInfoEvent richiesta di un'ipotesi iniziale dall'utente.

  3. Pausa del flusso di lavoro: il flusso di lavoro si sospende e attende l'input esterno mentre l'applicazione gestisce la richiesta.

  4. Risposta umana: l'applicazione esterna raccoglie l'input dell'utente e invia un ExternalResponse messaggio al flusso di lavoro.

  5. Elaborazione e feedback: JudgeExecutor elabora l'ipotesi e completa il flusso di lavoro o invia un nuovo segnale (sopra/sotto) per richiedere un'altra ipotesi.

  6. Continuazione del ciclo: il processo si ripete fino a quando non viene indovinato il numero corretto.

Vantaggi del framework

  • Sicurezza dei tipi: la tipizzazione forte garantisce che i contratti di richiesta-risposta siano rispettati
  • Orientato agli eventi: il sistema di eventi avanzato offre visibilità sull'esecuzione del flusso di lavoro
  • Esecuzione pausabile: i flussi di lavoro possono essere sospesi per un periodo illimitato durante l'attesa dell'input esterno
  • Gestione dello stato: lo stato del flusso di lavoro viene mantenuto in cicli di sospensione-ripresa
  • Integrazione flessibile: RequestPorts può integrarsi con qualsiasi origine di input esterna (interfaccia utente, API, console e così via)

Esempio completo

Per l'implementazione di lavoro completa, vedere l'esempio Human-in-the-Loop Basic.

Questo modello consente di creare applicazioni interattive sofisticate in cui gli utenti possono fornire input in punti decisionali chiave all'interno di flussi di lavoro automatizzati.

Cosa costruirai

Si creerà un flusso di lavoro interattivo di individuazione dei numeri che illustra i modelli di richiesta-risposta:

  • Un agente di intelligenza artificiale che fa ipotesi intelligenti
  • Executor in grado di inviare direttamente richieste tramite l'API request_info
  • Un gestore dei turni che coordina tra l'agente e le interazioni umane usando @response_handler
  • Input/output interattivo della console per commenti e suggerimenti in tempo reale

Prerequisiti

  • Python 3.10 o versione successiva
  • Distribuzione di Azure OpenAI configurata
  • Autenticazione CLI di Azure configurata (az login)
  • Conoscenza di base della programmazione asincrona di Python

Concetti chiave

Funzionalità di richieste e risposte

Gli executor dispongono di funzionalità predefinite di richieste e risposte che consentono interazioni umane nel processo.

  • Chiamare ctx.request_info(request_data=request_data, response_type=response_type) per inviare richieste
  • Usare il decoratore @response_handler per gestire le risposte
  • Definire tipi di richiesta/risposta personalizzati senza requisiti di ereditarietà

flusso di richesta-risposta

Gli executor possono inviare richieste direttamente usando ctx.request_info() e gestire le risposte usando il decoratore @response_handler.

  1. Richieste dell'esecutore ctx.request_info(request_data=request_data, response_type=response_type)
  2. Il flusso di lavoro emette un RequestInfoEvent con i dati della richiesta
  3. Il sistema esterno (umano, API e così via) elabora la richiesta
  4. La risposta viene inviata tramite send_responses_streaming()
  5. Il flusso di lavoro riprende e recapita la risposta al metodo dell'esecutore @response_handler

Configurazione dell'ambiente

Prima di tutto, installare i pacchetti necessari:

pip install agent-framework-core --pre
pip install azure-identity

Definire modelli di richiesta e risposta

Per iniziare, definire le strutture di dati per la comunicazione request-response:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

HumanFeedbackRequest è una semplice classe di dati per i payload delle richieste strutturate:

  • Tipizzazione avanzata per i payload delle richieste
  • Convalida compatibile con versioni future
  • Chiara semantica di correlazione con le risposte
  • Campi contestuali (come l'ipotesi precedente) per i prompt avanzati dell'interfaccia utente

Creare il manager dei turni

Il gestore dei turni coordina il flusso tra l'agente di intelligenza artificiale e l'uomo:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Compilare il flusso di lavoro

Creare il flusso di lavoro principale che connette tutti i componenti:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Esecuzione dell'esempio

Per l'implementazione completa e funzionante, consulta l'esempio di gioco Human-in-the-Loop Guessing.

Funzionamento

  1. Inizializzazione del flusso di lavoro: il flusso di lavoro inizia con la richiesta di un'ipotesi TurnManager iniziale dall'agente di intelligenza artificiale.

  2. Risposta dell'agente: l'agente di intelligenza artificiale esegue un'ipotesi e restituisce json strutturato, che torna a TurnManager.

  3. Richiesta umana: TurnManager elabora l'ipotesi dell'agente e chiama ctx.request_info() con un oggetto HumanFeedbackRequest.

  4. Pausa del flusso di lavoro: il flusso di lavoro genera un RequestInfoEvent e continua fino a quando non si possono eseguire altre azioni, quindi attende l'input umano.

  5. Risposta umana: l'applicazione esterna raccoglie l'input umano e invia le risposte usando send_responses_streaming().

  6. Riprendi e continua: il flusso di lavoro riprende, il metodo TurnManager@response_handler elabora il feedback umano e termina il gioco o invia un'altra richiesta all'agente.

Vantaggi principali

  • Comunicazione strutturata: i modelli di richiesta e risposta indipendenti dai tipi impediscono gli errori di runtime
  • Correlazione: gli ID richiesta assicurano che le risposte vengano confrontate con le richieste corrette
  • Esecuzione pausabile: i flussi di lavoro possono essere sospesi per un periodo illimitato durante l'attesa dell'input esterno
  • Conservazione dello stato: lo stato del flusso di lavoro viene mantenuto tra i cicli di ripresa della pausa
  • Basato sugli eventi: il sistema di eventi complesso offre visibilità sullo stato e sulle transizioni del flusso di lavoro

Questo modello consente di creare applicazioni interattive sofisticate in cui gli agenti di intelligenza artificiale e gli esseri umani collaborano perfettamente all'interno di flussi di lavoro strutturati.

Passaggi successivi