Compartir a través de


Gestión de solicitudes y respuestas en flujos de trabajo

En este tutorial se muestra cómo controlar solicitudes y respuestas en flujos de trabajo mediante flujos de trabajo del marco de agente. Aprenderá a crear flujos de trabajo interactivos que puedan pausar la ejecución para solicitar la entrada de orígenes externos (como humanos u otros sistemas) y, a continuación, reanudar una vez que se proporcione una respuesta.

Conceptos tratados

En .NET, los flujos de trabajo human-in-the-loop usan RequestPort y el control de solicitudes externas para pausar la ejecución y recopilar la entrada del usuario. Este patrón permite flujos de trabajo interactivos en los que el sistema puede solicitar información de orígenes externos durante la ejecución.

Prerrequisitos

Instalación de paquetes NuGet

En primer lugar, instale los paquetes necesarios para el proyecto de .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Componentes clave

RequestPort y solicitudes externas

Un RequestPort actúa como un puente entre el flujo de trabajo y los orígenes de entrada externos. Cuando el flujo de trabajo necesita entrada, genera un RequestInfoEvent que la aplicación controla:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Tipos de señal

Defina tipos de señal para comunicar diferentes tipos de solicitud:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Ejecutor de flujo de trabajo

Cree ejecutores que procesen la entrada del usuario y proporcionen comentarios:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Creación del flujo de trabajo

Conecte el Puerto de Solicitudes y el ejecutor en un bucle de retroalimentación.

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Ejecución del flujo de trabajo interactivo

Control de solicitudes externas durante la ejecución del flujo de trabajo:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Manejo de las solicitudes

Procesar diferentes tipos de solicitudes de entrada:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Conceptos de implementación

Flujo de Evento de Solicitud de Información

  1. Ejecución del flujo de trabajo: el flujo de trabajo procesa hasta que necesita una entrada externa.
  2. Generación de solicitudes: RequestPort genera un RequestInfoEvent objeto con los detalles de la solicitud
  3. Control externo: la aplicación detecta el evento y recopila la entrada del usuario.
  4. Envío de respuesta: Enviar un ExternalResponse de vuelta para continuar el flujo de trabajo.
  5. Reanudación del flujo de trabajo: el flujo de trabajo continúa procesando con la entrada proporcionada.

Ciclo de vida del flujo de trabajo

  • Ejecución de streaming: uso StreamAsync para supervisar eventos en tiempo real
  • Control de eventos: proceso RequestInfoEvent de solicitudes de entrada y WorkflowOutputEvent finalización
  • Coordinación de respuestas: hacer coincidir las respuestas a las solicitudes mediante el mecanismo de control de respuestas del flujo de trabajo

Flujo de implementación

  1. Inicialización del flujo de trabajo: el flujo de trabajo comienza enviando un NumberSignal.Init elemento a RequestPort.

  2. Generación de solicitudes: RequestPort genera una RequestInfoEvent solicitud en la que se pide al usuario una estimación inicial.

  3. Pausa de flujo de trabajo: el flujo de trabajo se pausa y espera la entrada externa mientras la aplicación controla la solicitud.

  4. Respuesta humana: la aplicación externa recopila la entrada del usuario y envía una ExternalResponse devolución al flujo de trabajo.

  5. Procesamiento y comentarios: JudgeExecutor procesa la estimación y completa el flujo de trabajo o envía una nueva señal (arriba/abajo) para solicitar otra suposición.

  6. Continuación del bucle: el proceso se repite hasta que se adivina el número correcto.

Ventajas del marco de trabajo

  • Seguridad de tipos: la escritura fuerte garantiza que se mantienen los contratos de solicitud-respuesta.
  • Controlado por eventos: el sistema de eventos enriquecido proporciona visibilidad sobre la ejecución del flujo de trabajo.
  • Ejecución pausable: los flujos de trabajo se pueden pausar indefinidamente mientras se espera la entrada externa
  • Administración de estado: el estado del flujo de trabajo se conserva en ciclos de pausa y reanudación
  • Integración flexible: RequestPorts se puede integrar con cualquier origen de entrada externo (INTERFAZ de usuario, API, consola, etc.)

Ejemplo completo

Para obtener la ejecución completa, consulte el ejemplo Human-in-the-Loop Basic.

Este patrón permite crear aplicaciones interactivas sofisticadas en las que los usuarios pueden proporcionar entradas en puntos de decisión clave dentro de flujos de trabajo automatizados.

