Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это руководство поможет обновить рабочие процессы Python до последних изменений API, представленных в версии 1.0.0b251104.
Обзор изменений
Этот выпуск включает два основных улучшения в системе рабочих процессов:
1. Объединенные API выполнения рабочих процессов
Методы выполнения рабочего процесса унифицированы для простоты:
-
Унифицированные
run_stream()методы иrun()методы: замена отдельных методов для определенных контрольных точек (run_stream_from_checkpoint(),run_from_checkpoint()) -
Единый интерфейс. Использование
checkpoint_idпараметра для возобновления из контрольных точек вместо отдельных методов - Гибкая контрольная точка: настройте хранилище контрольных точек во время сборки или измените его во время выполнения
-
Очистка семантики: взаимоисключающие
message(новые запуски) иcheckpoint_id(резюме) параметры
2. Упрощённая система Запрос-Ответ
Система ответа на запросы оптимизирована:
-
RequestInfoExecutorБольше нет: исполнителям теперь можно отправлять запросы напрямую. -
Новый
@response_handlerдекоратор: заменаRequestResponseобработчиков сообщений -
Упрощенные типы запросов: наследование от
RequestInfoMessageне требуется - Встроенные возможности: все исполнители автоматически поддерживают функции ответа на запросы
-
Более чистые графики рабочих процессов: удаление
RequestInfoExecutorузлов из рабочих процессов
Часть 1. Api единого выполнения рабочих процессов
Сначала рекомендуется перейти на объединенные API рабочих процессов, так как это формирует основу для всех схем выполнения рабочих процессов.
Возобновление с контрольных точек
До (старый API):
# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Event: {event}")
После (новый API):
# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
):
print(f"Event: {event}")
Основные различия:
- Использование
checkpoint_idпараметра вместо отдельного метода - Не удается предоставить оба
messageиcheckpoint_id(взаимоисключающные) - Должен предоставить либо
message(новый запуск), либоcheckpoint_id(возобновить) -
checkpoint_storageявляется необязательным, если чекпоинтинг был сконфигурирован при сборке
API непотоковой передачи
Метод run() без потоковой передачи следует тому же шаблону:
Старый:
result = await workflow.run_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
Новые функции:
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
)
Возобновление процесса контрольной точки с ожидающими запросами
Важное критическое изменение: при возобновлении с контрольной точки, на которой есть ожидающие RequestInfoEvent объекты, новый API снова издает эти события автоматически. Вам необходимо записать и ответить на них.
До (старое поведение):
# OLD: Could provide responses directly during resume
responses = {
"request-id-1": "user response data",
"request-id-2": "another response"
}
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage,
responses=responses # No longer supported
):
print(f"Event: {event}")
После (новое поведение):
# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}
async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
if event.type == "request_info":
# Pending requests are automatically re-emitted
print(f"Pending request re-emitted: {event.request_id}")
requests[event.request_id] = event.data
# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
response = handle_request(request_data) # Your logic here
responses[request_id] = response
# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
if event.type == "output":
print(f"Workflow output: {event.data}")
Полный пример с участием человека в контуре
Ниже приведен полный пример возобновления контрольной точки с ожиданием утверждения человека:
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowBuilder,
handler,
response_handler,
)
# ... (Executor definitions omitted for brevity)
async def run_interactive_session(
workflow: Workflow,
initial_message: str | None = None,
checkpoint_id: str | None = None,
) -> str:
"""Run workflow until completion, handling human input interactively."""
requests: dict[str, HumanApprovalRequest] = {}
responses: dict[str, str] | None = None
completed_output: str | None = None
while True:
# Determine which API to call
if responses:
# Send responses from previous iteration
event_stream = workflow.send_responses_streaming(responses)
requests.clear()
responses = None
else:
# Start new run or resume from checkpoint
if initial_message:
event_stream = workflow.run_stream(initial_message)
elif checkpoint_id:
event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
else:
raise ValueError("Either initial_message or checkpoint_id required")
# Process events
async for event in event_stream:
if event.type == "status":
print(event)
if event.type == "output":
completed_output = event.data
if event.type == "request_info":
if isinstance(event.data, HumanApprovalRequest):
requests[event.request_id] = event.data
# Check completion
if completed_output:
break
# Prompt for user input if we have pending requests
if requests:
responses = prompt_for_responses(requests)
continue
raise RuntimeError("Workflow stopped without completing or requesting input")
return completed_output
Часть 2. Упрощенная система Request-Response
После миграции в интерфейсы API унифицированных рабочих процессов обновите шаблоны ответа на запросы, чтобы использовать новую интегрированную систему.
1. Обновление импорта
Before:
from agent_framework import (
RequestInfoExecutor,
RequestInfoMessage,
RequestResponse,
# ... other imports
)
After:
from agent_framework import (
response_handler,
# ... other imports
# Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
2. Обновление типов запросов
Before:
from dataclasses import dataclass
from agent_framework import RequestInfoMessage
@dataclass
class UserApprovalRequest(RequestInfoMessage):
"""Request for user approval."""
prompt: str = ""
context: str = ""
After:
from dataclasses import dataclass
@dataclass
class UserApprovalRequest:
"""Request for user approval."""
prompt: str = ""
context: str = ""
3. Обновление графа рабочего процесса
Before:
# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.add_edge(approval_executor, request_info_executor)
.add_edge(request_info_executor, approval_executor)
.build()
)
After:
# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.build()
)
4. Отправка запроса на обновление
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.send_message(request)
After:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.request_info(request_data=request, response_type=bool)
Обновление обработки ответов
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def handle_approval(
self,
response: RequestResponse[UserApprovalRequest, bool],
ctx: WorkflowContext[Never, str]
) -> None:
if response.data:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
After:
class ApprovalRequiredExecutor(Executor):
@response_handler
async def handle_approval(
self,
original_request: UserApprovalRequest,
approved: bool,
ctx: WorkflowContext
) -> None:
if approved:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
Сводка преимуществ
Унифицированные API рабочих процессов
- Упрощенный интерфейс: единый метод для начального запуска и возобновления контрольной точки
- Очистка семантики: взаимоисключающие параметры делают намерение явным
- Гибкий чекпоинтинг: настраивать на этапе сборки или переопределять во время выполнения
- Снижение когнитивной нагрузки: меньше методов для запоминания и обслуживания
система Запрос-Ответ
-
Упрощенная архитектура: нет необходимости в отдельных
RequestInfoExecutorкомпонентах -
Безопасность типов: непосредственная спецификация типа в
request_info()вызовах - Более чистый код: меньше импортов и более простых графиков рабочих процессов
- Повышение производительности: снижение затрат на маршрутизацию сообщений
- Улучшенная отладка: очистка потока выполнения и обработка ошибок
Тестирование миграции
Часть 1 Контрольный список: API рабочего процесса
-
Обновление вызовов API: замена
run_stream_from_checkpoint()наrun_stream(checkpoint_id=...) -
Обновление вызовов API: замена
run_from_checkpoint()наrun(checkpoint_id=...) -
Удаление
responsesпараметра: удаление любыхresponsesаргументов из вызовов возобновления контрольных точек -
Добавление перехвата событий: Реализация логики перехвата повторно испускаемых событий request_info (
event.type == "request_info") - Контрольная точка теста: убедитесь, что ожидающие запросы повторно отправляются и обрабатываются правильно.
Часть 2 Контрольный список: система Request-Response
-
Проверка импорта: убедитесь, что старые импорты не остаются (
RequestInfoExecutor,RequestInfoMessage,RequestResponse) -
Проверка типов запросов: подтверждение удаления
RequestInfoMessageнаследования -
Тестовая диаграмма
RequestInfoExecutorрабочего процесса: проверка удаления узлов -
Проверка обработчиков: Убедитесь
@response_handler, что декораторы применяются - Тест сквозных сценариев: выполнение полного рабочего процесса
Дальнейшие шаги
После завершения миграции:
- Ознакомьтесь с обновленным руководством по запросам и ответам
- Изучение расширенных шаблонов в руководстве пользователя
- Ознакомьтесь с обновленными примерами в репозитории
Дополнительные сведения см. в документации по Agent Framework или обратитесь к команде и сообществу.