Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Warning
A funkcionális munkafolyamat API kísérleti jellegű, és előzetes értesítés nélkül módosítható vagy eltávolítható a jövőbeli verziókban.
A funkcionális munkafolyamat API-val egyszerű Python aszinkron függvényként írhat munkafolyamatokat. A végrehajtó osztályok és kábelezési élek meghatározása, illetve a WorkflowBuilder használata helyett díszítsen fel egy async függvényt @workflow-val, és használjon natív Python vezérlési folyamatokat – például if/else, for ciklusok, asyncio.gather – a logika kifejezésére.
A graph API-val való párhuzamos összehasonlításért tekintse meg a Munkafolyamatok API-k áttekintését.
@workflow dekoratőr
Alkalmazza a @workflow elemet egy async függvényre, hogy azt objektummá alakítsa FunctionalWorkflow.
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
A @workflow dekoratőr támogatja a paraméteres űrlapot opcionális argumentumokkal:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow Paraméterek
| Paraméter | Típus | Description |
|---|---|---|
name |
str | None |
A munkafolyamat kijelzőneve. A függvény alapértelmezett értéke __name__. |
description |
str | None |
Nem kötelező, emberi olvasásra alkalmas leírás. |
checkpoint_storage |
CheckpointStorage | None |
Alapértelmezett tároló a futtatások közötti lépéseredmények megőrzéséhez. Hívásonként felül lehet bírálni.run() |
A munkafolyamat függvény szignatúrája
A munkafolyamat-függvény első paramétere megkapja a megadott bemenetet .run(). Csak akkor adjon hozzá paramétert ctx: WorkflowRunContext , ha HITL- vagy kulcs-/értékállapotra vagy egyéni eseményekre van szüksége – máskülönben nem kötelező:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext először típusannotációval észlelve, majd a paraméternév ctx alapján, így mind a ctx: WorkflowRunContext, mind a csupasz ctx paraméter működik.
Munkafolyamat futtatása
Hívás .run() az FunctionalWorkflow objektumon, amelyet a @workflow visszaadott:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() Paraméterek
| Paraméter | Típus | Description |
|---|---|---|
message |
Any | None |
A munkafolyamat-függvényhez első argumentumként átadott bemenet. |
stream |
bool |
Ha True, visszaad egy ResponseStream-et, ami WorkflowEvent objektumokat eredményez. Alapértelmezett érték: False. |
responses |
dict[str, Any] | None |
HITL-válaszok request_id-hoz társítva. Felfüggesztett munkafolyamat folytatására szolgál. |
checkpoint_id |
str | None |
Ellenőrzőpont, ahonnan vissza szeretne állítani.
checkpoint_storage beállítása szükséges. |
checkpoint_storage |
CheckpointStorage | None |
Felülbírálja a dekorátor alapértelmezett tárolási beállítását ennél a futtatásnál. |
include_status_events |
bool |
Az állapotváltozási események belefoglalása a nem streamelt eredménybe. |
Pontosan az egyik message, responses, vagy checkpoint_id kell megadni hívásonként.
WorkflowRunResult
run() (nem streamelés) egy WorkflowRunResult. Főbb módszerek:
| Metódus/tulajdonság | Returns | Description |
|---|---|---|
.text |
str |
Első kimenet karakterláncként. Üres sztring, ha nincs sztringkimenet. |
.get_outputs() |
list[Any] |
A munkafolyamat által kibocsátott összes kimenet. |
.get_final_state() |
WorkflowRunState |
Utolsó futtatási állapot (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Függőben lévő HITL-kérelmek, amikor az állapot IDLE_WITH_PENDING_REQUESTS. |
Streaming
Adjon meg stream=True az események megkapásához, amint előállnak:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Tekintse meg python/samples/03-workflows/functional/basic_streaming_pipeline.py a teljes példát.
@step dekoratőr
@step egy választható dekorátor, amely eredmény gyorsítótárazást, eseménykibocsátást és lépésenkénti ellenőrzőpontozást ad hozzá az egyes aszinkron függvényekhez.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Mit @step végez a munkafolyamaton belül?
-
Gyorsítótárazza az eredményeket – az eredményt a rendszer tárolja
(step_name, call_index). A HITL-folytatás vagy az ellenőrzőpont visszaállításakor a befejezett lépés az ismételt végrehajtás helyett azonnal visszaadja a mentett eredményt. -
Eseményeket bocsát ki –
executor_invoked/executor_completed/executor_faileda megfigyelhetőség érdekében kerül kibocsátásra. Gyorsítótár-találat esetén ehelyettexecutor_bypassedkerül kibocsátásra. -
Ellenőrzőpontok mentése – ha a munkafolyamat rendelkezik
checkpoint_storage– az ellenőrzőpont mentése az egyes lépések befejezése után történik. -
Injektál
WorkflowRunContext– ha a lépésfüggvény deklarál egyctx: WorkflowRunContextparamétert, az aktív környezet automatikusan injektálásra kerül.
A futó munkafolyamaton @step kívül transzparens – a függvény ugyanúgy viselkedik, mint a leválasztatlan verziója, így teljesen tesztelhető elszigetelten.
Mikor érdemes használni a @step
Függvényeken, amelyeknek az @step, például az ügynökhívásokon, külső API-kéréseken, vagy bármely olyan műveleten használható, ahol az újrafuttatás költséges lenne vagy mellékhatásokat okozhatna. Az egyszerű függvények (anélkül @step) továbbra is működnek belül @workflow; egyszerűen újrafuttatják őket, amikor a munkafolyamat folytatódik.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step egy paramétert name is elfogad:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Tekintse meg python/samples/03-workflows/functional/steps_and_checkpointing.py a teljes példát.
WorkflowRunContext
WorkflowRunContext (rövid alias: RunContext) a munkafolyamatba és a lépésfüggvényekbe injektált végrehajtási környezet. Csak HITL, kulcs/érték állapot vagy egyéni események használatakor van rá szüksége.
Importálja a következőből agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Ember a rendszerben
ctx.request_info() felfüggeszti a munkafolyamatot, hogy megvárja a külső bemenetet:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Paraméterek:
| Paraméter | Típus | Description |
|---|---|---|
request_data |
Any |
Adatcsomag, amely leírja, hogy milyen bemenetre van szükség (szótár, Pydantic-modell, karakterlánc, ...). |
response_type |
type |
A válasz várható Python típusa. |
request_id |
str | None |
A kérés stabil azonosítója. Ha nincs megadva, véletlenszerű UUID jön létre. |
Visszajátszás szemantikája: Az első végrehajtáskor request_info() egy belső jelet ad (a kód nem látható), amely felfüggeszti a munkafolyamatot. A hívó megkap egy WorkflowRunResult-t get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS-vel. Folytatás hívással .run(responses={request_id: value}) – a munkafolyamat felülről újrafut, és request_info() azonnal visszaadja a megadott értéket.
@step-dekorált függvények, amelyek a felfüggesztés előtt futottak, újraindításkor gyorsítótárazott eredményeiket adják vissza újrafuttatás helyett.
A válasz kezelése:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Tekintse meg python/samples/03-workflows/functional/hitl_review.py a teljes példát.
ctx.request_info() függvényen belül @step is támogatott.
ctx.add_event() — Egyéni események
Alkalmazásspecifikus ctx.add_event() eseményeket bocsáthat ki a keretrendszer életciklus-eseményei mellett. További részletekért és példákért lásd: Egyéni események kibocsátása.
ctx.get_state()
/
ctx.set_state() — Kulcs/érték állapota
Használja a ctx.get_state() és ctx.set_state() elemeket az értékek tárolására, amelyek megmaradnak a HITL-megszakítások során, és szerepelnek az ellenőrzőpontokban. További részletekért lásd: Munkafolyamat állapota.
Az állapotértékek JSON-szerializálhatóaknak kell lenniük az Ellenőrzőpont-tároló konfigurálásakor.
ctx.is_streaming()
Amikor az aktuális futtatást True-vel indították, stream=True-t ad vissza. Hasznos belső lépésfüggvények, amelyek a streamelési mód alapján szeretnék módosítani a működésüket.
get_run_context()
Egy futó munkafolyamat bármely pontjáról lekéri az aktívt WorkflowRunContext – olyan segédfüggvényekben hasznos, amelyek nem deklarálnak paramétert ctx :
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Akkor ad None vissza, ha egy futó munkafolyamaton kívül hívják meg.
Párhuzamosság asyncio.gather -tel
Használja a standard Python párhuzamosságot a fan-out/fan-in feladatokhoz – nincs szükség keretrendszerbeli primitívekre.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather akkor is működik, ha a függvényeket dekorátorral látják el @step.
Tekintse meg python/samples/03-workflows/functional/parallel_pipeline.py a teljes példát.
Ügynökök meghívása munkafolyamatokon belül
Az ügynökhívások egyszerű függvényhívásként működnek belül @workflow:
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Adja hozzá @step az ügynököt hívó függvényekhez, ha azt szeretné, hogy az eredmények gyorsítótárazva legyenek a HITL-folytatások vagy az ellenőrzőpont-visszaállítások során.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Tekintse meg python/samples/03-workflows/functional/agent_integration.py a teljes példát.
.as_agent() – Munkafolyamat használata ügynökként
Csomagolja be a(z) FunctionalWorkflow ügynökkompatibilis objektummá a következő segítségével: .as_agent().
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() Olyan FunctionalWorkflowAgent felületet ad vissza, amely ugyanazt run() az interfészt teszi elérhetővé, mint más ügynökobjektumok, így a funkcionális munkafolyamatok bármely olyan rendszerrel összeállíthatók, amelyek ügynököket fogadnak el.
| Paraméter | Típus | Description |
|---|---|---|
name |
str | None |
Az ügynök megjelenítendő neve. A munkafolyamat nevének alapértelmezett értéke. |
Lásd python/samples/03-workflows/functional/agent_integration.py példaként.
Samples
Futtatható példák a következő mintamappákban találhatók:
-
python/samples/01-get-started/– bevezető@workflowpéldák -
python/samples/03-workflows/functional/– teljes funkcionalitású funkcionális munkafolyamat-minták
Következő lépések
Kapcsolódó témakörök:
- Végrehajtók – feldolgozási egységek a gráfalapú API-ban
- Human-in-the-loop – HITL gráfalapú munkafolyamatokban
- Ellenőrzőpontok – ellenőrzőpontok tárolása és folytatása
- Események – munkafolyamat-eseménytípusok
- Munkafolyamatok használata ügynökökként
A funkcionális munkafolyamat API jelenleg nem érhető el a C# számára.