Lo que vas a construir

Creará un flujo de trabajo interactivo de juego de adivinación de números que muestra patrones de solicitud-respuesta:

  • Un agente de IA que realiza estimaciones inteligentes
  • Ejecutores que pueden enviar directamente solicitudes mediante la request_info API
  • Administrador de turnos que coordina las interacciones entre el agente y los humanos utilizando @response_handler
  • Entrada y salida de la consola interactiva para comentarios en tiempo real

Prerrequisitos

  • Python 3.10 o posterior
  • Implementación de Azure OpenAI configurada
  • Configurada la autenticación de la CLI de Azure (az login)
  • Conocimientos básicos de la programación asincrónica de Python

Conceptos clave

Capacidades de solicitudes y respuestas

Los ejecutores tienen funcionalidades integradas de solicitudes y respuestas que permiten interacciones humanas en el bucle:

  • Llamar ctx.request_info(request_data=request_data, response_type=response_type) para enviar solicitudes
  • Utilice el decorador @response_handler para gestionar las respuestas
  • Definición de tipos de solicitud y respuesta personalizados sin requisitos de herencia

flujo de solicitud-respuesta

Los ejecutores pueden enviar solicitudes directamente mediante ctx.request_info() y controlar las respuestas mediante el @response_handler decorador:

  1. Llamadas del ejecutor ctx.request_info(request_data=request_data, response_type=response_type)
  2. El flujo de trabajo emite un RequestInfoEvent con los datos de solicitud
  3. El sistema externo (humano, API, etc.) procesa la solicitud.
  4. La respuesta se envía a través de send_responses_streaming()
  5. El flujo de trabajo se reanuda y entrega la respuesta al método del @response_handler ejecutor

Configuración del entorno

En primer lugar, instale los paquetes necesarios:

pip install agent-framework-core --pre
pip install azure-identity

Definir modelos de solicitud y respuesta

Comience definiendo las estructuras de datos para la comunicación de solicitud-respuesta:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

HumanFeedbackRequest es una clase de datos sencilla para cargas de solicitud estructuradas:

  • Tipado fuerte para cargas de solicitud
  • Validación compatible hacia el futuro
  • Aclara la semántica de correlación con respuestas
  • Campos contextuales (como la estimación anterior) para solicitudes de interfaz de usuario enriquecidas

Crear el Administrador de turnos

El administrador de turnos coordina el flujo entre el agente de IA y el humano:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Compilación del flujo de trabajo

Cree el flujo de trabajo principal que conecta todos los componentes:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Ejecución del ejemplo

Para obtener la implementación completa funcional, ve el ejemplo Human-in-the-Loop Guessing Game.

Funcionamiento

  1. Inicialización del flujo de trabajo: el flujo de trabajo comienza con la TurnManager solicitud de una estimación inicial del agente de IA.

  2. Respuesta del agente: el agente de IA realiza una estimación y devuelve JSON estructurado, que fluye de nuevo a .TurnManager

  3. Solicitud humana: TurnManager procesa la conjetura del agente y llama a ctx.request_info() con HumanFeedbackRequest.

  4. Pausa del flujo de trabajo: el flujo de trabajo emite un RequestInfoEvent, continúa hasta que no se puedan tomar más acciones, y luego espera la intervención humana.

  5. Respuesta humana: la aplicación externa recopila la entrada humana y devuelve respuestas mediante send_responses_streaming().

  6. Reanudar y continuar: el flujo de trabajo se reanuda, el método de TurnManager@response_handler procesa la retroalimentación humana y finaliza el juego o envía otra solicitud al agente.

Ventajas clave

  • Comunicación estructurada: los modelos de solicitud y respuesta seguros para tipos impiden errores en tiempo de ejecución
  • Correlación: los identificadores de solicitud garantizan que las respuestas coincidan con las solicitudes correctas.
  • Ejecución pausable: los flujos de trabajo se pueden pausar indefinidamente mientras se espera la entrada externa
  • Conservación del estado: el estado del flujo de trabajo se mantiene en ciclos de pausa y reanudación
  • Controlado por eventos: el sistema de eventos enriquecido proporciona visibilidad sobre el estado y las transiciones del flujo de trabajo.

Este patrón permite crear aplicaciones interactivas sofisticadas en las que los agentes de inteligencia artificial y los seres humanos colaboran sin problemas en flujos de trabajo estructurados.

Pasos siguientes