Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
При добавлении агента ИИ в рабочий процесс необходимо упаковать его в исполнителя, чтобы обработчик рабочих процессов может направлять сообщения в него, управлять состоянием сеанса и обрабатывать выходные данные. Исполнитель агента — это встроенный исполнитель, который обрабатывает эту адаптацию.
Обзор
Исполнитель агента мостит разрыв между абстракцией агента и моделью выполнения рабочего процесса. Оно:
- Получает типизированные сообщения из графа рабочего процесса и пересылает их в базовый агент.
- Управляет состоянием сеанса и беседы агента между запусками.
- Адаптирует поведение на основе режима исполнения рабочего процесса (потоковый или непотоковый).
- Возвращает выходные события (
AgentResponseилиAgentResponseUpdate) вызывающей объект рабочего процесса для наблюдения. - Отправляет сообщения подключенным подчиненным исполнителям для непрерывной обработки в графе.
- Поддерживает создание контрольных точек для долго выполняющихся рабочих процессов.
Принцип работы
В C# подсистема рабочих процессов внутренне создает AIAgentHostExecutor для каждого AIAgent добавленного в рабочий процесс. Этот специализированный исполнитель расширяет ChatProtocolExecutor и использует шаблон маркера поворота :
-
Кэширование сообщений — сообщения, которые поступают от других исполнителей, собираются исполнителем агента. Если
ForwardIncomingMessagesвключена (по умолчанию), входящие сообщения также перенаправляются подчиненным исполнителям. -
Триггер маркера поворота — агент обрабатывает кэшированные сообщения только после получения маркера
TurnToken. -
Вызов агента — агент выполняет вызов
RunAsync(непотоково) илиRunStreamingAsync(потоковая передача) в базовом агенте. -
Получение выходных данных — если включены события потоковой передачи, каждый добавочный
AgentResponseUpdateвозвращается в виде выходных данных рабочего процесса. ЕслиEmitAgentResponseEventsэтот параметр включен, агрегированныеAgentResponseданные также предоставляются в виде выходных данных рабочего процесса. - Нижестоящий обмен сообщениями — сообщения ответа агента отправляются подключенным подчиненным исполнителям.
-
Переключение сквозной передачи маркера — после завершения его выполнения исполнитель отправляет новый
TurnTokenподчиненный поток, чтобы следующий агент в цепочке смог начать обработку.
Подсказка
Для некоторых сценариев может потребоваться более специализированный исполнитель агента; Например, оркестрации передачи используют выделенную HandoffAgentExecutor с пользовательской логикой маршрутизации.
Неявное и явное создание
При передаче AIAgent в WorkflowBuilder фреймворк автоматически оборачивает его в AIAgentBinding, который создает базовый AIAgentHostExecutor. Не нужно создавать экземпляр исполнителя агента напрямую.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
Вы также можете использовать вспомогательные методы AgentWorkflowBuilder для распространенных шаблонов:
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Настраиваемая конфигурация
Чтобы настроить поведение исполнителя агента, используйте следующую командуBindAsExecutor:AIAgentHostOptions
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Типы входных данных
Исполнитель агента в C# принимает несколько типов входных данных: string, ChatMessageи IEnumerable<ChatMessage>. Строковые входные данные автоматически преобразуются в ChatMessage экземпляры с ролью User . Все входящие сообщения накапливаются до получения TurnToken, после чего исполнитель обрабатывает пакет. Если ReassignOtherAgentsAsUsers включена (по умолчанию), сообщения от других агентов переназначаются роли User , поэтому базовая модель обрабатывает их как входные данные пользователей, а сообщения от текущего агента сохраняют Assistant роль.
Выходные данные и цепочка
После завершения действия агента исполнитель:
- Отправляет ответные сообщения агента всем подключенным подчиненным исполнителям.
- Перенаправляет новый
TurnToken, чтобы следующий агент в цепочке мог начать обработку.
Это делает последовательную обработку агентов простой — просто соедините их между собой.
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Характеристика потокового воспроизведения
Поведение потоковой передачи управляется параметром EmitAgentUpdateEvents на AIAgentHostOptions, или динамически посредством TurnToken.
-
При включении — исполнитель вызывает
RunStreamingAsyncна агенте и возвращает каждоеAgentResponseUpdateкак событие выходных данных рабочего процесса. Это обеспечивает поэтапные обновления каждого токена в режиме реального времени. -
При отключении — исполнитель вызывает
RunAsyncи создает единый полный ответ.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Общие сеансы
Каждый агентный исполнитель по умолчанию поддерживает собственный сеанс. Чтобы поделиться сеансом между агентами, настройте агентов с общим поставщиком сеансов перед добавлением их в рабочий процесс.
Параметры конфигурации
AIAgentHostOptions управляет поведением агент-процессора:
| Опция | По умолчанию | Описание |
|---|---|---|
EmitAgentUpdateEvents |
null |
Испускает события потокового обновления в процессе выполнения.
TurnToken имеет приоритет, если задано. Если оба null, потоковое вещание отключено. |
EmitAgentResponseEvents |
false |
Выведите агрегированный ответ агента в виде события выходных данных рабочего процесса. |
InterceptUserInputRequests |
false |
Перехватывайте UserInputRequestContent и направляйте его как сообщение рабочего процесса на обработку. |
InterceptUnterminatedFunctionCalls |
false |
Перехватывать FunctionCallContent без соответствующего результата и направлять его в виде сообщения рабочего процесса. |
ReassignOtherAgentsAsUsers |
true |
Переназначьте сообщения от других агентов к User роли, чтобы модель рассматривала их как входные данные пользователей. |
ForwardIncomingMessages |
true |
Пересылать входящие сообщения подчиненным исполнителям до сгенерированных сообщений агентом. |
Создание контрольных точек
Исполнитель агента поддерживает контрольные точки для длительных рабочих процессов. При выполнении контрольной точки выполняется сериализация исполнителя:
- Состояние сеанса агента (через
SerializeSessionAsync). - Конфигурация генерации событий на текущем этапе (действует только в то время, пока запросы ожидают выполнения, и исполнитель еще не обработал свои входящие
TurnToken). - Все ожидающие запросы ввода от пользователей и запросы на вызов функций.
При восстановлении исполнитель десериализует сеанс и состояние ожидающего запроса, что позволяет рабочему процессу возобновиться с того места, где он был прерван.
Принцип работы
Класс AgentExecutor упаковывает агент, реализующий SupportsAgentRun протокол. Когда исполнитель получает сообщение:
-
Нормализация сообщений — входные данные нормализуются в список
Messageобъектов и добавляются во внутренний кэш исполнителя. Исполнитель принимает несколько типов входных данных —str,Message,list[str | Message]AgentExecutorRequestиAgentExecutorResponse— каждый из них направляется в выделенный обработчик, нормализующий входные данные перед кэшированием. -
Вызов агента — исполнитель вызывает
agent.run()с кэшированными сообщениями, автоматически выбирая потоковый или непотоковый режим на основе режима выполнения рабочего процесса. -
Выдача данных — в режиме потоковой передачи каждый
AgentResponseUpdateпредставляется как событие выхода рабочего процесса. В режиме, отличном от потоковой передачи, возвращается одинAgentResponse. - Нисходящая отправка — после завершения работы агента исполнитель отправляет сигнал всем подключенным нисходящим исполнителям. Этот ответ включает полную историю беседы, обеспечивая непрерывную цепочку.
- Сброс кэша — внутренний кэш сообщений исполнителя очищается после вызова агента, гарантируя, что каждый агент обрабатывает только новые сообщения, полученные с момента последнего вызова.
Подсказка
Для некоторых сценариев может потребоваться более специализированный исполнитель агента; Например, оркестрации ручного выполнения используют выделенный исполнитель с пользовательской логикой маршрутизации.
Неявное и явное создание
Агенты WorkflowBuilder автоматически упаковываются в AgentExecutor экземпляры при передаче агента напрямую. Для большинства рабочих процессов неявное создание подходит.
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Явное создание
Создайте AgentExecutor явным образом, когда это необходимо.
- Совместное использование сеанса между несколькими агентами.
- Укажите пользовательский идентификатор исполнителя для маршрутизации и целевых параметров среды выполнения.
- Использовать один и тот же экземпляр исполнителя на нескольких гранях.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Параметры конструктора:
| Параметр | Тип | Описание |
|---|---|---|
agent |
SupportsAgentRun |
Агент для упаковки. |
session |
AgentSession \| None |
Сеанс, который используется для работы агента. Если None, создается новый сеанс от агента. |
id |
str \| None |
Уникальный идентификатор исполнителя. По умолчанию используется имя агента, если оно доступно. |
Подсказка
Идентификатор исполнителя также является ключом, используемым при нацеливании workflow.run(function_invocation_kwargs=...) или client_kwargs= на отдельного агента. Если не указано id, рабочий процесс использует имя агента оболочки.
Типы входных данных
AgentExecutor определяет несколько методов обработчика, при этом каждый из них принимает определенный тип входных данных. Движок рабочих процессов автоматически распределяет правильный обработчик на основе типа сообщения. Все типы входных данных запускают агент немедленно, за исключением AgentExecutorRequest того, где should_respond флаг управляет запуском агента или просто кэширует сообщения:
| Тип входных данных | Обработчик | Агент триггеров | Описание |
|---|---|---|---|
AgentExecutorRequest |
run |
Conditional | Канонический тип входных данных. Содержит список сообщений и флаг should_respond, который определяет, выполняется ли агент. |
str |
from_str |
Всегда | Принимает необработанный строковый запрос. |
Message |
from_message |
Всегда | Принимает один Message объект. |
list[str \| Message] |
from_messages |
Всегда | Принимает список строк или Message объектов в качестве контекста беседы. |
AgentExecutorResponse |
from_response |
Всегда | Принимает ответ предыдущего агент-исполнителя, обеспечивая возможность прямой цепочки вызовов. |
Использование AgentExecutorRequest
AgentExecutorRequest является каноническим типом входных данных и предоставляет наибольший контроль.
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message(role="user", contents=["Hello, world!"])],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
Флаг should_respond определяет, обрабатывает ли агент сообщения немедленно или просто кэширует их позже:
-
True(по умолчанию) — агент выполняет и генерирует ответ. -
False— сообщения добавляются в кэш, но агент не выполняется. Это полезно для предварительной загрузки контекста беседы перед активацией ответа.
Выходные данные и цепочка
После завершения работы агента исполнитель отправляет поток данных AgentExecutorResponse. Этот класс данных содержит следующее:
| Поле | Тип | Описание |
|---|---|---|
executor_id |
str |
Идентификатор исполнителя, создавшего ответ. |
agent_response |
AgentResponse |
Ответ исходного агента (без изменений от клиента). |
full_conversation |
list[Message] \| None |
Полный контекст беседы (предыдущие входные и выходные данные агента) для цепочки. |
При связывании исполнителей агента следующий исполнитель получает AgentExecutorResponse через from_response обработчик. В нем используется поле full_conversation для сохранения полной истории бесед, предотвращая потерю предыдущего контекста последующими агентами.
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Характеристика потокового воспроизведения
Автоматически AgentExecutor адаптируется к режиму выполнения рабочего процесса:
-
stream=True— вызываетagent.run(stream=True)и передаёт каждоеAgentResponseUpdateкак событие выходных данных рабочего процесса. После завершения потоковой передачи обновления объединяются в полнуюAgentResponse, готовую для дальнейшей отправки. -
stream=False(по умолчанию) — вызываетagent.run(stream=False)и генерирует одноAgentResponseв качестве события выходного результата рабочего процесса.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Общие сеансы
По умолчанию каждый AgentExecutor создает собственный сеанс. Чтобы разделить сеанс между несколькими агентами (например, для поддержания общего потока беседы), создайте сеанс явным образом и передайте его каждому исполнителю.
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Замечание
Не все агенты поддерживают общие сеансы. Как правило, только агенты одного типа поставщика могут совместно использовать сеанс.
Создание контрольных точек
AgentExecutor поддерживает создание контрольных точек для сохранения и восстановления состояния в длительных рабочих процессах. При выполнении контрольной точки выполняется сериализация исполнителя:
- Внутренний кэш сообщений.
- Полный журнал бесед.
- Состояние сеанса агента.
- Все ожидающие запросы и ответы на ввод данных от пользователей.
При восстановлении исполнитель десериализирует это состояние, позволяя рабочему процессу возобновить работу с места его отключения.
Предупреждение
Контрольные точки с агентами, использующими сеансы на стороне сервера (например FoundryAgent) имеют ограничения. Состояние сеанса на стороне сервера не фиксируется в контрольных точках и может быть изменено последующими запусками. Рассмотрите возможность реализации пользовательского исполнителя, если требуется надежная контрольная точка с сеансами на стороне сервера.