Delen via


Aanvragen en antwoorden verwerken in werkstromen

Deze zelfstudie laat zien hoe u aanvragen en antwoorden in werkstromen verwerkt met behulp van Agent Framework-werkstromen. U leert hoe u interactieve werkstromen maakt die de uitvoering kunnen onderbreken om invoer van externe bronnen (zoals mensen of andere systemen) aan te vragen en vervolgens te hervatten zodra er een antwoord is opgegeven.

Behandelde concepten

In .NET gebruiken human-in-the-loop-werkstromen RequestPort en het verwerken van externe verzoeken om de uitvoering te onderbreken en gebruikersinvoer te verzamelen. Dit patroon maakt interactieve werkstromen mogelijk waarbij het systeem tijdens de uitvoering informatie van externe bronnen kan aanvragen.

Vereiste voorwaarden

NuGet-pakketten installeren

Installeer eerst de vereiste pakketten voor uw .NET-project:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Belangrijke onderdelen

RequestPort en externe aanvragen

Een RequestPort fungeert als een brug tussen de werkstroom en externe invoerbronnen. Wanneer de werkstroom invoer nodig heeft, wordt er een RequestInfoEvent gegenereerd die door uw toepassing wordt verwerkt:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Signaaltypen

Signaaltypen definiëren om verschillende aanvraagtypen te communiceren:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Werkstroomuitvoerder

Maak uitvoerders die gebruikersinvoer verwerken en feedback geven:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

De werkstroom bouwen

Verbind de RequestPort en de uitvoerder in een feedbacklus:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

De interactieve werkstroom uitvoeren

Externe aanvragen verwerken tijdens de uitvoering van de werkstroom:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Verwerking van aanvragen

Verschillende typen invoeraanvragen verwerken:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Implementatieconcepten

RequestInfoEvent Flow

  1. Werkstroomuitvoering: de werkstroom wordt verwerkt totdat externe invoer nodig is
  2. Aanvraag genereren: RequestPort genereert een RequestInfoEvent met de aanvraagdetails
  3. Externe verwerking: uw toepassing merkt de gebeurtenis op en verzamelt gebruikersinvoer
  4. Antwoord indienen: Stuur een ExternalResponse terug om door te gaan met de werkstroom
  5. Werkstroomhervatting: de werkstroom wordt verder verwerkt met de opgegeven invoer

Levenscyclus van werkstroom

  • Streaminguitvoering: gebruiken StreamAsync om gebeurtenissen in realtime te bewaken
  • Gebeurtenisafhandeling: proces RequestInfoEvent voor invoeraanvragen en WorkflowOutputEvent voor voltooiing
  • Antwoordcoördinatie: antwoorden koppelen aan aanvragen met behulp van het mechanisme voor het verwerken van reacties van de werkstroom

Stroom van Implementatie

  1. Werkstroom initialisatie: de werkstroom begint met het verzenden van een NumberSignal.Init naar de RequestPort.

  2. Aanvraag genereren: De RequestPort genereert een RequestInfoEvent en vraagt om een eerste schatting van de gebruiker.

  3. Werkstroom onderbreken: de werkstroom wordt onderbroken en wacht op externe invoer terwijl de toepassing de aanvraag verwerkt.

  4. Menselijke reactie: de externe toepassing verzamelt gebruikersinvoer en stuurt een ExternalResponse terug naar de werkstroom.

  5. Verwerking en feedback: de JudgeExecutor schatting wordt verwerkt en de werkstroom wordt voltooid of er wordt een nieuw signaal (boven/onder) verzonden om een andere schatting aan te vragen.

  6. Lusvoortzetting: Het proces herhaalt zich totdat het juiste getal wordt geraden.

Frameworkvoordelen

  • Typeveiligheid: Sterke typen zorgen ervoor dat aanvragen-responscontracten worden onderhouden
  • Gebeurtenisgestuurd: uitgebreid gebeurtenissysteem biedt inzicht in de uitvoering van de werkstroom
  • Onderbroken uitvoering: werkstromen kunnen voor onbepaalde tijd worden onderbroken terwijl wordt gewacht op externe invoer
  • Statusbeheer: de Workflow-status blijft behouden tijdens onderbreken-hervattencycli
  • Flexibele integratie: RequestPorts kan worden geïntegreerd met elke externe invoerbron (UI, API, console, enzovoort)

Volledig voorbeeld

Zie het Human-in-the-Loop Basic-voorbeeld voor de volledige werkende implementatie.

Met dit patroon kunt u geavanceerde interactieve toepassingen bouwen, waar gebruikers input kunnen leveren op belangrijke beslissingspunten binnen geautomatiseerde werkstromen.

