Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
На этой странице представлен обзор взаимодействия "Человек в цикле" (HITL) в системе рабочих процессов Microsoft Agent Framework. HITL достигается с помощью механизма обработки запросов и ответа в рабочих процессах, что позволяет исполнителям отправлять запросы во внешние системы (например, операторы человека) и ждать их ответов, прежде чем продолжить выполнение рабочего процесса.
Обзор
Исполнители рабочего процесса могут отправлять запросы за пределы рабочего процесса и ожидать ответов. Это полезно для сценариев, когда исполнитель должен взаимодействовать с внешними системами, такими как взаимодействие с человеком в цикле или любые другие асинхронные операции.
Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.
Включение обработки запросов и ответов в рабочем процессе
Запросы и ответы обрабатываются с помощью специального типа RequestPort.
Это RequestPort канал связи, позволяющий исполнителям отправлять запросы и получать ответы. Когда исполнитель отправляет сообщение RequestPort, порт запроса излучает RequestInfoEvent, содержащее сведения о запросе. Внешние системы могут прослушивать эти события, обрабатывать запросы и отправлять ответы обратно в рабочий процесс. Фреймворк автоматически перенаправляет ответы обратно соответствующему исполнителю на основе исходного запроса.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Добавьте входной порт в рабочий процесс.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Для определения цели JudgeExecutor требуется заданное число и возможность оценивать правильность предположения. Если это неправильно, он отправит еще один запрос, чтобы запросить новое предположение через RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
В Python исполнители отправляют запросы с помощью ctx.request_info(), а обрабатывают ответы с помощью @response_handler декоратора.
Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.
Включение обработки запросов и ответов в рабочем процессе
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Декоратор @response_handler автоматически регистрирует метод для обработки ответов для указанных типов запросов и ответов. Фреймворк сопоставляет входящие ответы с правильным обработчиком на основе аннотаций типов параметров original_request и response.
Обработка запросов и ответов
RequestPort излучает RequestInfoEvent при получении запроса. Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ исполнителям, отправляющим исходный запрос.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Подсказка
См. полный образец для завершённого запускаемого проекта.
Исполнители могут отправлять запросы напрямую, не нуждаясь в отдельном компоненте. При вызове ctx.request_info() исполняющим механизмом рабочий процесс выдает WorkflowEvent с type == "request_info". Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ на метод исполнителя @response_handler .
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Подсказка
См. полный пример полного запускаемого файла.
Человек в цикле управления с координацией агентов
Описанный RequestPort выше шаблон работает с пользовательскими исполнителями и WorkflowBuilder. При использовании оркестрации агентов (таких как последовательные, одновременные или групповые рабочие процессы чата), утверждение средства достигается с помощью механизма запроса и ответа в цикле.
Агенты могут использовать инструменты, которые требуют одобрения человеком перед выполнением. Когда агент пытается вызвать инструмент, требующий утверждения, рабочий процесс приостанавливается и выдает RequestInfoEvent, как в шаблоне RequestPort, но полезная нагрузка события содержит ToolApprovalRequestContent для C# или Content вместе с type == "function_approval_request" для Python вместо пользовательского типа запроса.
Подсказка
Полные примеры кода см. в следующих примерах:
Контрольные точки и запросы
Дополнительные сведения о контрольных точках см. в разделе "Контрольные точки".
При создании контрольной точки ожидающие запросы также сохраняются в составе состояния контрольной точки. При восстановлении из контрольной точки все ожидающие запросы будут повторно выдаваться в виде RequestInfoEvent объектов, что позволяет записывать и реагировать на них. Вы не можете предоставить ответы непосредственно во время возобновления. Вместо этого вы должны слушать повторно сгенерированные события и реагировать с помощью стандартного механизма ответа.