API Alur Kerja Fungsional

Warning

API alur kerja fungsional bersifat eksperimental dan dapat berubah atau dihapus dalam versi mendatang tanpa pemberitahuan.

API alur kerja fungsi ini memungkinkan Anda menulis alur kerja sebagai fungsi asinkron Python biasa. Alih-alih mendefinisikan kelas pelaksana, tepi kabel, dan menggunakan , Anda menghias fungsi /> dan menggunakan alur kontrol Python asli — , perulangan , — untuk mengekspresikan logika Anda.

Untuk perbandingan berdampingan dengan API grafik, lihat API Alur Kerja pada gambaran umum Alur Kerja.

@workflow dekorator

Terapkan @workflow ke async fungsi untuk mengonversinya menjadi FunctionalWorkflow objek:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

Dekorator @workflow mendukung formulir berparameter dengan argumen opsional:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow parameter

Parameter Type Description
name str | None Nama tampilan untuk alur kerja. Mengacu pada fungsi __name__.
description str | None Deskripsi opsional yang dapat dibaca manusia.
checkpoint_storage CheckpointStorage | None Penyimpanan default untuk menyimpan hasil langkah di antara pengulangan. Dapat ditimpa per panggilan di run().

Tanda tangan fungsi alur kerja

Parameter pertama fungsi alur kerja menerima input yang diteruskan ke .run(). Tambahkan parameter ctx: WorkflowRunContext hanya ketika Anda memerlukan HITL, keadaan kunci/nilai, atau event kustom — sebaliknya, parameter ini bersifat opsional.

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext terdeteksi oleh anotasi jenis terlebih dahulu, kemudian oleh nama parameter ctx, sehingga baik ctx: WorkflowRunContext maupun parameter kosong ctx berfungsi.

Menjalankan alur kerja

Panggil .run() pada objek FunctionalWorkflow yang dikembalikan oleh @workflow.

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() parameter

Parameter Type Description
message Any | None Masukan diteruskan ke fungsi alur kerja sebagai argumen pertama.
stream bool Jika True, mengembalikan ResponseStream yang menghasilkan WorkflowEvent objek. Secara default menjadi False.
responses dict[str, Any] | None Respons HITL didasarkan pada request_id. Digunakan untuk melanjutkan alur kerja yang ditangguhkan.
checkpoint_id str | None Titik pemeriksaan untuk pemulihan. checkpoint_storage harus diatur.
checkpoint_storage CheckpointStorage | None Mengambil alih set penyimpanan default pada dekorator untuk eksekusi ini.
include_status_events bool Sertakan event perubahan status dalam hasil non-streaming.

Tepat salah satu dari message, responses, atau checkpoint_id harus disediakan per panggilan.

WorkflowRunResult

run() (non-streaming) akan mengembalikan sebuah WorkflowRunResult. Metode utama:

