İşlevsel İş Akışı API'si

Warning

İşlevsel iş akışı API'si deneyseldir ve gelecek sürümlerde bildirimde bulunmadan değiştirilebilir veya kaldırılmaya tabidir.

İşlevsel iş akışı API'si, iş akışlarını düz Python zaman uyumsuz işlevler olarak yazmanızı sağlar. Yürütücü sınıflarını tanımlamak, kenarları bağlamak ve WorkflowBuilder kullanmak yerine, bir async işlevini @workflow ile dekore eder ve mantığınızı ifade etmek için yerel Python denetim akışını kullanırsınız — if/else, for döngüleri, asyncio.gather.

Graf API'siyle yan yana karşılaştırma için İş akışlarına genel bakış sayfasındaki İş Akışı API'leri bölümüne bakın.

@workflow dekoratörü

@workflow bir async fonksiyona uygulayarak FunctionalWorkflow nesnesine dönüştürün:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

Dekoratör, @workflow isteğe bağlı bağımsız değişkenler içeren parametreli bir formu destekler:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parametreler

Parametre Tür Description
name str | None İş akışının görünen adı. varsayılan olarak işlevinin __name__öğesini kullanır.
description str | None İsteğe bağlı insan tarafından okunabilir açıklama.
checkpoint_storage CheckpointStorage | None Çalıştırmalar arasında adım sonuçlarını kalıcı olarak saklamak için varsayılan depolama alanı. run() çağrısı başına geçersiz kılınabilir.

İş akışı işlev imzası

İş akışı işlevinin ilk parametresine.run()geçirilen girişi alır. ctx: WorkflowRunContext Parametreyi yalnızca HITL, anahtar/değer durumu veya özel olaylara ihtiyacınız olduğunda ekleyin; aksi takdirde isteğe bağlıdır:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext, önce tür ek açıklaması daha sonra ise parametre adı ctx ile algılanır, bu nedenle hem ctx: WorkflowRunContext hem de yalnızca ctx parametresi çalışır.

İş akışı çalıştırma

.run() tarafından döndürülen FunctionalWorkflow nesnesinde @workflow çağrısı yapın.

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parametreler

Parametre Tür Description
message Any | None Girdi, iş akışı işlevinin ilk argümanı olarak işlevine geçirilir.
stream bool Eğer True ise, ResponseStream nesneleri veren bir WorkflowEvent döndürür. Varsayılan olarak False değerini alır.
responses dict[str, Any] | None request_id tarafından anahtar belirlenen HITL yanıtları. Askıya alınmış bir iş akışını sürdürmek için kullanılır.
checkpoint_id str | None Geri yüklemek için denetim noktası. checkpoint_storage Ayarlanması gerekir.
checkpoint_storage CheckpointStorage | None Bu çalıştırma için dekoratörde varsayılan depolama kümesini geçersiz kılar.
include_status_events bool Akış dışı sonuda durum değişikliği olaylarını ekleyin.

Tam olarak bir adet message, responses, veya checkpoint_id her çağrı için sağlanmalıdır.

WorkflowRunResult

run() (akış dışı) bir WorkflowRunResultdöndürür. Temel yöntemler:

