Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Предупреждение
API функционального рабочего процесса является экспериментальным и подлежит изменению или удалению в будущих версиях без уведомления.
API функционального рабочего процесса позволяет создавать рабочие процессы как простые Python асинхронные функции. Вместо определения классов исполнителя, соединения рёбер и использования
Параллельное сравнение с API графа см. в разделе API рабочих процессов в обзоре рабочих процессов.
декоратор @workflow
@workflow Примените async к функции, чтобы преобразовать ее в FunctionalWorkflow объект:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Декоратор @workflow поддерживает параметризованную форму с необязательными аргументами:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
параметры @workflow
| Parameter | Тип | Description |
|---|---|---|
name |
str | None |
Отображаемое имя рабочего процесса. По умолчанию значение функции равно __name__. |
description |
str | None |
Необязательное описание, доступное для чтения человеком. |
checkpoint_storage |
CheckpointStorage | None |
Хранилище по умолчанию для сохранения результатов этапов между запусками. Может быть переопределено для каждого вызова run(). |
Сигнатура функции рабочего процесса
Первый параметр функции рабочего процесса получает входные данные, переданные в .run().
ctx: WorkflowRunContext Добавьте параметр только в том случае, если требуется HITL, состояние ключа и значения или пользовательские события— это необязательно в противном случае:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext сначала определяется по аннотации типа, затем по имени параметра ctx, поэтому работают и параметр ctx: WorkflowRunContext, и голый параметр ctx.
Запуск рабочего процесса
Вызовите .run() на объекте FunctionalWorkflow, возвращаемом @workflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
параметры run()
| Parameter | Тип | Description |
|---|---|---|
message |
Any | None |
Входные данные, переданные функции рабочего процесса в качестве первого аргумента. |
stream |
bool |
Если True, возвращает ResponseStream, который возвращает WorkflowEvent объекты. По умолчанию — False. |
responses |
dict[str, Any] | None |
Ответы HITL, для которых ключом является request_id. Используется для возобновления приостановленного рабочего процесса. |
checkpoint_id |
str | None |
Контрольная точка для восстановления. Необходимо задать checkpoint_storage. |
checkpoint_storage |
CheckpointStorage | None |
Переопределяет настройку хранилища по умолчанию, установленную на декораторе для этого запуска. |
include_status_events |
bool |
Включите события изменения состояния в результат, отличный от потоковой передачи. |
Точно один из параметров message, responses или checkpoint_id должен быть указан для каждого вызова.
WorkflowRunResult
run() (непотоковый) возвращает WorkflowRunResult. Ключевые методы:
| Метод / свойство | Returns | Description |
|---|---|---|
.text |
str |
Первый вывод в виде строки. Пустая строка, если строковые выходные данные отсутствуют. |
.get_outputs() |
list[Any] |
Все выходные данные, создаваемые рабочим процессом. |
.get_final_state() |
WorkflowRunState |
Конечное состояние выполнения (IDLE, , IDLE_WITH_PENDING_REQUESTSFAILED...). |
.get_request_info_events() |
list[WorkflowEvent] |
Ожидающие запросы HITL при состоянии IDLE_WITH_PENDING_REQUESTS. |
Стриминг
```
Передайте stream=True, чтобы получать события по мере их создания:
```
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Полный пример см. в python/samples/03-workflows/functional/basic_streaming_pipeline.py.
декоратор @step
@step — это декоратор, который добавляет кэширование результатов, выбросы событий и контрольные точки на шаг для отдельных асинхронных функций:
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Что @step делает внутри рабочего процесса
-
Кэширует результаты — результат сохраняется
(step_name, call_index). При перезапуске или восстановлении контрольной точки HITL завершенный шаг мгновенно возвращает сохраненный результат, нежели повторно выполняется. -
Выдает события —
executor_invoked/executor_completed/executor_failedгенерируются для отслеживания. При попадании в кэш вместо этого выдаётсяexecutor_bypassed. -
Сохраняет контрольные точки — если рабочий процесс имеет
checkpoint_storage, контрольная точка сохраняется после завершения каждого шага. -
Внедряет
WorkflowRunContext— если функция шага объявляет параметрctx: WorkflowRunContext, активный контекст автоматически внедряется.
Вне запущенного рабочего процесса @step является прозрачной — функция ведет себя идентично своей версии без украшения, что делает ее полностью тестируемой в изоляции.
Когда следует использовать @step
Используйте @step для функций, которые дорогостоящи для повторного выполнения: вызовы агента, внешние API запросы или любая операция, где повторное выполнение при возобновлении может быть дорогостоящим или иметь побочные эффекты. Обычные функции (без @step) по-прежнему работают внутри @workflow; они просто повторно выполняются при возобновлении рабочего процесса.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step также принимает name параметр:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Полный пример см. в python/samples/03-workflows/functional/steps_and_checkpointing.py.
WorkflowRunContext
WorkflowRunContext (короткий псевдоним: RunContext) — это контекст выполнения, внедряемый в рабочие процессы и функции шагов. Это нужно только при использовании HITL, состояния типа "ключ/значение" или пользовательских событий.
Импортируйте его из agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — "Человек в контуре"
ctx.request_info() приостанавливает рабочий процесс, чтобы ожидать внешних входных данных:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Параметры:
| Parameter | Тип | Description |
|---|---|---|
request_data |
Any |
Полезная нагрузка, описывающая необходимые входные данные (dict, Pydantic-модель, string, ...). |
response_type |
type |
Ожидаемый Python тип ответа. |
request_id |
str | None |
Стабильный идентификатор для этого запроса. Если опущено, создается случайный UUID. |
Семантика воспроизведения: При первом выполнении request_info() вызывает внутренний сигнал (никогда не видимый для кода), который приостанавливает рабочий процесс. Вызывающий получает WorkflowRunResult с get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS.
.run(responses={request_id: value}) Возобновить вызов— рабочий процесс повторно выполняется из верхней части и request_info() возвращает предоставленное значение немедленно.
@step-декорированные функции, которые выполнялись до приостановки, возвращают кэшированные результаты при возобновлении вместо повторного выполнения.
Обработка ответа:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Полный пример см. в python/samples/03-workflows/functional/hitl_review.py.
ctx.request_info() также поддерживается внутри @step функций.
ctx.add_event() — пользовательские события
Используется ctx.add_event() для выдачи событий, относящихся к приложению, а также событий жизненного цикла платформы. Полные сведения и примеры см. в разделе "Создание пользовательских событий".
ctx.get_state()
/
ctx.set_state() — состояние ключа и значения
Используйте ctx.get_state() и ctx.set_state() для хранения значений, которые сохраняются при прерываниях HITL и включаются в контрольные точки. Полные сведения см. в разделе "Состояние рабочего процесса".
Значения состояния должны быть сериализуемыми в формате JSON при настройке хранилища контрольных точек.
ctx.is_streaming()
Возвращается True когда текущий запуск был начат с stream=True. Полезные внутри функций шага, которые хотят настроить их поведение на основе режима потоковой передачи.
get_run_context()
Извлекает активный WorkflowRunContext из любого места в работающем рабочем процессе — полезно для вспомогательных функций, которые не объявляют ctx параметры:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Возвращается None при вызове вне запущенного рабочего процесса.
Параллелизм с asyncio.gather
Используйте стандартный параллелизм Python для распределения и сбора — без необходимости в примитивах платформы.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather также работает при украшении @stepфункций.
Полный пример см. в python/samples/03-workflows/functional/parallel_pipeline.py.
Вызов агентов внутри рабочих процессов
Вызовы агентов выполняются как обычные вызовы функций внутри @workflow.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Добавьте @step к функциям вызова агента, когда хотите, чтобы их результаты кэшировались при возобновлениях HITL или восстановлении контрольных точек.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Полный пример см. в python/samples/03-workflows/functional/agent_integration.py.
.as_agent() — использование рабочего процесса в качестве агента
Оберните FunctionalWorkflow как объект, совместимый с агентом, с .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent()FunctionalWorkflowAgent возвращает FunctionalWorkflowAgent, который открывает тот же интерфейс, что и другие объекты агента, делая функциональные рабочие процессы составляемыми с любой системой, принимающей агентов.
| Parameter | Тип | Description |
|---|---|---|
name |
str | None |
Покажите имя агента. По умолчанию используется имя рабочего процесса. |
См. python/samples/03-workflows/functional/agent_integration.py в качестве примера.
Samples
Примеры запуска находятся в следующих примерах папок:
-
python/samples/01-get-started/— вводные@workflowпримеры -
python/samples/03-workflows/functional/— полнофункциональные примеры рабочих процессов
Дальнейшие действия
См. также:
- Исполнители — единицы обработки в API на основе графов
- Человек в контуре — HITL в рабочих процессах на основе графов
- Контрольные точки — хранилище контрольных точек и возобновление работы
- События — типы событий рабочего процесса
- Использование рабочих процессов в качестве агентов
В настоящее время API функционального рабочего процесса недоступен для C#.