Warning
功能性工作流程 API 仍 屬實驗性質 ,未來版本可能會變更或移除,且不會另行通知。
功能性工作流程 API 讓你可以將工作流程寫成純 Python 非同步函式。 你不需要定義執行器類別、接線邊緣或使用 WorkflowBuilder,而是用 async 裝飾一個 @workflow 函式,並使用原生的 Python 控制流程——if/else、for 迴圈、asyncio.gather——來表達你的邏輯。
欲與圖形 API 並列比較,請參閱工作流程總覽中的 工作流程 API 。
@workflow 裝飾者
套用 @workflow 到 async 一個函式,將其轉換成 FunctionalWorkflow 物件:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
裝飾器支援帶有可選參數的參數化形式:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow 參數
| 參數 | 類型 | 說明 |
|---|---|---|
name |
str | None |
工作流程的顯示名稱。 預設為函式的 __name__。 |
description |
str | None |
可選的人類可讀說明。 |
checkpoint_storage |
CheckpointStorage | None |
預設儲存空間,用於在執行間持續保存步驟結果。 在每次調用時可以覆寫 run()。 |
工作流程函式簽章
工作流程函數 的第一個參數 接收傳送給 .run()的輸入。 只有在需要 HITL、鍵值狀態或自訂事件時,才加入 ctx: WorkflowRunContext 參數——否則是可選的:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext會先由型別註解偵測,然後由參數名稱ctx偵測,因此具有ctx: WorkflowRunContext或單獨的ctx參數都能正常運作。
執行工作流程
在.run()返回的FunctionalWorkflow物件上呼叫@workflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() 參數
| 參數 | 類型 | 說明 |
|---|---|---|
message |
Any | None |
輸入被作為第一個參數傳遞給工作流函數。 |
stream |
bool |
若 True,則返回 a ResponseStream ,產生 WorkflowEvent 物件。 預設為 False。 |
responses |
dict[str, Any] | None |
HITL 回應以 request_id 作為索引鍵組織。 用來恢復暫停的工作流程。 |
checkpoint_id |
str | None |
還有一個可以恢復的檢查點。 需要設定 checkpoint_storage。 |
checkpoint_storage |
CheckpointStorage | None |
覆寫此次運行中設定於裝飾器上的預設儲存。 |
include_status_events |
bool |
在非串流結果中包含狀態變更事件。 |
每次呼叫必須僅提供其中一個message、responses或checkpoint_id。
WorkflowRunResult
run() (非串流)回傳一個 WorkflowRunResult. 關鍵方法:
| 方法 / 性質 | Returns | 說明 |
|---|---|---|
.text |
str |
第一次輸出是字串。 如果沒有字串輸出,則為空字串。 |
.get_outputs() |
list[Any] |
所有由工作流程輸出的結果。 |
.get_final_state() |
WorkflowRunState |
最終運行狀態(IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...)。 |
.get_request_info_events() |
list[WorkflowEvent] |
當狀態為 IDLE_WITH_PENDING_REQUESTS時,待處理的 HITL 請求。 |
串流
傳遞 stream=True 以在事件產生時接收:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
完整範例請參見 python/samples/03-workflows/functional/basic_streaming_pipeline.py 。
@step 裝飾者
@step 是一個可選擇加入的裝飾器,為個別非同步函式新增結果快取、事件排放及每步檢查點:
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
在工作流程中,@step 的作用是什麼
-
快取結果 — 結果由
(step_name, call_index)儲存。 在 HITL 恢復或檢查點還原時,完成的步驟會立即回傳已儲存的結果,而非重新執行。 -
發射事件——
executor_invoked/executor_completed/executor_failed是為了可觀察而發射的。 在快取命中時,executor_bypassed會被發出。 -
儲存檢查點 — 若工作流程有
checkpoint_storage,則每步驟完成後會儲存檢查點。 -
注入
WorkflowRunContext— 如果階梯函數宣告參數ctx: WorkflowRunContext,主動上下文會自動注入。
在執行中的工作流程之外,@step 是透明的,這意味著其函式的行為與未裝飾版本相同,使得其可以在獨立的情況下完全測試。
@step 的使用時機
用於 @step 重 執行成本高的函式:代理呼叫、外部 API 請求,或任何在恢復時重新執行會造成成本高昂或有副作用的操作。 純函式(不含 @step)仍可在內部 @workflow運作;當工作流程恢復時,它們會重新執行。
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step 也接受一個 name 參數:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
完整範例請參見 python/samples/03-workflows/functional/steps_and_checkpointing.py 。
WorkflowRunContext
WorkflowRunContext (簡短別名: RunContext)是注入工作流程與步驟函式的執行上下文。 只有在使用 HITL、鍵值狀態或自訂事件時才需要。
匯入自 agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — 人機互動
ctx.request_info() 暫停工作流程以等待外部輸入:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
參數:
| 參數 | 類型 | 說明 |
|---|---|---|
request_data |
Any |
載荷描述所需的輸入(dict、Pydantic 模型、字串等)。 |
response_type |
type |
預期中的 Python 回應類型。 |
request_id |
str | None |
此請求的穩定識別碼。 若省略,則會產生隨機 UUID。 |
重播語意: 第一次執行時, request_info() 會發出一個內部訊號(你的程式碼永遠看不到)來暫停工作流程。 呼叫者收到 WorkflowRunResult a ,其 get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS為 。 呼叫恢復 .run(responses={request_id: value}) — 工作流程會從頂端重新執行,並 request_info() 立即回傳所提供的值。
@step-裝飾函式在暫停前執行,會在恢復時回傳快取結果,而非重新執行。
應對回應:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
完整範例請參見 python/samples/03-workflows/functional/hitl_review.py 。
ctx.request_info() 也支援在 @step 函式中。
ctx.add_event() — 自訂活動
請使用 ctx.add_event() 來與框架生命週期事件同時發出應用程式專屬事件。 完整細節與範例請參見 「發出自訂事件」。
ctx.get_state()
/
ctx.set_state() — 鍵值狀態
使用 ctx.get_state() 和 ctx.set_state() 來儲存跨越 HITL 中斷且包含在檢查點中的值。 完整細節請參閱 工作流程狀態。
在設定檢查點儲存時,狀態值必須是可 JSON 序列化的。
ctx.is_streaming()
當前運行是以True開始時,會回傳stream=True。 有用的內部步驟功能,會根據串流模式調整行為。
get_run_context()
從執行中的工作流程中任意位置擷取主動 WorkflowRunContext 資料——在不宣告 ctx 參數的輔助函式中非常有用:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
在非執行中的工作流程中被呼叫時,會回傳 None 。
平行性 asyncio.gather
使用標準 Python 並行進行扇出/扇入 — 不需要框架原語:
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather 也適用於函數被@step裝飾時。
完整範例請參見 python/samples/03-workflows/functional/parallel_pipeline.py 。
在工作流程中呼叫代理
在@workflow中,代理呼叫就像是簡單的函式呼叫。
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
當您希望其結果在 HITL 任務恢復或檢查點還原中快取時,加入 @step 至代理呼叫函數:
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
完整範例請參見 python/samples/03-workflows/functional/agent_integration.py 。
.as_agent() — 將工作流程用作代理
將 a FunctionalWorkflow 包裝為代理相容物件,並以:.as_agent()
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() 回傳 A FunctionalWorkflowAgent ,暴露出與其他代理物件相同的 run() 介面,使功能性工作流程可與任何接受代理的系統組合。
| 參數 | 類型 | 說明 |
|---|---|---|
name |
str | None |
代理人的顯示名稱。 預設為工作流程名稱。 |
請參考 python/samples/03-workflows/functional/agent_integration.py 範例。
Samples
可跑的範例收錄於以下範例資料夾中:
-
python/samples/01-get-started/— 入門@workflow範例 -
python/samples/03-workflows/functional/— 功能齊全的工作流程範例
下一步
相關主題:
- 執行者 — 圖形化 API 中的處理單元
- 人工介入(HITL) — 基於圖形的工作流程中的 HITL
- 檢查點 — 檢查點儲存與恢復
- 事件 — 工作流程事件類型
- 以工作流作為代理使用
目前 C# 還沒有功能性工作流程 API。