Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
En esta página se proporciona información general sobre las interacciones de human-in-the-loop (HITL) en el sistema de flujo de trabajo de Microsoft Agent Framework. HITL se logra mediante el mecanismo de control de solicitudes y respuestas en flujos de trabajo, lo que permite a los ejecutores enviar solicitudes a sistemas externos (como operadores humanos) y esperar sus respuestas antes de continuar con la ejecución del flujo de trabajo.
Visión general
Los ejecutores de un flujo de trabajo pueden enviar solicitudes fuera del flujo de trabajo y esperar respuestas. Esto es útil para escenarios en los que un ejecutor necesita interactuar con sistemas externos, como interacciones humanas en el bucle o cualquier otra operación asincrónica.
Vamos a crear un flujo de trabajo que pida a un operador humano que adivina un número y use un ejecutor para juzgar si la estimación es correcta.
Habilitar el control de solicitudes y respuestas en un flujo de trabajo
Las solicitudes y respuestas se gestionan a través de un tipo especial denominado RequestPort.
Un RequestPort es un canal de comunicación que permite a los ejecutores enviar solicitudes y recibir respuestas. Cuando un ejecutor envía un mensaje a RequestPort, el puerto de solicitud emite un RequestInfoEvent que contiene los detalles de la solicitud. Los sistemas externos pueden escuchar estos eventos, procesar las solicitudes y devolver respuestas al flujo de trabajo. El marco enruta automáticamente las respuestas al ejecutor adecuado en función de la solicitud original.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Agregue el puerto de entrada a un flujo de trabajo.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
La definición de JudgeExecutor necesita un número de destino y poder juzgar si la estimación es correcta. Si no es correcto, enviará otra solicitud para solicitar una nueva estimación a través de RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
En Python, los ejecutores envían solicitudes mediante ctx.request_info() y controlan las respuestas con el @response_handler decorador.
Vamos a crear un flujo de trabajo que pida a un operador humano que adivina un número y use un ejecutor para juzgar si la estimación es correcta.
Habilitar el control de solicitudes y respuestas en un flujo de trabajo
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
El decorador @response_handler registra automáticamente el método para gestionar las respuestas de los tipos de solicitud y respuesta especificados. El marco asigna las respuestas entrantes al controlador correcto basado en las anotaciones de tipo de los parámetros original_request y response.
Control de solicitudes y respuestas
Un RequestPort emite un RequestInfoEvent cuando recibe una solicitud. Puede suscribirse a estos eventos para controlar las solicitudes entrantes desde el flujo de trabajo. Cuando reciba una respuesta de un sistema externo, envíela de nuevo al flujo de trabajo mediante el mecanismo de respuesta. El marco enruta automáticamente la respuesta al ejecutor que envió la solicitud original.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Sugerencia
Consulte el ejemplo completo del proyecto ejecutable completo.
Los ejecutores pueden enviar solicitudes directamente sin necesidad de un componente independiente. Cuando un ejecutor llama a ctx.request_info(), el flujo de trabajo emite un WorkflowEvent con type == "request_info". Puede suscribirse a estos eventos para controlar las solicitudes entrantes desde el flujo de trabajo. Cuando reciba una respuesta de un sistema externo, envíela de nuevo al flujo de trabajo mediante el mecanismo de respuesta. El marco enruta automáticamente la respuesta al método del @response_handler ejecutor.
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Sugerencia
Consulte este ejemplo completo para obtener un archivo ejecutable completo.
Human-in-the-Loop con orquestaciones del agente
El RequestPort patrón descrito anteriormente funciona con ejecutores personalizados y WorkflowBuilder. Cuando se usan orquestaciones de agente (como flujos de trabajo de chat secuenciales, simultáneos o grupales), la aprobación de herramientas se logra mediante el mecanismo de solicitud y respuesta humana en bucle.
Los agentes pueden usar herramientas que requieren aprobación humana antes de la ejecución. Cuando el agente intenta llamar a una herramienta que requiere aprobación, el flujo de trabajo se detiene y emite un RequestInfoEvent igual que el patrón RequestPort, pero la carga útil del evento contiene un ToolApprovalRequestContent (C#) o un Content con type == "function_approval_request" (Python) en lugar de un tipo de solicitud personalizado.
Sugerencia
Para obtener ejemplos completos con código, consulte:
Puntos de control y solicitudes
Para más información sobre los puntos de control, consulte Puntos de control.
Cuando se crea un punto de control, las solicitudes pendientes también se guardan como parte del estado del punto de control. Al restaurar desde un punto de control, las solicitudes pendientes se volverán a emitir como RequestInfoEvent objetos, lo que le permite capturarlas y responder a ellas. No puede proporcionar respuestas directamente durante la operación de reanudación; en su lugar, debe escuchar los eventos que se vuelven a emitir y responder mediante el mecanismo de respuesta estándar.