Руководство по обновлению: API рабочих процессов и система Request-Response

Это руководство поможет обновить рабочие процессы Python до последних изменений API, представленных в версии 1.0.0b251104.

Обзор изменений

Этот выпуск включает два основных улучшения в системе рабочих процессов:

1. Объединенные API выполнения рабочих процессов

Методы выполнения рабочего процесса унифицированы для простоты:

  • Унифицированные run_stream() методы и run() методы: замена отдельных методов для определенных контрольных точек (run_stream_from_checkpoint(), run_from_checkpoint())
  • Единый интерфейс. Использование checkpoint_id параметра для возобновления из контрольных точек вместо отдельных методов
  • Гибкая контрольная точка: настройте хранилище контрольных точек во время сборки или измените его во время выполнения
  • Очистка семантики: взаимоисключающие message (новые запуски) и checkpoint_id (резюме) параметры

2. Упрощённая система Запрос-Ответ

Система ответа на запросы оптимизирована:

  • RequestInfoExecutorБольше нет: исполнителям теперь можно отправлять запросы напрямую.
  • Новый @response_handler декоратор: замена RequestResponse обработчиков сообщений
  • Упрощенные типы запросов: наследование от RequestInfoMessage не требуется
  • Встроенные возможности: все исполнители автоматически поддерживают функции ответа на запросы
  • Более чистые графики рабочих процессов: удаление RequestInfoExecutor узлов из рабочих процессов

Часть 1. Api единого выполнения рабочих процессов

Сначала рекомендуется перейти на объединенные API рабочих процессов, так как это формирует основу для всех схем выполнения рабочих процессов.

Возобновление с контрольных точек

До (старый API):

# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Event: {event}")

После (новый API):

# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
):
    print(f"Event: {event}")

Основные различия:

  • Использование checkpoint_id параметра вместо отдельного метода
  • Не удается предоставить оба message и checkpoint_id (взаимоисключающные)
  • Должен предоставить либо message (новый запуск), либо checkpoint_id (возобновить)
  • checkpoint_storage является необязательным, если чекпоинтинг был сконфигурирован при сборке

API непотоковой передачи

Метод run() без потоковой передачи следует тому же шаблону:

Старый:

result = await workflow.run_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)

Новые функции:

result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
)

Возобновление процесса контрольной точки с ожидающими запросами

Важное критическое изменение: при возобновлении с контрольной точки, на которой есть ожидающие RequestInfoEvent объекты, новый API снова издает эти события автоматически. Вам необходимо записать и ответить на них.

До (старое поведение):

# OLD: Could provide responses directly during resume
responses = {
    "request-id-1": "user response data",
    "request-id-2": "another response"
}

async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
    responses=responses  # No longer supported
):
    print(f"Event: {event}")

После (новое поведение):

# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}

async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
    if event.type == "request_info":
        # Pending requests are automatically re-emitted
        print(f"Pending request re-emitted: {event.request_id}")
        requests[event.request_id] = event.data

# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
    response = handle_request(request_data)  # Your logic here
    responses[request_id] = response

# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
    if event.type == "output":
        print(f"Workflow output: {event.data}")

Полный пример с участием человека в контуре

Ниже приведен полный пример возобновления контрольной точки с ожиданием утверждения человека:

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowBuilder,
    handler,
    response_handler,
)

# ... (Executor definitions omitted for brevity)

async def run_interactive_session(
    workflow: Workflow,
    initial_message: str | None = None,
    checkpoint_id: str | None = None,
) -> str:
    """Run workflow until completion, handling human input interactively."""

    requests: dict[str, HumanApprovalRequest] = {}
    responses: dict[str, str] | None = None
    completed_output: str | None = None

    while True:
        # Determine which API to call
        if responses:
            # Send responses from previous iteration
            event_stream = workflow.send_responses_streaming(responses)
            requests.clear()
            responses = None
        else:
            # Start new run or resume from checkpoint
            if initial_message:
                event_stream = workflow.run_stream(initial_message)
            elif checkpoint_id:
                event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
            else:
                raise ValueError("Either initial_message or checkpoint_id required")

        # Process events
        async for event in event_stream:
            if event.type == "status":
                print(event)
            if event.type == "output":
                completed_output = event.data
            if event.type == "request_info":
                if isinstance(event.data, HumanApprovalRequest):
                    requests[event.request_id] = event.data

        # Check completion
        if completed_output:
            break

        # Prompt for user input if we have pending requests
        if requests:
            responses = prompt_for_responses(requests)
            continue

        raise RuntimeError("Workflow stopped without completing or requesting input")

    return completed_output