Yöntem / özellik İadeler Description
.text str İlk çıkışı bir dize olarak ver. Dize çıkışı yoksa boş dize.
.get_outputs() list[Any] İş akışı tarafından yayılan tüm çıkışlar.
.get_final_state() WorkflowRunState Son çalıştırma durumu (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Durum IDLE_WITH_PENDING_REQUESTS olduğunda bekleyen HITL istekleri.

Yayın

Üretilen olayları anında almak için stream=True geçirin:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Tam bir örnek için bkz python/samples/03-workflows/functional/basic_streaming_pipeline.py .

@step dekoratörü

@step gönüllü kullanılan bir dekoratördür; bağımsız zaman uyumsuz işlevlere sonuç önbelleği, olay yayma ve adım başına denetim noktası ekleme özellikleri ekler.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

bir iş akışının içinde neler @step yapar?

  • Sonuçları önbelleğe alır(step_name, call_index)tarafından depolanır. HITL özgeçmişinde veya denetim noktası geri yüklemesinde tamamlanmış bir adım, kaydedilen sonucu yeniden çalıştırmak yerine anında geri döndürür.
  • Olayları tetiklerexecutor_invoked / executor_completed / executor_failed gözlemlenebilirlik amacıyla tetiklenir. Önbellek isabetinde, executor_bypassed bunun yerine gönderilir.
  • Denetim noktalarını kaydeder ; iş akışında checkpoint_storagevarsa, her adım tamamlandıktan sonra bir denetim noktası kaydedilir.
  • Enjekte WorkflowRunContext — step işlevi bir ctx: WorkflowRunContext parametre bildirirse etkin bağlam otomatik olarak eklenir.

Çalışan bir iş akışının dışında @step saydamdır; işlev, dekore edilmemiş sürümüyle aynı şekilde davranır ve bu da onu tek başına tamamen test edilebilir hale getirir.

@step ne zaman kullanılmalı?

@step olan işlevlerde kullanın: aracı çağrıları, dış API istekleri veya özgeçmişte yeniden yürütmenin maliyetli olabileceği veya yan etkileri olan herhangi bir işlem. Düz işlevler (@step olmadan) @workflow içinde hala çalışır; iş akışı yeniden başlatıldığında yeniden yürütülür.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step ayrıca bir name parametre kabul eder:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Tam bir örnek için bkz python/samples/03-workflows/functional/steps_and_checkpointing.py .

WorkflowRunContext

WorkflowRunContext (kısa diğer ad: RunContext) , iş akışı ve adım işlevlerine eklenen yürütme bağlamıdır. Yalnızca HITL, anahtar/değer durumu veya özel olayları kullandığınızda buna ihtiyacınız olur.

Kaynağından içeri aktar:agent_framework

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Döngüdeki insan

ctx.request_info() dış girişi beklemek için iş akışını askıya alır:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parametreler:

Parametre Tür Description
request_data Any Hangi girişin gerekli olduğunu açıklayan yük (dikte, Pydantik modeli, dize, ...).
response_type type Yanıtın Python türü bekleniyor.
request_id str | None Bu istek için kararlı tanımlayıcı. Atlanırsa rastgele bir UUID oluşturulur.

Semantiği yeniden yürütme: İlk yürütmede, request_info() iş akışını askıya alan bir iç sinyal (kodunuz tarafından asla görünmez) oluşturur. Çağıran, WorkflowRunResult ile bir get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS alır. Çağırarak .run(responses={request_id: value}) devam edin— iş akışı en üstten yeniden yürütülür ve request_info() sağlanan değeri hemen döndürür.

@stepAskıya alınmadan önce çalıştırılan - dekore edilmiş işlevler, önbelleğe alınan sonuçlarını yeniden yürütmek yerine özgeçmişte döndürür.

Yanıtı işleme:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Tam bir örnek için bkz python/samples/03-workflows/functional/hitl_review.py .

ctx.request_info(), @step işlevlerinin içinde de desteklenir.

ctx.add_event() — Özel olaylar

Çerçeve yaşam döngüsü olaylarının yanı sıra uygulamaya özgü olayları yaymak için kullanın ctx.add_event() . Tüm ayrıntılar ve örnekler için bkz. Özel olayları yayma.

ctx.get_state() / ctx.set_state() — Anahtar/değer durumu

HITL kesintilerinde kalıcı olan ve denetim noktalarına dahil edilen değerleri depolamak için ctx.get_state() ve ctx.set_state() kullanın. Tüm ayrıntılar için bkz. İş akışı durumu.

Denetim noktası depolaması yapılandırıldığında durum değerleri JSON serileştirilebilir olmalıdır.

ctx.is_streaming()

Geçerli çalıştırma True ile başlatıldığında stream=True döndürür. Akış moduna göre davranışlarını ayarlamak isteyen step function'lar içinde kullanışlıdır.

get_run_context()

Çalışan bir iş akışının içindeki herhangi bir yerden etkin WorkflowRunContext değerini alır; parametre bildirmeyen ctx yardımcı işlevlerde kullanışlıdır:

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Çalışan bir iş akışının dışında çağrıldığında None döndürür.

asyncio.gather ile paralellik

Fan-out/fan-in için standart Python eşzamanlılığını kullanın. Çerçeve ilkel yapılarına gerek yoktur.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather ayrıca işlevler @step ile süslendiğinde de çalışır.

Tam bir örnek için bkz python/samples/03-workflows/functional/parallel_pipeline.py .

İş akışları içindeki etmenleri çağırma

Ajan çağrıları, @workflow içinde basit işlev çağrıları olarak çalışır.

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Sonuçların HITL özgeçmişlerinde veya denetim noktası geri yüklemelerinde önbelleğe alınmasını istediğinizde aracı çağırma işlevlerine @step ekleyin.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Tam bir örnek için bkz python/samples/03-workflows/functional/agent_integration.py .

.as_agent() — Bir iş akışını aracı olarak kullanma

FunctionalWorkflow öğesini .as_agent() ile bir aracı uyumlu nesne olarak sarın.

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent(), diğer aracı nesneleriyle aynı FunctionalWorkflowAgent arabirimi kullanıma sunan ve işlevsel iş akışlarını aracı kabul eden herhangi bir sistemle birleştirilebilir hale getiren bir run() döndürür.

Parametre Tür Description
name str | None Temsilcinin görünen adı. Varsayılan olarak iş akışı adını kullanır.

Örnek için bkz python/samples/03-workflows/functional/agent_integration.py .

Samples

Çalıştırılabilir örnekler aşağıdaki örnek klasörlerde verilmiştir:

Sonraki Adımlar

İlgili konular:

İşlevsel iş akışı API'si şu anda C# için kullanılamıyor.