Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Warning
API alur kerja fungsional bersifat eksperimental dan dapat berubah atau dihapus dalam versi mendatang tanpa pemberitahuan.
API alur kerja fungsi ini memungkinkan Anda menulis alur kerja sebagai fungsi asinkron Python biasa. Alih-alih mendefinisikan kelas pelaksana, tepi kabel, dan menggunakan
Untuk perbandingan berdampingan dengan API grafik, lihat API Alur Kerja pada gambaran umum Alur Kerja.
@workflow dekorator
Terapkan @workflow ke async fungsi untuk mengonversinya menjadi FunctionalWorkflow objek:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Dekorator @workflow mendukung formulir berparameter dengan argumen opsional:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow parameter
| Parameter | Type | Description |
|---|---|---|
name |
str | None |
Nama tampilan untuk alur kerja. Mengacu pada fungsi __name__. |
description |
str | None |
Deskripsi opsional yang dapat dibaca manusia. |
checkpoint_storage |
CheckpointStorage | None |
Penyimpanan default untuk menyimpan hasil langkah di antara pengulangan. Dapat ditimpa per panggilan di run(). |
Tanda tangan fungsi alur kerja
Parameter pertama fungsi alur kerja menerima input yang diteruskan ke .run(). Tambahkan parameter ctx: WorkflowRunContext hanya ketika Anda memerlukan HITL, keadaan kunci/nilai, atau event kustom — sebaliknya, parameter ini bersifat opsional.
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext terdeteksi oleh anotasi jenis terlebih dahulu, kemudian oleh nama parameter ctx, sehingga baik ctx: WorkflowRunContext maupun parameter kosong ctx berfungsi.
Menjalankan alur kerja
Panggil .run() pada objek FunctionalWorkflow yang dikembalikan oleh @workflow.
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() parameter
| Parameter | Type | Description |
|---|---|---|
message |
Any | None |
Masukan diteruskan ke fungsi alur kerja sebagai argumen pertama. |
stream |
bool |
Jika True, mengembalikan ResponseStream yang menghasilkan WorkflowEvent objek. Secara default menjadi False. |
responses |
dict[str, Any] | None |
Respons HITL didasarkan pada request_id. Digunakan untuk melanjutkan alur kerja yang ditangguhkan. |
checkpoint_id |
str | None |
Titik pemeriksaan untuk pemulihan.
checkpoint_storage harus diatur. |
checkpoint_storage |
CheckpointStorage | None |
Mengambil alih set penyimpanan default pada dekorator untuk eksekusi ini. |
include_status_events |
bool |
Sertakan event perubahan status dalam hasil non-streaming. |
Tepat salah satu dari message, responses, atau checkpoint_id harus disediakan per panggilan.
WorkflowRunResult
run() (non-streaming) akan mengembalikan sebuah WorkflowRunResult. Metode utama:
| Metode / Properti | Returns | Description |
|---|---|---|
.text |
str |
Keluaran pertama sebagai string. String kosong jika tidak ada output string. |
.get_outputs() |
list[Any] |
Semua output yang dipancarkan oleh alur kerja. |
.get_final_state() |
WorkflowRunState |
Status eksekusi akhir (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Permintaan HITL yang tertunda ketika statusnya adalah IDLE_WITH_PENDING_REQUESTS. |
Siaran Langsung
Teruskan stream=True untuk menerima event segera setelah diproduksi.
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Lihat python/samples/03-workflows/functional/basic_streaming_pipeline.py untuk contoh lengkapnya.
@step dekorator
@step adalah dekorator opsional yang menambahkan penembolokan hasil, pemancaran peristiwa, dan checkpoint per langkah ke fungsi asinkron individual.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Apa yang @step dilakukan di dalam alur kerja
-
Hasil cache — hasilnya disimpan oleh
(step_name, call_index). Pada resume atau pemulihan titik pemeriksaan HITL, langkah yang telah selesai menghasilkan kembali hasil yang disimpan secepatnya, alih-alih mengeksekusi ulang. -
Memancarkan peristiwa —
executor_invoked/executor_completed/executor_faileddipancarkan untuk pengamatan. Pada hit cache,executor_bypasseddipancarkan sebagai gantinya. -
Menyimpan titik pemeriksaan — jika alur kerja memiliki
checkpoint_storage, titik pemeriksaan disimpan setelah setiap langkah selesai. -
Menyuntikkan
WorkflowRunContext— jika fungsi langkah mendeklarasikanctx: WorkflowRunContextparameter, konteks aktif secara otomatis disuntikkan.
Di luar alur kerja yang sedang berjalan, @step transparan — fungsi berperilaku identik dengan versi yang tidak terdekorasi, sehingga sepenuhnya dapat diuji dalam isolasi.
Kapan harus menggunakan @step
Gunakan @step pada fungsi yang mahal untuk dijalankan kembali: panggilan agen, permintaan API eksternal, atau operasi apa pun di mana eksekusi ulang saat dilanjutkan akan mahal atau memiliki efek samping. Fungsi biasa (tanpa @step) masih berfungsi di dalam @workflow; mereka hanya menjalankan kembali ketika alur kerja dilanjutkan.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step juga menerima name parameter:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Lihat python/samples/03-workflows/functional/steps_and_checkpointing.py untuk contoh lengkapnya.
WorkflowRunContext
WorkflowRunContext (alias pendek: RunContext) adalah konteks eksekusi yang disuntikkan ke dalam alur kerja dan fungsi langkah. Anda hanya memerlukannya saat menggunakan HITL, status kunci/nilai, atau peristiwa kustom.
Impor dari agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Manusia-dalam-sistem
ctx.request_info() menangguhkan alur kerja untuk menunggu input eksternal:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parameter:
| Parameter | Type | Description |
|---|---|---|
request_data |
Any |
Payload yang menjelaskan input apa yang diperlukan (dict, model Pydantic, string, ...). |
response_type |
type |
Jenis respons Python yang diharapkan. |
request_id |
str | None |
Pengidentifikasi stabil untuk permintaan ini. UUID acak akan dihasilkan jika tidak diberikan. |
Putar ulang semantik: Pada eksekusi pertama, request_info() menaikkan sinyal internal (tidak pernah terlihat oleh kode Anda) yang menangguhkan alur kerja. Pemanggil menerima WorkflowRunResult dengan get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Lanjutkan dengan memanggil .run(responses={request_id: value}) — alur kerja dijalankan kembali dari bagian atas, dan request_info() segera mengembalikan nilai yang disediakan.
@step-decorated functions yang berjalan sebelum suspensi mengembalikan hasil cache mereka saat dilanjutkan alih-alih mengeksekusi ulang.
Menangani respons:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Lihat python/samples/03-workflows/functional/hitl_review.py untuk contoh lengkapnya.
ctx.request_info() juga didukung di dalam fungsi @step.
ctx.add_event() — Peristiwa kustom
Gunakan ctx.add_event() untuk memancarkan peristiwa khusus aplikasi bersama peristiwa siklus hidup kerangka kerja. Untuk detail dan contoh lengkap, lihat Memancarkan peristiwa kustom.
ctx.get_state()
/
ctx.set_state() — Status kunci/nilai
Gunakan ctx.get_state() dan ctx.set_state() untuk menyimpan nilai yang tetap ada selama gangguan HITL dan termasuk dalam titik pemeriksaan. Untuk detail selengkapnya, lihat Status alur kerja.
Nilai status harus dapat diserialisasikan JSON saat penyimpanan titik pemeriksaan dikonfigurasi.
ctx.is_streaming()
Mengembalikan True ketika eksekusi saat ini dimulai dengan stream=True. Berguna dalam konteks fungsi langkah yang ingin menyesuaikan perilakunya berdasarkan mode streaming.
get_run_context()
Mengambil WorkflowRunContext yang aktif dari mana saja di dalam alur kerja yang sedang berjalan — berguna dalam fungsi pembantu yang tidak mendeklarasikan parameter ctx:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Mengembalikan None saat dipanggil di luar alur kerja yang sedang berjalan.
Paralelisme dengan asyncio.gather
Gunakan konkurensi Python standar untuk fan-out/fan-in — tidak diperlukan primitif kerangka kerja:
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather juga berfungsi ketika fungsi dihiasi dengan @step.
Lihat python/samples/03-workflows/functional/parallel_pipeline.py untuk contoh lengkapnya.
Memanggil agen di dalam alur kerja
Panggilan agen berfungsi sebagai panggilan fungsi biasa di dalam @workflow:
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Tambahkan @step ke fungsi panggilan agen saat Anda ingin hasilnya di-cache di seluruh resume HITL atau pemulihan titik pemeriksaan:
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Lihat python/samples/03-workflows/functional/agent_integration.py untuk contoh lengkapnya.
.as_agent() — Menggunakan alur kerja sebagai agen
Bungkus FunctionalWorkflow sebagai objek yang kompatibel dengan agen dengan .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent()FunctionalWorkflowAgent mengembalikan sebuah FunctionalWorkflowAgent yang mengekspos antarmuka yang sama dengan objek agen lain, membuat alur kerja fungsional yang dapat dikomposisikan dengan sistem apa pun yang menerima agen.
| Parameter | Type | Description |
|---|---|---|
name |
str | None |
Nama tampilan untuk agen. Menggunakan nama alur kerja sebagai default. |
Lihat python/samples/03-workflows/functional/agent_integration.py contohnya.
Samples
Contoh yang dapat dijalankan ada di folder sampel berikut:
-
python/samples/01-get-started/— contoh pengantar@workflow -
python/samples/03-workflows/functional/— sampel alur kerja fungsional fitur lengkap
Langkah berikutnya
Topik terkait:
- Pelaksana — unit pemrosesan di API berbasis grafik
- Human-in-the-loop — HITL dalam alur kerja berbasis grafik
- Titik pemeriksaan — penyimpanan titik pemeriksaan dan lanjutkan
- Peristiwa — jenis peristiwa alur kerja
- Menggunakan Alur Kerja sebagai Agen
API alur kerja fungsi tidak tersedia untuk C# saat ini.