Funkcionális munkafolyamat API

Warning

A funkcionális munkafolyamat API kísérleti jellegű, és előzetes értesítés nélkül módosítható vagy eltávolítható a jövőbeli verziókban.

A funkcionális munkafolyamat API-val egyszerű Python aszinkron függvényként írhat munkafolyamatokat. A végrehajtó osztályok és kábelezési élek meghatározása, illetve a WorkflowBuilder használata helyett díszítsen fel egy async függvényt @workflow-val, és használjon natív Python vezérlési folyamatokat – például if/else, for ciklusok, asyncio.gather – a logika kifejezésére.

A graph API-val való párhuzamos összehasonlításért tekintse meg a Munkafolyamatok API-k áttekintését.

@workflow dekoratőr

Alkalmazza a @workflow elemet egy async függvényre, hogy azt objektummá alakítsa FunctionalWorkflow.

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

A @workflow dekoratőr támogatja a paraméteres űrlapot opcionális argumentumokkal:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Paraméterek

Paraméter Típus Description
name str | None A munkafolyamat kijelzőneve. A függvény alapértelmezett értéke __name__.
description str | None Nem kötelező, emberi olvasásra alkalmas leírás.
checkpoint_storage CheckpointStorage | None Alapértelmezett tároló a futtatások közötti lépéseredmények megőrzéséhez. Hívásonként felül lehet bírálni.run()

A munkafolyamat függvény szignatúrája

A munkafolyamat-függvény első paramétere megkapja a megadott bemenetet .run(). Csak akkor adjon hozzá paramétert ctx: WorkflowRunContext , ha HITL- vagy kulcs-/értékállapotra vagy egyéni eseményekre van szüksége – máskülönben nem kötelező:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext először típusannotációval észlelve, majd a paraméternév ctx alapján, így mind a ctx: WorkflowRunContext, mind a csupasz ctx paraméter működik.

Munkafolyamat futtatása

Hívás .run() az FunctionalWorkflow objektumon, amelyet a @workflow visszaadott:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Paraméterek

Paraméter Típus Description
message Any | None A munkafolyamat-függvényhez első argumentumként átadott bemenet.
stream bool Ha True, visszaad egy ResponseStream-et, ami WorkflowEvent objektumokat eredményez. Alapértelmezett érték: False.
responses dict[str, Any] | None HITL-válaszok request_id-hoz társítva. Felfüggesztett munkafolyamat folytatására szolgál.
checkpoint_id str | None Ellenőrzőpont, ahonnan vissza szeretne állítani. checkpoint_storage beállítása szükséges.
checkpoint_storage CheckpointStorage | None Felülbírálja a dekorátor alapértelmezett tárolási beállítását ennél a futtatásnál.
include_status_events bool Az állapotváltozási események belefoglalása a nem streamelt eredménybe.

Pontosan az egyik message, responses, vagy checkpoint_id kell megadni hívásonként.

WorkflowRunResult

run() (nem streamelés) egy WorkflowRunResult. Főbb módszerek:

