Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Esta página fornece uma visão geral de como o tratamento de solicitações e respostas funciona no sistema de fluxo de trabalho do Microsoft Agent Framework.
Visão geral
Os executores em um fluxo de trabalho podem enviar solicitações para fora do fluxo de trabalho e aguardar respostas. Isso é útil para cenários em que um executor precisa interagir com sistemas externos, como interações human-in-the-loop ou quaisquer outras operações assíncronas.
Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho
As solicitações e respostas são tratadas por meio de um tipo especial chamado InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Adicione a porta de entrada a um fluxo de trabalho.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Agora, porque no fluxo de trabalho que temos executorA conectado ao inputPort em ambas as direções, executorA precisa ser capaz de enviar solicitações e receber respostas através do inputPort. Aqui está o que precisamos fazer em SomeExecutor para enviar uma solicitação e receber uma resposta.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Como alternativa, SomeExecutor pode separar o envio de solicitações e o tratamento de respostas em dois manipuladores.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Os executores podem enviar solicitações usando ctx.request_info() e lidar com respostas com @response_handler.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a Pode enviar solicitações e receber respostas diretamente usando recursos integrados.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
O @response_handler decorador registra automaticamente o método para lidar com as respostas para os tipos de solicitação e resposta especificados.
Tratamento de solicitações e respostas
Um InputPort emite um RequestInfoEvent quando recebe um pedido. Você pode se inscrever nesses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o executor que enviou a solicitação original.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Os executores podem enviar solicitações diretamente sem a necessidade de um componente separado. Quando um executor chama ctx.request_info(), o workflow emite um RequestInfoEvent. Você pode se inscrever nesses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o método do @response_handler executor.
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Pontos de verificação e solicitações
Para saber mais sobre os pontos de verificação, consulte esta página.
Quando um ponto de verificação é criado, as solicitações pendentes também são salvas como parte do estado do ponto de verificação. Ao restaurar a partir de um ponto de verificação, todas as solicitações pendentes serão reemitidas como objetos RequestInfoEvent, permitindo capturá-las e respondê-las. Não é possível fornecer respostas diretamente durante a operação de retomada - em vez disso, você deve ouvir os eventos reemitidos e responder usando o mecanismo de resposta padrão.