Wat je gaat bouwen

Maak een interactieve workflow voor een getalraadspel dat vraag-antwoordpatronen demonstreert.

  • Een AI-agent die intelligente schattingen maakt
  • Uitvoerders die aanvragen rechtstreeks kunnen verzenden met behulp van de request_info API
  • Een turnmanager die coördineert tussen de agent en menselijke interacties met behulp van @response_handler
  • Interactieve consoleinvoer/uitvoer voor realtime feedback

Vereiste voorwaarden

  • Python 3.10 of hoger
  • Azure OpenAI-implementatie geconfigureerd
  • Geconfigureerde Azure CLI-verificatie (az login)
  • Basiskennis van asynchroon programmeren in Python

Sleutelbegrippen

Mogelijkheden voor aanvragen en antwoorden

Uitvoerders hebben ingebouwde aanvraag- en antwoordsystemen die mens-in-de-lus-interacties faciliteren.

  • Roep ctx.request_info(request_data=request_data, response_type=response_type) aan om aanvragen te versturen
  • Gebruik de @response_handler decorator om antwoorden te verwerken.
  • Aangepaste aanvraag-/antwoordtypen definiëren zonder overnamevereisten

Request-Response-verloop

Uitvoerders kunnen aanvragen rechtstreeks verzenden met ctx.request_info() en antwoorden verwerken met de @response_handler decorator.

  1. Executor-aanroepen ctx.request_info(request_data=request_data, response_type=response_type)
  2. Werkstroom verzendt een RequestInfoEvent met de aanvraaggegevens
  3. Het externe systeem (human, API, enzovoort) verwerkt de aanvraag
  4. Antwoord wordt teruggestuurd via send_responses_streaming()
  5. De werkstroom wordt hervat en levert het antwoord aan de uitvoerder's @response_handler-methode.

De omgeving instellen

Installeer eerst de vereiste pakketten:

pip install agent-framework-core --pre
pip install azure-identity

Aanvraag- en antwoordmodellen definiëren

Begin met het definiëren van de gegevensstructuren voor communicatie tussen aanvragen en antwoorden:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

Dit HumanFeedbackRequest is een eenvoudige gegevensklasse voor gestructureerde aanvraagpayloads.

  • Sterke typage voor verzoekgegevens
  • Voorwaarts-compatibele validatie
  • Verwijder correlatiesemantiek met reacties
  • Contextuele velden (zoals de vorige schatting) voor uitgebreide ui-prompts

Turn Manager maken

De turnmanager coördineert de stroom tussen de AI-agent en de mens:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

De werkstroom bouwen

Maak de hoofdwerkstroom waarmee alle onderdelen worden verbonden:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Het voorbeeld uitvoeren

Zie het voorbeeld Human-in-the-Loop raadspel voor de volledig functionerende implementatie.

Hoe het werkt

  1. Werkstroom initialisatie: de werkstroom begint met het TurnManager aanvragen van een eerste schatting van de AI-agent.

  2. Agentantwoord: De AI-agent maakt een schatting en retourneert gestructureerde JSON, die terugloopt naar de TurnManager.

  3. Menselijke aanvraag: het proces verwerkt de schatting van de agent en roept TurnManager aan met een ctx.request_info().

  4. Werkstroom onderbreken: de werkstroom verzendt een RequestInfoEvent en gaat door totdat er geen verdere acties kunnen worden uitgevoerd en wacht vervolgens op menselijke invoer.

  5. Menselijke reactie: de externe toepassing verzamelt menselijke invoer en stuurt antwoorden terug met behulp van send_responses_streaming().

  6. Hervatten en doorgaan: de werkstroom hervat, de TurnManager@response_handler methode verwerkt de menselijke feedback en beëindigt het spel of stuurt een andere aanvraag naar de agent.

Belangrijkste voordelen

  • Gestructureerde communicatie: Typeveilige aanvraag- en responsmodellen voorkomen runtimefouten
  • Correlatie: Aanvraag-id's zorgen ervoor dat antwoorden overeenkomen met de juiste aanvragen
  • Onderbroken uitvoering: werkstromen kunnen voor onbepaalde tijd worden onderbroken terwijl wordt gewacht op externe invoer
  • Statusbehoud: de werkstroomstatus wordt behouden tijdens pauze-hervattingscycli
  • Gebeurtenisgestuurd: uitgebreid gebeurtenissysteem biedt inzicht in de werkstroomstatus en -overgangen

Met dit patroon kunt u geavanceerde interactieve toepassingen bouwen waarbij AI-agents en mensen naadloos samenwerken binnen gestructureerde werkstromen.

Volgende stappen