Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Výstraha
Funkční rozhraní API pracovního postupu je experimentální a může se změnit nebo odebrat v budoucích verzích bez předchozího upozornění.
Funkční rozhraní API pracovního postupu umožňuje psát pracovní postupy jako prosté Python asynchronní funkce. Místo definování tříd vykonavatele, propojováním hran a použití WorkflowBuilder, zdobíte funkci async s pomocí @workflow a využijete nativní řídicí struktury Pythonu — if/else, for smyčky, asyncio.gather — k vyjádření logiky.
Souběžné porovnání s rozhraním GRAPH API najdete v tématu Rozhraní API pracovního postupu v přehledu Pracovních postupů.
@workflow dekorátor
Použijte @workflow na funkci async k jejímu převedení na objekt FunctionalWorkflow.
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Dekorátor @workflow podporuje parametrizovaný formulář s volitelnými argumenty:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow parametry
| Parametr | Typ | Description |
|---|---|---|
name |
str | None |
Zobrazovaný název pracovního postupu Výchozí hodnota funkce __name__. |
description |
str | None |
Volitelný popis čitelný člověkem. |
checkpoint_storage |
CheckpointStorage | None |
Výchozí úložiště pro zachování výsledků kroku mezi spuštěními Lze přepsat při volání v run(). |
Podpis funkce workflowu
První parametr funkce pracovního postupu obdrží vstup předaný do .run().
ctx: WorkflowRunContext Přidejte parametr pouze v případě, že potřebujete HITL, stav klíče/hodnoty nebo vlastní události – v opačném případě je volitelný:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext je nejprve rozpoznána podle anotace typu, pak podle názvu parametru ctx, takže jak ctx: WorkflowRunContext, tak holý parametr ctx pracuje.
Spuštění pracovního postupu
Zavolejte .run() na objektu FunctionalWorkflow, který byl vrácen funkcí @workflow.
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() parametry
| Parametr | Typ | Description |
|---|---|---|
message |
Any | None |
Vstup předaný funkci pracovního postupu jako první argument. |
stream |
bool |
Pokud Truevrátí hodnotu ResponseStream , která vrací objekty WorkflowEvent . Výchozí hodnota je False. |
responses |
dict[str, Any] | None |
Odpovědi HITL podle klíče request_id. Slouží k obnovení pozastaveného pracovního postupu. |
checkpoint_id |
str | None |
Kontrolní bod, ze které se má provést obnovení Je vyžadováno nastavit checkpoint_storage. |
checkpoint_storage |
CheckpointStorage | None |
Přepíše výchozí úložiště nastavené dekorátorem pro toto spuštění. |
include_status_events |
bool |
Zahrňte události změny stavu do výsledku, který není streamovaný. |
Přesně jeden z message, responses nebo checkpoint_id musí být uveden pro každý hovor.
WorkflowRunResult
run() (bez streamování) vrátí WorkflowRunResulthodnotu. Klíčové metody:
| Metoda / vlastnost | Returns | Description |
|---|---|---|
.text |
str |
První výstup jako řetězec. Prázdný řetězec, pokud nejsou žádné výstupy řetězců. |
.get_outputs() |
list[Any] |
Všechny výstupy generované pracovním postupem. |
.get_final_state() |
WorkflowRunState |
Konečný stav spuštění (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED...). |
.get_request_info_events() |
list[WorkflowEvent] |
Čekající požadavky HITL, když je stav IDLE_WITH_PENDING_REQUESTS. |
Streamování
Předat stream=True k přijetí událostí, jakmile jsou vytvořeny:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Podívejte se python/samples/03-workflows/functional/basic_streaming_pipeline.py na úplný příklad.
@step dekorátor
@step je dekorátor, který se dá dobrovolně aktivovat a přidává ukládání výsledků v mezipaměti, vyvolávání událostí a kontrolování jednotlivých kroků k asynchronním funkcím:
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Co @step dělá uvnitř pracovního postupu
-
Ukládá výsledky do mezipaměti – výsledek je uložen
(step_name, call_index). Při obnovení HITL nebo obnovení kontrolního bodu vrátí dokončený krok okamžitě uložený výsledek místo opětovného spuštění. -
Generuje události – pro lepší pozorovatelnost jsou emitovány
executor_invoked/executor_completed/executor_failed. Při zásahu do mezipaměti se místo toho vygenerujeexecutor_bypassed. -
Uloží kontrolní body – pokud pracovní postup obsahuje
checkpoint_storage, kontrolní bod se uloží po dokončení každého kroku. -
Vstřikuje
WorkflowRunContext— pokud funkce kroku deklarujectx: WorkflowRunContextparametr, automaticky se vloží do aktivního kontextu.
Mimo spuštěný pracovní postup @step je transparentní — funkce se chová stejně jako její neozdobená verze, takže je plně testovatelná v izolaci.
Kdy použít atribut @step
Funkce, které jsou @step, jako hovory agenta, externí požadavky rozhraní API nebo jakákoli operace, kde by opětovné spuštění při obnovení bylo nákladné nebo mělo vedlejší účinky, používají . Prosté funkce (bez @step) stále pracují uvnitř @workflow. Při obnovení pracovního postupu se jednoduše znovu spustí.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step také přijímá name parametr:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Podívejte se python/samples/03-workflows/functional/steps_and_checkpointing.py na úplný příklad.
WorkflowRunContext
WorkflowRunContext (krátký alias: RunContext) je kontext provádění vkládaný do funkcí pracovního postupu a krokových funkcí. Potřebujete ho jenom v případě, že používáte HITL, stav klíče/hodnoty nebo vlastní události.
Naimportujte ho z agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Člověk-zapojen-do-procesu
ctx.request_info() pozastaví pracovní postup, aby čekal na externí vstup:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parametry:
| Parametr | Typ | Description |
|---|---|---|
request_data |
Any |
Datová část popisující, jaký vstup je potřeba (diktování, Pydantický model, řetězec, ...). |
response_type |
type |
Očekávaný Python typ odpovědi. |
request_id |
str | None |
Stabilní identifikátor pro tento dotaz. Pokud tento parametr vynecháte, vygeneruje se náhodné UUID. |
Semantika přehrání: Při prvním spuštění request_info() vyvolá interní signál (který není nikdy viditelný pro váš kód), který pozastaví workflow. Volající obdrží WorkflowRunResult s get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Pokračovat voláním .run(responses={request_id: value}) – pracovní postup se znovu spustí z horní části a request_info() vrátí zadanou hodnotu okamžitě.
@step-zdobené funkce, které běžely před pozastavením, při obnovení vrací výsledek uložený v mezipaměti místo opětovného spuštění.
Zpracování odpovědi:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Podívejte se python/samples/03-workflows/functional/hitl_review.py na úplný příklad.
ctx.request_info() podporuje se také uvnitř @step funkcí.
ctx.add_event() — Vlastní události
Slouží ctx.add_event() k generování událostí specifických pro aplikaci spolu s událostmi životního cyklu frameworku. Úplné podrobnosti a příklady najdete v tématu Generování vlastních událostí.
ctx.get_state()
/
ctx.set_state() — Stav klíč/hodnota
Používejte ctx.get_state() a ctx.set_state() ukládejte hodnoty, které se zachovají napříč přerušeními HITL a jsou součástí kontrolních bodů. Úplné podrobnosti najdete v tématu Stav pracovního postupu.
Hodnoty stavu musí být serializovatelné ve formátu JSON, pokud je nakonfigurované úložiště kontrolních bodů.
ctx.is_streaming()
Vrátí True, když bylo aktuální spuštění zahájeno s stream=True. Užitečné v rámci krokových funkcí, které upravují své chování na základě režimu streamování.
get_run_context()
Načte aktivní WorkflowRunContext z libovolného místa uvnitř spuštěného pracovního postupu – užitečné v pomocných funkcích, které nehlásí ctx parametr:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Vrátí None když je volána mimo spuštěný pracovní postup.
Paralelismus s využitím asyncio.gather
Pro použití vzoru fan-out/fan-in použijte standardní souběžnosti v Pythonu – není potřeba žádná rámcová primitiva.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather funguje také v případech, kdy jsou funkce zdobeny @step.
Podívejte se python/samples/03-workflows/functional/parallel_pipeline.py na úplný příklad.
Volání agentů v pracovních postupech
Volání agentů fungují jako běžné volání funkcí uvnitř @workflow.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Pokud chcete, aby výsledky byly uloženy v mezipaměti napříč obnoveními HITL nebo obnovením kontrolních bodů, přidejte @step do funkcí volání agentů:
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Podívejte se python/samples/03-workflows/functional/agent_integration.py na úplný příklad.
.as_agent() — Použití pracovního postupu jako agenta
Zabalení objektu FunctionalWorkflow kompatibilního s agentem pomocí .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() vrátí objekt, FunctionalWorkflowAgent který zveřejňuje stejné run() rozhraní jako ostatní objekty agenta, takže funkční pracovní postupy jsou kompozovatelné s libovolným systémem, který přijímá agenty.
| Parametr | Typ | Description |
|---|---|---|
name |
str | None |
Zobrazovaný název agenta Výchozí hodnota je název pracovního postupu. |
Podívejte se python/samples/03-workflows/functional/agent_integration.py na příklad.
Samples
Spustitelné příklady jsou v následujících ukázkových složkách:
-
python/samples/01-get-started/— úvodní@workflowpříklady -
python/samples/03-workflows/functional/— ukázky funkčních pracovních postupů s plnou funkčností
Další kroky
Související témata:
- Exekutory – jednotky zpracování v rozhraní API založeném na grafech
- Human-in-the-loop – HITL v pracovních postupech založených na grafech
- Kontrolní body – úložiště a obnovení kontrolních bodů
- Události – typy událostí pracovního postupu
- Použití workflowů jako agentů
Funkční rozhraní API pracovního postupu není v tuto chvíli k dispozici pro jazyk C#.