Metode / Properti Returns Description
.text str Keluaran pertama sebagai string. String kosong jika tidak ada output string.
.get_outputs() list[Any] Semua output yang dipancarkan oleh alur kerja.
.get_final_state() WorkflowRunState Status eksekusi akhir (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Permintaan HITL yang tertunda ketika statusnya adalah IDLE_WITH_PENDING_REQUESTS.

Siaran Langsung

Teruskan stream=True untuk menerima event segera setelah diproduksi.

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Lihat python/samples/03-workflows/functional/basic_streaming_pipeline.py untuk contoh lengkapnya.

@step dekorator

@step adalah dekorator opsional yang menambahkan penembolokan hasil, pemancaran peristiwa, dan checkpoint per langkah ke fungsi asinkron individual.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Apa yang @step dilakukan di dalam alur kerja

  • Hasil cache — hasilnya disimpan oleh (step_name, call_index). Pada resume atau pemulihan titik pemeriksaan HITL, langkah yang telah selesai menghasilkan kembali hasil yang disimpan secepatnya, alih-alih mengeksekusi ulang.
  • Memancarkan peristiwaexecutor_invoked / executor_completed / executor_failed dipancarkan untuk pengamatan. Pada hit cache, executor_bypassed dipancarkan sebagai gantinya.
  • Menyimpan titik pemeriksaan — jika alur kerja memiliki checkpoint_storage, titik pemeriksaan disimpan setelah setiap langkah selesai.
  • Menyuntikkan WorkflowRunContext — jika fungsi langkah mendeklarasikan ctx: WorkflowRunContext parameter, konteks aktif secara otomatis disuntikkan.

Di luar alur kerja yang sedang berjalan, @step transparan — fungsi berperilaku identik dengan versi yang tidak terdekorasi, sehingga sepenuhnya dapat diuji dalam isolasi.

Kapan harus menggunakan @step

Gunakan @step pada fungsi yang mahal untuk dijalankan kembali: panggilan agen, permintaan API eksternal, atau operasi apa pun di mana eksekusi ulang saat dilanjutkan akan mahal atau memiliki efek samping. Fungsi biasa (tanpa @step) masih berfungsi di dalam @workflow; mereka hanya menjalankan kembali ketika alur kerja dilanjutkan.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step juga menerima name parameter:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Lihat python/samples/03-workflows/functional/steps_and_checkpointing.py untuk contoh lengkapnya.

WorkflowRunContext

WorkflowRunContext (alias pendek: RunContext) adalah konteks eksekusi yang disuntikkan ke dalam alur kerja dan fungsi langkah. Anda hanya memerlukannya saat menggunakan HITL, status kunci/nilai, atau peristiwa kustom.

Impor dari agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Manusia-dalam-sistem

ctx.request_info() menangguhkan alur kerja untuk menunggu input eksternal:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parameter:

Parameter Type Description
request_data Any Payload yang menjelaskan input apa yang diperlukan (dict, model Pydantic, string, ...).
response_type type Jenis respons Python yang diharapkan.
request_id str | None Pengidentifikasi stabil untuk permintaan ini. UUID acak akan dihasilkan jika tidak diberikan.

Putar ulang semantik: Pada eksekusi pertama, request_info() menaikkan sinyal internal (tidak pernah terlihat oleh kode Anda) yang menangguhkan alur kerja. Pemanggil menerima WorkflowRunResult dengan get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Lanjutkan dengan memanggil .run(responses={request_id: value}) — alur kerja dijalankan kembali dari bagian atas, dan request_info() segera mengembalikan nilai yang disediakan.

@step-decorated functions yang berjalan sebelum suspensi mengembalikan hasil cache mereka saat dilanjutkan alih-alih mengeksekusi ulang.

Menangani respons:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Lihat python/samples/03-workflows/functional/hitl_review.py untuk contoh lengkapnya.

ctx.request_info() juga didukung di dalam fungsi @step.

ctx.add_event() — Peristiwa kustom

Gunakan ctx.add_event() untuk memancarkan peristiwa khusus aplikasi bersama peristiwa siklus hidup kerangka kerja. Untuk detail dan contoh lengkap, lihat Memancarkan peristiwa kustom.

ctx.get_state() / ctx.set_state() — Status kunci/nilai

Gunakan ctx.get_state() dan ctx.set_state() untuk menyimpan nilai yang tetap ada selama gangguan HITL dan termasuk dalam titik pemeriksaan. Untuk detail selengkapnya, lihat Status alur kerja.

Nilai status harus dapat diserialisasikan JSON saat penyimpanan titik pemeriksaan dikonfigurasi.

ctx.is_streaming()

Mengembalikan True ketika eksekusi saat ini dimulai dengan stream=True. Berguna dalam konteks fungsi langkah yang ingin menyesuaikan perilakunya berdasarkan mode streaming.

get_run_context()

Mengambil WorkflowRunContext yang aktif dari mana saja di dalam alur kerja yang sedang berjalan — berguna dalam fungsi pembantu yang tidak mendeklarasikan parameter ctx:

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Mengembalikan None saat dipanggil di luar alur kerja yang sedang berjalan.

Paralelisme dengan asyncio.gather

Gunakan konkurensi Python standar untuk fan-out/fan-in — tidak diperlukan primitif kerangka kerja:

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather juga berfungsi ketika fungsi dihiasi dengan @step.

Lihat python/samples/03-workflows/functional/parallel_pipeline.py untuk contoh lengkapnya.

Memanggil agen di dalam alur kerja

Panggilan agen berfungsi sebagai panggilan fungsi biasa di dalam @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Tambahkan @step ke fungsi panggilan agen saat Anda ingin hasilnya di-cache di seluruh resume HITL atau pemulihan titik pemeriksaan:

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Lihat python/samples/03-workflows/functional/agent_integration.py untuk contoh lengkapnya.

.as_agent() — Menggunakan alur kerja sebagai agen

Bungkus FunctionalWorkflow sebagai objek yang kompatibel dengan agen dengan .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent()FunctionalWorkflowAgent mengembalikan sebuah FunctionalWorkflowAgent yang mengekspos antarmuka yang sama dengan objek agen lain, membuat alur kerja fungsional yang dapat dikomposisikan dengan sistem apa pun yang menerima agen.

Parameter Type Description
name str | None Nama tampilan untuk agen. Menggunakan nama alur kerja sebagai default.

Lihat python/samples/03-workflows/functional/agent_integration.py contohnya.

Samples

Contoh yang dapat dijalankan ada di folder sampel berikut:

Langkah berikutnya

Pembuat & Pelaksanaan Alur Kerja

Topik terkait:

API alur kerja fungsi tidak tersedia untuk C# saat ini.