Warning
기능 워크플로 API는 실험 적이며 예고 없이 이후 버전에서 변경 또는 제거될 수 있습니다.
기능 워크플로 API를 사용하면 워크플로를 일반 Python 비동기 함수로 작성할 수 있습니다. 실행기 클래스를 정의하는 대신 에지를 배선합니다. WorkflowBuilder 사용하여 async 함수를 @workflow 데코레이팅하고 네이티브 Python 제어 흐름(if/else, for 루프, asyncio.gather)을 사용하여 논리를 표현합니다.
그래프 API와 나란히 비교하려면 워크플로 개요의 워크플로 API 를 참조하세요.
@workflow 데코레이터
@workflow을(를) async 함수에 적용하여 FunctionalWorkflow 개체로 변환합니다.
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
데코레이터는 @workflow 선택적 인수를 사용하여 매개 변수가 있는 양식을 지원합니다.
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow 매개 변수
| 매개 변수 | 유형 | 설명 |
|---|---|---|
name |
str | None |
워크플로의 표시 이름입니다. 기본값은 함수의 __name__. |
description |
str | None |
사람이 읽을 수 있는 선택적 설명입니다. |
checkpoint_storage |
CheckpointStorage | None |
실행 간에 단계 결과를 유지하기 위한 기본 스토리지입니다. 호출당 run()에서 재정의할 수 있습니다. |
워크플로 함수 서명
워크플로 함수의 첫 번째 매개 변수 는 전달된 입력을 받습니다 .run().
ctx: WorkflowRunContext HITL, 키/값 상태 또는 사용자 지정 이벤트가 필요한 경우에만 매개 변수를 추가합니다. 그렇지 않으면 선택 사항입니다.
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext는 먼저 형식 주석에 의해 감지되고, 그 다음에는 매개 변수 이름 ctx에 의해 감지되므로, ctx: WorkflowRunContext와 기본 ctx 매개 변수 둘 다 작동합니다.
워크플로 실행
.run()에서 반환된 FunctionalWorkflow 객체에 대해 @workflow를 호출하세요.
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() 매개 변수
| 매개 변수 | 유형 | 설명 |
|---|---|---|
message |
Any | None |
워크플로 함수에 첫 번째 인수로 전달된 입력입니다. |
stream |
bool |
True일 경우, ResponseStream 객체를 생성하는 WorkflowEvent를 반환합니다. 기본값은 False입니다. |
responses |
dict[str, Any] | None |
request_id로 키가 지정된 HITL 응답 일시 중단된 워크플로를 다시 시작하는 데 사용됩니다. |
checkpoint_id |
str | None |
복원할 검사점. 설정하려면 checkpoint_storage이 필요합니다. |
checkpoint_storage |
CheckpointStorage | None |
이 실행에 대한 데코레이터에 설정된 기본 스토리지를 재정의합니다. |
include_status_events |
bool |
비 스트리밍 결과에 상태 변경 이벤트를 포함합니다. |
각 호출당 message, responses, 또는 checkpoint_id 중 정확히 하나가 제공되어야 합니다.
WorkflowRunResult
run() (비 스트리밍)은 WorkflowRunResult를 반환합니다. 주요 메서드:
| 메서드/속성 | Returns | 설명 |
|---|---|---|
.text |
str |
첫 번째 출력을 문자열로 사용합니다. 문자열 출력이 없으면 빈 문자열입니다. |
.get_outputs() |
list[Any] |
워크플로에서 내보낸 모든 출력입니다. |
.get_final_state() |
WorkflowRunState |
최종 실행 상태(IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
상태가 IDLE_WITH_PENDING_REQUESTS일 때 보류 중인 HITL 요청입니다. |
스트리밍
이벤트가 생성될 때 이벤트를 수신하도록 전달 stream=True 합니다.
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
전체 예제를 참조하세요 python/samples/03-workflows/functional/basic_streaming_pipeline.py .
@step 데코레이터
@step 는 개별 비동기 함수에 결과 캐싱, 이벤트 배출 및 단계별 검사점을 추가하는 옵트인 데코레이터입니다.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
@step 워크플로 내에서 수행하는 작업
-
결과를 캐시합니다 . 결과는 .에 의해
(step_name, call_index)저장됩니다. HITL 다시 시작 또는 검사점 복원 시 완료된 단계는 다시 실행하는 대신 저장된 결과를 즉시 반환합니다. -
이벤트를
executor_invoked/executor_completed/executor_failed내보내는 것은 관찰을 위해 내보내집니다. 캐시 적중 시executor_bypassed대신 내보내집니다. -
검사점을 저장합니다. 워크플로에 있는
checkpoint_storage경우 각 단계가 완료된 후 검사점이 저장됩니다. -
주입
WorkflowRunContext- 단계 함수가ctx: WorkflowRunContext매개 변수를 선언하면, 활성 컨텍스트가 자동으로 주입됩니다.
실행 중인 워크플로 @step 외부에서는 투명합니다. 함수는 디코데이션되지 않은 버전과 동일하게 작동하므로 격리된 상태로 완전히 테스트할 수 있습니다.
@step를 사용하는 경우
@step 함수( 에이전트 호출, 외부 API 요청 또는 다시 시작 시 다시 실행이 비용이 많이 들거나 부작용이 있는 작업)에서 사용합니다. 일반 함수(없음 @step)는 여전히 내부에서 @workflow작동합니다. 워크플로가 다시 시작될 때 다시 실행됩니다.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step또한 매개 변수를 허용합니다.name
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
전체 예제를 참조하세요 python/samples/03-workflows/functional/steps_and_checkpointing.py .
WorkflowRunContext
WorkflowRunContext (짧은 별칭: RunContext) 는 워크플로 및 단계 함수에 삽입된 실행 컨텍스트입니다. HITL, 키/값 상태 또는 사용자 지정 이벤트를 사용하는 경우에만 필요합니다.
다음에서 가져오기: agent_framework
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — 휴먼 인 더 루프
ctx.request_info() 는 외부 입력을 기다리도록 워크플로를 일시 중단합니다.
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
매개 변수:
| 매개 변수 | 유형 | 설명 |
|---|---|---|
request_data |
Any |
필요한 입력(dict, Pydantic 모델, 문자열, ...)을 설명하는 페이로드입니다. |
response_type |
type |
응답의 예상 Python 유형입니다. |
request_id |
str | None |
이 요청의 안정적인 식별자입니다. 생략하면 임의의 UUID가 생성됩니다. |
재생 의미 체계: 첫 번째 실행 시, 코드에 노출되지 않지만 워크플로를 일시 중단하는 내부 신호를 발생시킵니다. 호출자가 WorkflowRunResult와 함께 get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS를 받습니다. 호출 .run(responses={request_id: value}) 을 통해 다시 시작 - 워크플로가 맨 위에서 다시 실행되고 request_info() 제공된 값을 즉시 반환합니다.
@step일시 중단 전에 실행된 데코레이팅된 함수는 다시 실행하는 대신 다시 시작할 때 캐시된 결과를 반환합니다.
응답 처리:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
전체 예제를 참조하세요 python/samples/03-workflows/functional/hitl_review.py .
ctx.request_info()는 @step 함수 내부에서도 지원됩니다.
ctx.add_event() — 사용자 지정 이벤트
프레임워크 수명 주기 이벤트와 함께 애플리케이션 관련 이벤트를 내보내는 데 사용합니다 ctx.add_event() . 자세한 내용 및 예제는 사용자 지정 이벤트 내보내기 참조하세요.
ctx.get_state()
/
ctx.set_state() — 키/값 상태
ctx.get_state() 및 ctx.set_state()을 사용하여 HITL 중단 중에도 유지되고 검사점에 포함되는 값을 저장합니다. 자세한 내용은 워크플로 상태를 참조하세요.
상태 값은 검사점 스토리지가 구성된 경우 JSON 직렬화 가능해야 합니다.
ctx.is_streaming()
True는 현재 실행이 stream=True로 시작되었을 때 반환됩니다. 스트리밍 모드에 따라 동작을 조정하려는 단계 내 함수에 유용합니다.
get_run_context()
실행 중인 워크플로 내 어디에서나 활성 WorkflowRunContext을 검색하여 매개변수를 선언하지 않는 도우미 함수에서 유용하게 사용할 수 있습니다.
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
실행 중인 워크플로 외부에서 호출되면 반환 None 합니다.
asyncio.gather와의 병렬 처리
팬아웃/팬인에 표준 Python 동시성을 사용합니다. 프레임워크 기본 형식은 필요하지 않습니다.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather 함수가 @step로 데코레이션될 때도 작동합니다.
전체 예제를 참조하세요 python/samples/03-workflows/functional/parallel_pipeline.py .
워크플로 내에서 에이전트 호출
에이전트 호출은 @workflow 내부에서 일반 함수 호출처럼 작동합니다.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
HITL 다시 시작 또는 체크포인트 복원에서 결과를 캐시하고자 할 때, @step를 에이전트 호출 함수에 추가합니다.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
전체 예제를 참조하세요 python/samples/03-workflows/functional/agent_integration.py .
.as_agent() — 워크플로를 에이전트로 사용
FunctionalWorkflow을(를) .as_agent()로 에이전트 호환 개체로 래핑합니다.
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() 는 다른 에이전트 개체와 동일한 FunctionalWorkflowAgent 인터페이스를 노출하여, 에이전트를 허용하는 모든 시스템에서 함수 워크플로를 구성할 수 있도록 하는 run()을 반환합니다.
| 매개 변수 | 유형 | 설명 |
|---|---|---|
name |
str | None |
에이전트의 표시 이름입니다. 기본값은 워크플로 이름입니다. |
python/samples/03-workflows/functional/agent_integration.py 예제를 참조하세요.
Samples
실행 가능한 예제는 다음 샘플 폴더에 있습니다.
-
python/samples/01-get-started/— 소개@workflow예제 -
python/samples/03-workflows/functional/— 기능 전체 워크플로우 샘플
다음 단계
관련 항목:
- 실행기 - 그래프 기반 API의 처리 단위
- 휴먼 인 더 루프 - 그래프 기반 워크플로의 HITL
- 검사점 - 검사점 스토리지 및 다시 시작
- 이벤트 — 워크플로 이벤트 유형
- 워크플로를 에이전트로 사용
현재 C#에서는 기능 워크플로 API를 사용할 수 없습니다.