Metódus/tulajdonság Returns Description
.text str Első kimenet karakterláncként. Üres sztring, ha nincs sztringkimenet.
.get_outputs() list[Any] A munkafolyamat által kibocsátott összes kimenet.
.get_final_state() WorkflowRunState Utolsó futtatási állapot (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Függőben lévő HITL-kérelmek, amikor az állapot IDLE_WITH_PENDING_REQUESTS.

Streaming

Adjon meg stream=True az események megkapásához, amint előállnak:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Tekintse meg python/samples/03-workflows/functional/basic_streaming_pipeline.py a teljes példát.

@step dekoratőr

@step egy választható dekorátor, amely eredmény gyorsítótárazást, eseménykibocsátást és lépésenkénti ellenőrzőpontozást ad hozzá az egyes aszinkron függvényekhez.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Mit @step végez a munkafolyamaton belül?

  • Gyorsítótárazza az eredményeket – az eredményt a rendszer tárolja (step_name, call_index). A HITL-folytatás vagy az ellenőrzőpont visszaállításakor a befejezett lépés az ismételt végrehajtás helyett azonnal visszaadja a mentett eredményt.
  • Eseményeket bocsát kiexecutor_invoked / executor_completed / executor_failed a megfigyelhetőség érdekében kerül kibocsátásra. Gyorsítótár-találat esetén ehelyett executor_bypassed kerül kibocsátásra.
  • Ellenőrzőpontok mentése – ha a munkafolyamat rendelkezik checkpoint_storage– az ellenőrzőpont mentése az egyes lépések befejezése után történik.
  • Injektál WorkflowRunContext – ha a lépésfüggvény deklarál egy ctx: WorkflowRunContext paramétert, az aktív környezet automatikusan injektálásra kerül.

A futó munkafolyamaton @step kívül transzparens – a függvény ugyanúgy viselkedik, mint a leválasztatlan verziója, így teljesen tesztelhető elszigetelten.

Mikor érdemes használni a @step

Függvényeken, amelyeknek az @step, például az ügynökhívásokon, külső API-kéréseken, vagy bármely olyan műveleten használható, ahol az újrafuttatás költséges lenne vagy mellékhatásokat okozhatna. Az egyszerű függvények (anélkül @step) továbbra is működnek belül @workflow; egyszerűen újrafuttatják őket, amikor a munkafolyamat folytatódik.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step egy paramétert name is elfogad:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Tekintse meg python/samples/03-workflows/functional/steps_and_checkpointing.py a teljes példát.

WorkflowRunContext

WorkflowRunContext (rövid alias: RunContext) a munkafolyamatba és a lépésfüggvényekbe injektált végrehajtási környezet. Csak HITL, kulcs/érték állapot vagy egyéni események használatakor van rá szüksége.

Importálja a következőből agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Ember a rendszerben

ctx.request_info() felfüggeszti a munkafolyamatot, hogy megvárja a külső bemenetet:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Paraméterek:

Paraméter Típus Description
request_data Any Adatcsomag, amely leírja, hogy milyen bemenetre van szükség (szótár, Pydantic-modell, karakterlánc, ...).
response_type type A válasz várható Python típusa.
request_id str | None A kérés stabil azonosítója. Ha nincs megadva, véletlenszerű UUID jön létre.

Visszajátszás szemantikája: Az első végrehajtáskor request_info() egy belső jelet ad (a kód nem látható), amely felfüggeszti a munkafolyamatot. A hívó megkap egy WorkflowRunResult-t get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS-vel. Folytatás hívással .run(responses={request_id: value}) – a munkafolyamat felülről újrafut, és request_info() azonnal visszaadja a megadott értéket.

@step-dekorált függvények, amelyek a felfüggesztés előtt futottak, újraindításkor gyorsítótárazott eredményeiket adják vissza újrafuttatás helyett.

A válasz kezelése:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Tekintse meg python/samples/03-workflows/functional/hitl_review.py a teljes példát.

ctx.request_info() függvényen belül @step is támogatott.

ctx.add_event() — Egyéni események

Alkalmazásspecifikus ctx.add_event() eseményeket bocsáthat ki a keretrendszer életciklus-eseményei mellett. További részletekért és példákért lásd: Egyéni események kibocsátása.

ctx.get_state() / ctx.set_state() — Kulcs/érték állapota

Használja a ctx.get_state() és ctx.set_state() elemeket az értékek tárolására, amelyek megmaradnak a HITL-megszakítások során, és szerepelnek az ellenőrzőpontokban. További részletekért lásd: Munkafolyamat állapota.

Az állapotértékek JSON-szerializálhatóaknak kell lenniük az Ellenőrzőpont-tároló konfigurálásakor.

ctx.is_streaming()

Amikor az aktuális futtatást True-vel indították, stream=True-t ad vissza. Hasznos belső lépésfüggvények, amelyek a streamelési mód alapján szeretnék módosítani a működésüket.

get_run_context()

Egy futó munkafolyamat bármely pontjáról lekéri az aktívt WorkflowRunContext – olyan segédfüggvényekben hasznos, amelyek nem deklarálnak paramétert ctx :

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Akkor ad None vissza, ha egy futó munkafolyamaton kívül hívják meg.

Párhuzamosság asyncio.gather -tel

Használja a standard Python párhuzamosságot a fan-out/fan-in feladatokhoz – nincs szükség keretrendszerbeli primitívekre.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather akkor is működik, ha a függvényeket dekorátorral látják el @step.

Tekintse meg python/samples/03-workflows/functional/parallel_pipeline.py a teljes példát.

Ügynökök meghívása munkafolyamatokon belül

Az ügynökhívások egyszerű függvényhívásként működnek belül @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Adja hozzá @step az ügynököt hívó függvényekhez, ha azt szeretné, hogy az eredmények gyorsítótárazva legyenek a HITL-folytatások vagy az ellenőrzőpont-visszaállítások során.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Tekintse meg python/samples/03-workflows/functional/agent_integration.py a teljes példát.

.as_agent() – Munkafolyamat használata ügynökként

Csomagolja be a(z) FunctionalWorkflow ügynökkompatibilis objektummá a következő segítségével: .as_agent().

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() Olyan FunctionalWorkflowAgent felületet ad vissza, amely ugyanazt run() az interfészt teszi elérhetővé, mint más ügynökobjektumok, így a funkcionális munkafolyamatok bármely olyan rendszerrel összeállíthatók, amelyek ügynököket fogadnak el.

Paraméter Típus Description
name str | None Az ügynök megjelenítendő neve. A munkafolyamat nevének alapértelmezett értéke.

Lásd python/samples/03-workflows/functional/agent_integration.py példaként.

Samples

Futtatható példák a következő mintamappákban találhatók:

Következő lépések

Kapcsolódó témakörök:

A funkcionális munkafolyamat API jelenleg nem érhető el a C# számára.