Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta página se proporciona información general sobre cómo funciona el control de solicitudes y respuestas en el sistema de flujo de trabajo de Microsoft Agent Framework.
Información general
Los ejecutores de un flujo de trabajo pueden enviar solicitudes fuera del flujo de trabajo y esperar respuestas. Esto es útil para escenarios en los que un ejecutor necesita interactuar con sistemas externos, como interacciones humanas en el bucle o cualquier otra operación asincrónica.
Habilitar el control de solicitudes y respuestas en un flujo de trabajo
Las solicitudes y respuestas se gestionan a través de un tipo especial denominado InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Agregue el puerto de entrada a un flujo de trabajo.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Ahora, dado que en el flujo de trabajo hemos conectado a executorA en ambas direcciones, inputPort debe poder enviar solicitudes y recibir respuestas a través de executorA.inputPort Esto es lo que necesitamos hacer en SomeExecutor para enviar una solicitud y recibir una respuesta.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Como alternativa, SomeExecutor puede separar el envío de solicitudes y el control de respuesta en dos controladores.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Los ejecutores pueden enviar solicitudes mediante ctx.request_info() y controlar las respuestas con @response_handler.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a puede enviar solicitudes y recibir respuestas directamente mediante funcionalidades integradas.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
El decorador @response_handler registra automáticamente el método para gestionar las respuestas de los tipos de solicitud y respuesta especificados.
Control de solicitudes y respuestas
Un InputPort emite un RequestInfoEvent cuando recibe una solicitud. Puede suscribirse a estos eventos para controlar las solicitudes entrantes desde el flujo de trabajo. Cuando reciba una respuesta de un sistema externo, envíela de nuevo al flujo de trabajo mediante el mecanismo de respuesta. El marco enruta automáticamente la respuesta al ejecutor que envió la solicitud original.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Los ejecutores pueden enviar solicitudes directamente sin necesidad de un componente independiente. Cuando un ejecutor llama a ctx.request_info(), el flujo de trabajo emite un RequestInfoEvent. Puede suscribirse a estos eventos para controlar las solicitudes entrantes desde el flujo de trabajo. Cuando reciba una respuesta de un sistema externo, envíela de nuevo al flujo de trabajo mediante el mecanismo de respuesta. El marco enruta automáticamente la respuesta al método del @response_handler ejecutor.
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Puntos de control y solicitudes
Para más información sobre los puntos de control, consulte esta página.
Cuando se crea un punto de control, las solicitudes pendientes también se guardan como parte del estado del punto de control. Al restaurar desde un punto de control, las solicitudes pendientes se volverán a emitir como RequestInfoEvent objetos, lo que le permite capturarlas y responder a ellas. No puede proporcionar respuestas directamente durante la operación de reanudación; en su lugar, debe escuchar los eventos que se vuelven a emitir y responder mediante el mecanismo de respuesta estándar.