Partilhar via


Lidar com solicitações e respostas em fluxos de trabalho

Este tutorial demonstra como lidar com solicitações e respostas em fluxos de trabalho usando fluxos de trabalho do Agent Framework. Você aprenderá a criar fluxos de trabalho interativos que podem pausar a execução para solicitar entrada de fontes externas (como humanos ou outros sistemas) e, em seguida, retomar assim que uma resposta for fornecida.

Conceitos abordados

No .NET, os fluxos de trabalho human-in-the-loop usam RequestPort e o tratamento de solicitações externas para pausar a execução e coletar a entrada do usuário. Esse padrão permite fluxos de trabalho interativos onde o sistema pode solicitar informações de fontes externas durante a execução.

Pré-requisitos

Instalar pacotes NuGet

Primeiro, instale os pacotes necessários para seu projeto .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Componentes-chave

RequestPort e Solicitações Externas

A RequestPort atua como uma ponte entre o fluxo de trabalho e as fontes de entrada externas. Quando o fluxo de trabalho precisa de entrada, ele gera um RequestInfoEvent que seu aplicativo manipula:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Tipos de sinal

Defina tipos de sinal para comunicar diferentes tipos de solicitação:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Executor de fluxo de trabalho

Crie executores que processam a entrada do usuário e fornecem feedback:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Criando o fluxo de trabalho

Conecte o RequestPort e o executor em um loop de feedback:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Executando o fluxo de trabalho interativo

Lidar com solicitações externas durante a execução do fluxo de trabalho:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Tratamento de Pedidos

Processar diferentes tipos de solicitações de entrada:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Conceitos de Implementação

Fluxo do Evento de Solicitação de Informação

  1. Execução do fluxo de trabalho: O fluxo de trabalho é processado até precisar de entrada externa
  2. Geração de pedidos: RequestPort gera um RequestInfoEvent com os detalhes do pedido
  3. Manipulação externa: seu aplicativo captura o evento e coleta a entrada do usuário
  4. Envio de respostas: envie um ExternalResponse de volta para continuar o fluxo de trabalho
  5. Reinício do fluxo de trabalho: O fluxo de trabalho continua processando com a entrada fornecida

Ciclo de vida do fluxo de trabalho

  • Execução de streaming: Use StreamAsync para monitorar eventos em tempo real
  • Manipulação de eventos: processo RequestInfoEvent para solicitações de entrada e WorkflowOutputEvent para conclusão
  • Coordenação de resposta: Harmonize as respostas com os pedidos usando o mecanismo de processamento de respostas do fluxo de trabalho

Fluxo de implementação

  1. Inicialização do fluxo de trabalho: O fluxo de trabalho começa enviando um NumberSignal.Init para o RequestPort.

  2. Geração de Solicitação: O RequestPort gera uma RequestInfoEvent solicitação de um palpite inicial do usuário.

  3. Pausa do fluxo de trabalho: o fluxo de trabalho pausa e aguarda a entrada externa enquanto o aplicativo lida com a solicitação.

  4. Resposta humana: O aplicativo externo coleta a entrada do usuário e envia um ExternalResponse retorno para o fluxo de trabalho.

  5. Processamento e feedback: processa JudgeExecutor o palpite e conclui o fluxo de trabalho ou envia um novo sinal (Acima/Abaixo) para solicitar outro palpite.

  6. Continuação do loop: O processo se repete até que o número correto seja adivinhado.

Benefícios da estrutura

  • Segurança do tipo: A digitação forte garante que os contratos de solicitação-resposta sejam mantidos
  • Orientado a eventos: um sistema de eventos avançado fornece visibilidade sobre a execução do fluxo de trabalho
  • Execução pausavel: os fluxos de trabalho podem pausar indefinidamente enquanto aguardam a entrada externa
  • Gerenciamento de Estado: o estado do fluxo de trabalho é preservado nos ciclos de pausa e retomada
  • Integração flexível: RequestPorts pode se integrar com qualquer fonte de entrada externa (UI, API, console, etc.)

Amostra completa

Consulte o exemplo Human-in-the-Loop Basic para obter a implementação funcional e completa.

Esse padrão permite a criação de aplicativos interativos sofisticados onde os usuários podem fornecer informações em pontos-chave de decisão dentro de fluxos de trabalho automatizados.