Часть 2. Упрощенная система Request-Response

После миграции в интерфейсы API унифицированных рабочих процессов обновите шаблоны ответа на запросы, чтобы использовать новую интегрированную систему.

1. Обновление импорта

Before:

from agent_framework import (
    RequestInfoExecutor,
    RequestInfoMessage,
    RequestResponse,
    # ... other imports
)

After:

from agent_framework import (
    response_handler,
    # ... other imports
    # Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)

2. Обновление типов запросов

Before:

from dataclasses import dataclass
from agent_framework import RequestInfoMessage

@dataclass
class UserApprovalRequest(RequestInfoMessage):
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

After:

from dataclasses import dataclass

@dataclass
class UserApprovalRequest:
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

3. Обновление графа рабочего процесса

Before:

# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .add_edge(approval_executor, request_info_executor)
    .add_edge(request_info_executor, approval_executor)
    .build()
)

After:

# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .build()
)

4. Отправка запроса на обновление

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.send_message(request)

After:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.request_info(request_data=request, response_type=bool)

Обновление обработки ответов

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def handle_approval(
        self,
        response: RequestResponse[UserApprovalRequest, bool],
        ctx: WorkflowContext[Never, str]
    ) -> None:
        if response.data:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

After:

class ApprovalRequiredExecutor(Executor):
    @response_handler
    async def handle_approval(
        self,
        original_request: UserApprovalRequest,
        approved: bool,
        ctx: WorkflowContext
    ) -> None:
        if approved:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

Сводка преимуществ

Унифицированные API рабочих процессов

  1. Упрощенный интерфейс: единый метод для начального запуска и возобновления контрольной точки
  2. Очистка семантики: взаимоисключающие параметры делают намерение явным
  3. Гибкий чекпоинтинг: настраивать на этапе сборки или переопределять во время выполнения
  4. Снижение когнитивной нагрузки: меньше методов для запоминания и обслуживания

система Запрос-Ответ

  1. Упрощенная архитектура: нет необходимости в отдельных RequestInfoExecutor компонентах
  2. Безопасность типов: непосредственная спецификация типа в request_info() вызовах
  3. Более чистый код: меньше импортов и более простых графиков рабочих процессов
  4. Повышение производительности: снижение затрат на маршрутизацию сообщений
  5. Улучшенная отладка: очистка потока выполнения и обработка ошибок

Тестирование миграции

Часть 1 Контрольный список: API рабочего процесса

  1. Обновление вызовов API: замена run_stream_from_checkpoint() на run_stream(checkpoint_id=...)
  2. Обновление вызовов API: замена run_from_checkpoint() на run(checkpoint_id=...)
  3. Удаление responses параметра: удаление любых responses аргументов из вызовов возобновления контрольных точек
  4. Добавление перехвата событий: Реализация логики перехвата повторно испускаемых событий request_info (event.type == "request_info")
  5. Контрольная точка теста: убедитесь, что ожидающие запросы повторно отправляются и обрабатываются правильно.

Часть 2 Контрольный список: система Request-Response

  1. Проверка импорта: убедитесь, что старые импорты не остаются (RequestInfoExecutor, RequestInfoMessage, RequestResponse)
  2. Проверка типов запросов: подтверждение удаления RequestInfoMessage наследования
  3. Тестовая диаграммаRequestInfoExecutor рабочего процесса: проверка удаления узлов
  4. Проверка обработчиков: Убедитесь@response_handler, что декораторы применяются
  5. Тест сквозных сценариев: выполнение полного рабочего процесса

Дальнейшие шаги

После завершения миграции:

  1. Ознакомьтесь с обновленным руководством по запросам и ответам
  2. Изучение расширенных шаблонов в руководстве пользователя
  3. Ознакомьтесь с обновленными примерами в репозитории

Дополнительные сведения см. в документации по Agent Framework или обратитесь к команде и сообществу.