Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Deze zelfstudie laat zien hoe u aanvragen en antwoorden in werkstromen verwerkt met behulp van Agent Framework-werkstromen. U leert hoe u interactieve werkstromen maakt die de uitvoering kunnen onderbreken om invoer van externe bronnen (zoals mensen of andere systemen) aan te vragen en vervolgens te hervatten zodra er een antwoord is opgegeven.
Behandelde concepten
In .NET gebruiken human-in-the-loop-werkstromen RequestPort en het verwerken van externe verzoeken om de uitvoering te onderbreken en gebruikersinvoer te verzamelen. Dit patroon maakt interactieve werkstromen mogelijk waarbij het systeem tijdens de uitvoering informatie van externe bronnen kan aanvragen.
Vereiste voorwaarden
- .NET 8.0 SDK of hoger.
- Azure OpenAI-service-eindpunt en -implementatie geconfigureerd.
- Azure CLI is geïnstalleerd en geverifieerd (voor Azure-referentieverificatie).
- Basiskennis van C# en asynchroon programmeren.
- Een nieuwe consoletoepassing.
NuGet-pakketten installeren
Installeer eerst de vereiste pakketten voor uw .NET-project:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Belangrijke onderdelen
RequestPort en externe aanvragen
Een RequestPort fungeert als een brug tussen de werkstroom en externe invoerbronnen. Wanneer de werkstroom invoer nodig heeft, wordt er een RequestInfoEvent gegenereerd die door uw toepassing wordt verwerkt:
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Signaaltypen
Signaaltypen definiëren om verschillende aanvraagtypen te communiceren:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Werkstroomuitvoerder
Maak uitvoerders die gebruikersinvoer verwerken en feedback geven:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
De werkstroom bouwen
Verbind de RequestPort en de uitvoerder in een feedbacklus:
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
De interactieve werkstroom uitvoeren
Externe aanvragen verwerken tijdens de uitvoering van de werkstroom:
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Verwerking van aanvragen
Verschillende typen invoeraanvragen verwerken:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
Implementatieconcepten
RequestInfoEvent Flow
- Werkstroomuitvoering: de werkstroom wordt verwerkt totdat externe invoer nodig is
-
Aanvraag genereren: RequestPort genereert een
RequestInfoEventmet de aanvraagdetails - Externe verwerking: uw toepassing merkt de gebeurtenis op en verzamelt gebruikersinvoer
-
Antwoord indienen: Stuur een
ExternalResponseterug om door te gaan met de werkstroom - Werkstroomhervatting: de werkstroom wordt verder verwerkt met de opgegeven invoer
Levenscyclus van werkstroom
-
Streaminguitvoering: gebruiken
StreamAsyncom gebeurtenissen in realtime te bewaken -
Gebeurtenisafhandeling: proces
RequestInfoEventvoor invoeraanvragen enWorkflowOutputEventvoor voltooiing - Antwoordcoördinatie: antwoorden koppelen aan aanvragen met behulp van het mechanisme voor het verwerken van reacties van de werkstroom
Stroom van Implementatie
Werkstroom initialisatie: de werkstroom begint met het verzenden van een
NumberSignal.Initnaar de RequestPort.Aanvraag genereren: De RequestPort genereert een
RequestInfoEventen vraagt om een eerste schatting van de gebruiker.Werkstroom onderbreken: de werkstroom wordt onderbroken en wacht op externe invoer terwijl de toepassing de aanvraag verwerkt.
Menselijke reactie: de externe toepassing verzamelt gebruikersinvoer en stuurt een
ExternalResponseterug naar de werkstroom.Verwerking en feedback: de
JudgeExecutorschatting wordt verwerkt en de werkstroom wordt voltooid of er wordt een nieuw signaal (boven/onder) verzonden om een andere schatting aan te vragen.Lusvoortzetting: Het proces herhaalt zich totdat het juiste getal wordt geraden.
Frameworkvoordelen
- Typeveiligheid: Sterke typen zorgen ervoor dat aanvragen-responscontracten worden onderhouden
- Gebeurtenisgestuurd: uitgebreid gebeurtenissysteem biedt inzicht in de uitvoering van de werkstroom
- Onderbroken uitvoering: werkstromen kunnen voor onbepaalde tijd worden onderbroken terwijl wordt gewacht op externe invoer
- Statusbeheer: de Workflow-status blijft behouden tijdens onderbreken-hervattencycli
- Flexibele integratie: RequestPorts kan worden geïntegreerd met elke externe invoerbron (UI, API, console, enzovoort)
Volledig voorbeeld
Zie het Human-in-the-Loop Basic-voorbeeld voor de volledige werkende implementatie.
Met dit patroon kunt u geavanceerde interactieve toepassingen bouwen, waar gebruikers input kunnen leveren op belangrijke beslissingspunten binnen geautomatiseerde werkstromen.
Wat je gaat bouwen
Maak een interactieve workflow voor een getalraadspel dat vraag-antwoordpatronen demonstreert.
- Een AI-agent die intelligente schattingen maakt
- Uitvoerders die aanvragen rechtstreeks kunnen verzenden met behulp van de
request_infoAPI - Een turnmanager die coördineert tussen de agent en menselijke interacties met behulp van
@response_handler - Interactieve consoleinvoer/uitvoer voor realtime feedback
Vereiste voorwaarden
- Python 3.10 of hoger
- Azure OpenAI-implementatie geconfigureerd
- Geconfigureerde Azure CLI-verificatie (
az login) - Basiskennis van asynchroon programmeren in Python
Sleutelbegrippen
Mogelijkheden voor aanvragen en antwoorden
Uitvoerders hebben ingebouwde aanvraag- en antwoordsystemen die mens-in-de-lus-interacties faciliteren.
- Roep
ctx.request_info(request_data=request_data, response_type=response_type)aan om aanvragen te versturen - Gebruik de
@response_handlerdecorator om antwoorden te verwerken. - Aangepaste aanvraag-/antwoordtypen definiëren zonder overnamevereisten
Request-Response-verloop
Uitvoerders kunnen aanvragen rechtstreeks verzenden met ctx.request_info() en antwoorden verwerken met de @response_handler decorator.
- Executor-aanroepen
ctx.request_info(request_data=request_data, response_type=response_type) - Werkstroom verzendt een
RequestInfoEventmet de aanvraaggegevens - Het externe systeem (human, API, enzovoort) verwerkt de aanvraag
- Antwoord wordt teruggestuurd via
send_responses_streaming() - De werkstroom wordt hervat en levert het antwoord aan de uitvoerder's
@response_handler-methode.
De omgeving instellen
Installeer eerst de vereiste pakketten:
pip install agent-framework-core --pre
pip install azure-identity
Aanvraag- en antwoordmodellen definiëren
Begin met het definiëren van de gegevensstructuren voor communicatie tussen aanvragen en antwoorden:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
Dit HumanFeedbackRequest is een eenvoudige gegevensklasse voor gestructureerde aanvraagpayloads.
- Sterke typage voor verzoekgegevens
- Voorwaarts-compatibele validatie
- Verwijder correlatiesemantiek met reacties
- Contextuele velden (zoals de vorige schatting) voor uitgebreide ui-prompts
Turn Manager maken
De turnmanager coördineert de stroom tussen de AI-agent en de mens:
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
De werkstroom bouwen
Maak de hoofdwerkstroom waarmee alle onderdelen worden verbonden:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Het voorbeeld uitvoeren
Zie het voorbeeld Human-in-the-Loop raadspel voor de volledig functionerende implementatie.
Hoe het werkt
Werkstroom initialisatie: de werkstroom begint met het
TurnManageraanvragen van een eerste schatting van de AI-agent.Agentantwoord: De AI-agent maakt een schatting en retourneert gestructureerde JSON, die terugloopt naar de
TurnManager.Menselijke aanvraag: het proces verwerkt de schatting van de agent en roept
TurnManageraan met eenctx.request_info().Werkstroom onderbreken: de werkstroom verzendt een
RequestInfoEventen gaat door totdat er geen verdere acties kunnen worden uitgevoerd en wacht vervolgens op menselijke invoer.Menselijke reactie: de externe toepassing verzamelt menselijke invoer en stuurt antwoorden terug met behulp van
send_responses_streaming().Hervatten en doorgaan: de werkstroom hervat, de
TurnManager@response_handlermethode verwerkt de menselijke feedback en beëindigt het spel of stuurt een andere aanvraag naar de agent.
Belangrijkste voordelen
- Gestructureerde communicatie: Typeveilige aanvraag- en responsmodellen voorkomen runtimefouten
- Correlatie: Aanvraag-id's zorgen ervoor dat antwoorden overeenkomen met de juiste aanvragen
- Onderbroken uitvoering: werkstromen kunnen voor onbepaalde tijd worden onderbroken terwijl wordt gewacht op externe invoer
- Statusbehoud: de werkstroomstatus wordt behouden tijdens pauze-hervattingscycli
- Gebeurtenisgestuurd: uitgebreid gebeurtenissysteem biedt inzicht in de werkstroomstatus en -overgangen
Met dit patroon kunt u geavanceerde interactieve toepassingen bouwen waarbij AI-agents en mensen naadloos samenwerken binnen gestructureerde werkstromen.