Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Den här självstudien visar hur du hanterar begäranden och svar i arbetsflöden med hjälp av Agent Framework-arbetsflöden. Du får lära dig hur du skapar interaktiva arbetsflöden som kan pausa körningen för att begära indata från externa källor (till exempel människor eller andra system) och sedan återuppta när ett svar har angetts.
Begrepp som omfattas
I .NET använder RequestPort human-in-the-loop-arbetsflöden och hantering av externa begäranden för att pausa körningen och samla in användarindata. Det här mönstret möjliggör interaktiva arbetsflöden där systemet kan begära information från externa källor under körningen.
Förutsättningar
- .NET 8.0 SDK eller senare.
- Azure OpenAI-tjänstens slutpunkt och distribution har konfigurerats.
- Azure CLI installerat och autentiserat (för Azure-autentisering av autentiseringsuppgifter).
- Grundläggande förståelse för C# och asynkron programmering.
- Ett nytt konsolprogram.
Installera NuGet-paket
Installera först de nödvändiga paketen för .NET-projektet:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Nyckelkomponenter
RequestPort och externa begäranden
En RequestPort fungerar som en brygga mellan arbetsflödet och externa indatakällor. När arbetsflödet behöver indata genereras en RequestInfoEvent som ditt program hanterar:
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Signaltyper
Definiera signaltyper för att kommunicera olika typer av begäranden:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Arbetsflödesexekutor
Skapa köre som bearbetar användarindata och ger feedback:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
Skapa arbetsflödet
Anslut RequestPort och exekveraren i en feedbackloop:
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
Köra det interaktiva arbetsflödet
Hantera externa begäranden under körning av arbetsflödet.
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Hantering av begäranden
Bearbeta olika typer av indatabegäranden:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
Implementeringsbegrepp
Flöde för RequestInfoEvent
- Arbetsflödeskörning: Arbetsflödet fortsätter tills det behöver extern input
-
Generering av begäran: RequestPort genererar en
RequestInfoEventmed begärandeinformationen - Extern hantering: Programmet fångar händelsen och samlar in användarindata
-
Svarsöverföring: Skicka en
ExternalResponsetillbaka för att fortsätta arbetsflödet - Arbetsflödesåtertagning: Arbetsflödet fortsätter bearbetningen med angivna indata
Arbetsflödeslivscykel
-
Körning av direktuppspelning: Används
StreamAsyncför att övervaka händelser i realtid -
Händelsehantering: Process
RequestInfoEventför indatabegäranden ochWorkflowOutputEventför slutförande - Svarssamordning: Matcha svar på begäranden med hjälp av arbetsflödets mekanism för svarshantering
Implementeringsflöde
Initiering av arbetsflöde: Arbetsflödet börjar med att skicka ett
NumberSignal.Inittill RequestPort.Begärandegenerering: RequestPort genererar en
RequestInfoEventbegäran om en första gissning från användaren.Pausa arbetsflöde: Arbetsflödet pausar och väntar på externa indata medan programmet hanterar begäran.
Mänskligt svar: Det externa programmet samlar in användarindata och skickar en
ExternalResponsetillbaka till arbetsflödet.Bearbetning och feedback: Bearbetar
JudgeExecutorgissningen och slutför antingen arbetsflödet eller skickar en ny signal (ovan/nedan) för att begära en ny gissning.Loopfortsättning: Processen upprepas tills rätt tal gissas.
Ramverksfördelar
- Typsäkerhet: Stark typning säkerställer att kontrakt för begärandesvar upprätthålls
- Händelsedriven: Omfattande händelsesystem ger insyn i arbetsflödeskörning
- Pausbar utförande: Arbetsflöden kan pausas på obestämd tid i väntan på extern information
- Tillståndshantering: Arbetsflödestillståndet bevaras mellan paus-återuppta-cykler
- Flexibel integrering: RequestPorts kan integreras med valfri extern indatakälla (användargränssnitt, API, konsol osv.)
Fullständigt exempel
Den fullständiga arbetsimplementeringen finns i Basic-exemplet Human-in-the-Loop.
Det här mönstret gör det möjligt att skapa avancerade interaktiva program där användare kan ge indata vid viktiga beslutsplatser i automatiserade arbetsflöden.
Vad du kommer att bygga
Du skapar ett interaktivt arbetsflöde för gissningsspel som visar mönster för begärandesvar:
- En AI-agent som gör intelligenta gissningar
- Utförare som kan skicka begäranden direkt via API:et
request_info - En turn manager som samordnar mellan agenten och mänskliga interaktioner med hjälp av
@response_handler - Interaktiv konsolinmatning/-utdata för feedback i realtid
Förutsättningar
- Python 3.10 eller senare
- Azure OpenAI-distribution konfigurerad
- Konfigurerad Azure CLI-autentisering (
az login) - Grundläggande förståelse för python-asynkron programmering
Viktiga begrepp
Funktioner för begäranden och svar
Exekutorer har inbyggda funktioner för förfrågningar och svar som möjliggör interaktioner där människor är en del av processen.
- Anropa
ctx.request_info(request_data=request_data, response_type=response_type)för att skicka begäranden - Använd dekoratören
@response_handlerför att hantera svar - Definiera anpassade typer av begäran/svar utan arvskrav
Begäran-Svar Flöde
Utförare kan skicka begäranden direkt med hjälp av ctx.request_info() och hantera svar med hjälp av dekoratören @response_handler :
- Köranrop
ctx.request_info(request_data=request_data, response_type=response_type) - Arbetsflödet genererar en
RequestInfoEventmed begärandedata - Externt system (människa, API osv.) bearbetar begäran
- Svaret skickas tillbaka via
send_responses_streaming() - Arbetsflödet återupptas och levererar svaret till exekutorens
@response_handler-metod
Att konfigurera miljön
Installera först de paket som krävs:
pip install agent-framework-core --pre
pip install azure-identity
Definiera modeller för begäran och svar
Börja med att definiera datastrukturerna för kommunikation med begärandesvar:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
HumanFeedbackRequest är en enkel dataklass för strukturerade nyttolaster för förfrågningar:
- Stark typning för nyttolaster för begäranden
- Framåtkompatibel validering
- Rensa korrelationssemantik med svar
- Kontextuella fält (som den tidigare gissningen) för omfattande användargränssnittsprompter
Skapa Turn Manager
Turhanteraren samordnar flödet mellan AI-agenten och människan:
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
Skapa arbetsflödet
Skapa huvudarbetsflödet som ansluter alla komponenter:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Köra exemplet
Den fullständiga arbetsimplementeringen finns i exemplet Human-in-the-Loop Guessing Game.
Så här fungerar det
Initiering av arbetsflöde: Arbetsflödet börjar med att
TurnManagerbegära en första gissning från AI-agenten.Agentsvar: AI-agenten gör en gissning och returnerar strukturerad JSON, som flödar tillbaka till
TurnManager.Mänsklig begäran:
TurnManagerbearbetar agentens gissning och anroparctx.request_info()med enHumanFeedbackRequest.Pausa arbetsflöde: Arbetsflödet genererar en
RequestInfoEventoch fortsätter tills inga ytterligare åtgärder kan vidtas och väntar sedan på mänsklig indata.Mänskligt svar: Det externa programmet samlar in mänskliga indata och skickar tillbaka svar med hjälp av
send_responses_streaming().Återuppta och fortsätt: Arbetsflödet återupptas,
TurnManagermetoden "s@response_handlerbearbetar den mänskliga feedbacken och avslutar antingen spelet eller skickar en annan begäran till agenten.
Viktiga fördelar
- Strukturerad kommunikation: Typsäkra begäran- och svarmodeller förhindrar körningsfel
- Korrelation: Begärande-ID:t ser till att svaren matchas mot rätt begäranden
- Pausbar utförande: Arbetsflöden kan pausas på obestämd tid i väntan på extern information
- Tillståndsbevarande: Arbetsflödestillståndet bibehålls över paus-återuppta-cykler
- Händelsedriven: Omfattande händelsesystem ger insyn i arbetsflödesstatus och övergångar
Det här mönstret gör det möjligt att skapa avancerade interaktiva program där AI-agenter och människor samarbetar sömlöst i strukturerade arbetsflöden.