Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице представлен обзор работы обработки запросов и ответов в системе рабочих процессов Microsoft Agent Framework.
Обзор
Исполнители рабочего процесса могут отправлять запросы за пределы рабочего процесса и ожидать ответов. Это полезно для сценариев, когда исполнитель должен взаимодействовать с внешними системами, такими как взаимодействие с человеком в цикле или любые другие асинхронные операции.
Включение обработки запросов и ответов в рабочем процессе
Запросы и ответы обрабатываются с помощью специального типа InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Добавьте входной порт в рабочий процесс.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Теперь, поскольку в рабочем процессе у нас executorA соединён с inputPort в обоих направлениях, executorA необходимо иметь возможность отправлять запросы и получать ответы через inputPort. Вот что нам нужно сделать в SomeExecutor, чтобы отправить запрос и получить ответ.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Кроме того, SomeExecutor можно разделить отправку запроса и обработку ответа на два обработчика.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Исполнители могут отправлять запросы с помощью ctx.request_info() и обрабатывать ответы.@response_handler
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a может отправлять запросы и получать ответы непосредственно с помощью встроенных возможностей.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
Декоратор @response_handler автоматически регистрирует метод для обработки ответов для указанных типов запросов и ответов.
Обработка запросов и ответов
InputPort излучает RequestInfoEvent при получении запроса. Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ исполнителям, отправляющим исходный запрос.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Исполнители могут отправлять запросы напрямую, не нуждаясь в отдельном компоненте. При вызове ctx.request_info()исполнителя рабочий процесс выдает RequestInfoEvent. Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ на метод исполнителя @response_handler .
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Контрольные точки и запросы
Дополнительные сведения о контрольных точках см. на этой странице.
При создании контрольной точки ожидающие запросы также сохраняются в составе состояния контрольной точки. При восстановлении из контрольной точки все ожидающие запросы будут повторно выдаваться в виде RequestInfoEvent объектов, что позволяет записывать и реагировать на них. Вы не можете предоставить ответы непосредственно во время возобновления. Вместо этого вы должны слушать повторно сгенерированные события и реагировать с помощью стандартного механизма ответа.