O que você vai construir

Você criará um fluxo de trabalho interativo de jogo de adivinhação de números que demonstra padrões de solicitação-resposta:

  • Um agente de IA que dá palpites inteligentes
  • Executores que podem enviar solicitações diretamente usando a request_info API
  • Um gerenciador de turnos que coordena entre o agente e as interações humanas usando @response_handler
  • Entrada/saída interativa do console para feedback em tempo real

Pré-requisitos

  • Python 3.10 ou posterior
  • Implantação do Azure OpenAI configurada
  • Autenticação da CLI do Azure configurada (az login)
  • Compreensão básica da programação assíncrona Python

Conceitos-chave

Capacidades de Solicitações e Respostas

Os executores têm capacidades internas de pedido e resposta que permitem interações humanas em tempo real.

  • Ligue ctx.request_info(request_data=request_data, response_type=response_type) para enviar solicitações
  • Use o @response_handler decorador para lidar com as respostas
  • Definir tipos de solicitação/resposta personalizados sem requisitos de herança

Fluxo Request-Response

Os executores podem enviar solicitações diretamente usando ctx.request_info() e lidar com as respostas usando o @response_handler decorador:

  1. Chamadas de executor ctx.request_info(request_data=request_data, response_type=response_type)
  2. O fluxo de trabalho emite um RequestInfoEvent com os dados da solicitação
  3. O sistema externo (humano, API, etc.) processa o pedido
  4. A resposta é enviada de volta via send_responses_streaming()
  5. O processo de trabalho é retomado e entrega a resposta ao método do executor @response_handler

Configurando o ambiente

Primeiro, instale os pacotes necessários:

pip install agent-framework-core --pre
pip install azure-identity

Definir modelos de solicitação e resposta

Comece por definir as estruturas de dados para a comunicação pedido-resposta:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

O HumanFeedbackRequest é uma classe de dados simples para cargas úteis de solicitação estruturada:

  • Digitação forte para cargas úteis de solicitação
  • Validação compatível com versões futuras
  • Semântica de correlação clara com as respostas
  • Campos contextuais (como o palpite anterior) para prompts avançados da interface do usuário

Criar o Gerenciador de Turnos

O gerenciador de turnos coordena o fluxo entre o agente de IA e o humano:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Criar o fluxo de trabalho

Crie o fluxo de trabalho principal que conecta todos os componentes:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Executando o exemplo

Para consultar a implementação completa funcional, veja o exemplo de jogo de adivinhação Human-in-the-Loop.

Como funciona

  1. Inicialização do fluxo de trabalho: o fluxo de trabalho começa com a TurnManager solicitação de uma estimativa inicial do agente de IA.

  2. Resposta do agente: O agente de IA faz uma suposição e retorna JSON estruturado, que flui de volta para o TurnManager.

  3. Solicitação humana: O TurnManager processa o palpite do agente e chama ctx.request_info() com um HumanFeedbackRequest.

  4. Pausa do fluxo de trabalho: o fluxo de trabalho emite um RequestInfoEvent e continua até que nenhuma outra ação possa ser executada e, em seguida, aguarda a entrada humana.

  5. Resposta humana: O aplicativo externo coleta informações humanas e envia respostas de volta usando send_responses_streaming()o .

  6. Retomar e continuar: O fluxo de trabalho é retomado, o método do TurnManager processa @response_handlero feedback humano e termina o jogo ou envia outra solicitação ao agente.

Principais Benefícios

  • Comunicação estruturada: modelos de solicitação e resposta seguros para tipos evitam erros de tempo de execução
  • Correlação: os IDs de solicitação garantem que as respostas correspondam às solicitações corretas
  • Execução pausavel: os fluxos de trabalho podem pausar indefinidamente enquanto aguardam a entrada externa
  • Preservação de Estado: o estado do fluxo de trabalho é mantido durante os ciclos de pausa-retomada
  • Orientado a eventos: o avançado sistema de eventos fornece visibilidade sobre o status e as transições do fluxo de trabalho

Esse padrão permite a criação de aplicativos interativos sofisticados onde agentes de IA e humanos colaboram perfeitamente em fluxos de trabalho estruturados.

Próximas Etapas