Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Warning
İşlevsel iş akışı API'si deneyseldir ve gelecek sürümlerde bildirimde bulunmadan değiştirilebilir veya kaldırılmaya tabidir.
İşlevsel iş akışı API'si, iş akışlarını düz Python zaman uyumsuz işlevler olarak yazmanızı sağlar. Yürütücü sınıflarını tanımlamak, kenarları bağlamak ve WorkflowBuilder kullanmak yerine, bir async işlevini @workflow ile dekore eder ve mantığınızı ifade etmek için yerel Python denetim akışını kullanırsınız — if/else, for döngüleri, asyncio.gather.
Graf API'siyle yan yana karşılaştırma için İş akışlarına genel bakış sayfasındaki İş Akışı API'leri bölümüne bakın.
@workflow dekoratörü
@workflow bir async fonksiyona uygulayarak FunctionalWorkflow nesnesine dönüştürün:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Dekoratör, @workflow isteğe bağlı bağımsız değişkenler içeren parametreli bir formu destekler:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow Parametreler
| Parametre | Tür | Description |
|---|---|---|
name |
str | None |
İş akışının görünen adı. varsayılan olarak işlevinin __name__öğesini kullanır. |
description |
str | None |
İsteğe bağlı insan tarafından okunabilir açıklama. |
checkpoint_storage |
CheckpointStorage | None |
Çalıştırmalar arasında adım sonuçlarını kalıcı olarak saklamak için varsayılan depolama alanı.
run() çağrısı başına geçersiz kılınabilir. |
İş akışı işlev imzası
İş akışı işlevinin ilk parametresine.run()geçirilen girişi alır.
ctx: WorkflowRunContext Parametreyi yalnızca HITL, anahtar/değer durumu veya özel olaylara ihtiyacınız olduğunda ekleyin; aksi takdirde isteğe bağlıdır:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext, önce tür ek açıklaması daha sonra ise parametre adı ctx ile algılanır, bu nedenle hem ctx: WorkflowRunContext hem de yalnızca ctx parametresi çalışır.
İş akışı çalıştırma
.run() tarafından döndürülen FunctionalWorkflow nesnesinde @workflow çağrısı yapın.
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() Parametreler
| Parametre | Tür | Description |
|---|---|---|
message |
Any | None |
Girdi, iş akışı işlevinin ilk argümanı olarak işlevine geçirilir. |
stream |
bool |
Eğer True ise, ResponseStream nesneleri veren bir WorkflowEvent döndürür. Varsayılan olarak False değerini alır. |
responses |
dict[str, Any] | None |
request_id tarafından anahtar belirlenen HITL yanıtları. Askıya alınmış bir iş akışını sürdürmek için kullanılır. |
checkpoint_id |
str | None |
Geri yüklemek için denetim noktası.
checkpoint_storage Ayarlanması gerekir. |
checkpoint_storage |
CheckpointStorage | None |
Bu çalıştırma için dekoratörde varsayılan depolama kümesini geçersiz kılar. |
include_status_events |
bool |
Akış dışı sonuda durum değişikliği olaylarını ekleyin. |
Tam olarak bir adet message, responses, veya checkpoint_id her çağrı için sağlanmalıdır.
WorkflowRunResult
run() (akış dışı) bir WorkflowRunResultdöndürür. Temel yöntemler:
| Yöntem / özellik | İadeler | Description |
|---|---|---|
.text |
str |
İlk çıkışı bir dize olarak ver. Dize çıkışı yoksa boş dize. |
.get_outputs() |
list[Any] |
İş akışı tarafından yayılan tüm çıkışlar. |
.get_final_state() |
WorkflowRunState |
Son çalıştırma durumu (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Durum IDLE_WITH_PENDING_REQUESTS olduğunda bekleyen HITL istekleri. |
Yayın
Üretilen olayları anında almak için stream=True geçirin:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Tam bir örnek için bkz python/samples/03-workflows/functional/basic_streaming_pipeline.py .
@step dekoratörü
@step gönüllü kullanılan bir dekoratördür; bağımsız zaman uyumsuz işlevlere sonuç önbelleği, olay yayma ve adım başına denetim noktası ekleme özellikleri ekler.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
bir iş akışının içinde neler @step yapar?
-
Sonuçları önbelleğe alır —
(step_name, call_index)tarafından depolanır. HITL özgeçmişinde veya denetim noktası geri yüklemesinde tamamlanmış bir adım, kaydedilen sonucu yeniden çalıştırmak yerine anında geri döndürür. -
Olayları tetikler —
executor_invoked/executor_completed/executor_failedgözlemlenebilirlik amacıyla tetiklenir. Önbellek isabetinde,executor_bypassedbunun yerine gönderilir. -
Denetim noktalarını kaydeder ; iş akışında
checkpoint_storagevarsa, her adım tamamlandıktan sonra bir denetim noktası kaydedilir. -
Enjekte
WorkflowRunContext— step işlevi birctx: WorkflowRunContextparametre bildirirse etkin bağlam otomatik olarak eklenir.
Çalışan bir iş akışının dışında @step saydamdır; işlev, dekore edilmemiş sürümüyle aynı şekilde davranır ve bu da onu tek başına tamamen test edilebilir hale getirir.
@step ne zaman kullanılmalı?
@step olan işlevlerde kullanın: aracı çağrıları, dış API istekleri veya özgeçmişte yeniden yürütmenin maliyetli olabileceği veya yan etkileri olan herhangi bir işlem. Düz işlevler (@step olmadan) @workflow içinde hala çalışır; iş akışı yeniden başlatıldığında yeniden yürütülür.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step ayrıca bir name parametre kabul eder:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Tam bir örnek için bkz python/samples/03-workflows/functional/steps_and_checkpointing.py .
WorkflowRunContext
WorkflowRunContext (kısa diğer ad: RunContext) , iş akışı ve adım işlevlerine eklenen yürütme bağlamıdır. Yalnızca HITL, anahtar/değer durumu veya özel olayları kullandığınızda buna ihtiyacınız olur.
Kaynağından içeri aktar:agent_framework
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Döngüdeki insan
ctx.request_info() dış girişi beklemek için iş akışını askıya alır:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parametreler:
| Parametre | Tür | Description |
|---|---|---|
request_data |
Any |
Hangi girişin gerekli olduğunu açıklayan yük (dikte, Pydantik modeli, dize, ...). |
response_type |
type |
Yanıtın Python türü bekleniyor. |
request_id |
str | None |
Bu istek için kararlı tanımlayıcı. Atlanırsa rastgele bir UUID oluşturulur. |
Semantiği yeniden yürütme: İlk yürütmede, request_info() iş akışını askıya alan bir iç sinyal (kodunuz tarafından asla görünmez) oluşturur. Çağıran, WorkflowRunResult ile bir get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS alır. Çağırarak .run(responses={request_id: value}) devam edin— iş akışı en üstten yeniden yürütülür ve request_info() sağlanan değeri hemen döndürür.
@stepAskıya alınmadan önce çalıştırılan - dekore edilmiş işlevler, önbelleğe alınan sonuçlarını yeniden yürütmek yerine özgeçmişte döndürür.
Yanıtı işleme:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Tam bir örnek için bkz python/samples/03-workflows/functional/hitl_review.py .
ctx.request_info(), @step işlevlerinin içinde de desteklenir.
ctx.add_event() — Özel olaylar
Çerçeve yaşam döngüsü olaylarının yanı sıra uygulamaya özgü olayları yaymak için kullanın ctx.add_event() . Tüm ayrıntılar ve örnekler için bkz. Özel olayları yayma.
ctx.get_state()
/
ctx.set_state() — Anahtar/değer durumu
HITL kesintilerinde kalıcı olan ve denetim noktalarına dahil edilen değerleri depolamak için ctx.get_state() ve ctx.set_state() kullanın. Tüm ayrıntılar için bkz. İş akışı durumu.
Denetim noktası depolaması yapılandırıldığında durum değerleri JSON serileştirilebilir olmalıdır.
ctx.is_streaming()
Geçerli çalıştırma True ile başlatıldığında stream=True döndürür. Akış moduna göre davranışlarını ayarlamak isteyen step function'lar içinde kullanışlıdır.
get_run_context()
Çalışan bir iş akışının içindeki herhangi bir yerden etkin WorkflowRunContext değerini alır; parametre bildirmeyen ctx yardımcı işlevlerde kullanışlıdır:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Çalışan bir iş akışının dışında çağrıldığında None döndürür.
asyncio.gather ile paralellik
Fan-out/fan-in için standart Python eşzamanlılığını kullanın. Çerçeve ilkel yapılarına gerek yoktur.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather ayrıca işlevler @step ile süslendiğinde de çalışır.
Tam bir örnek için bkz python/samples/03-workflows/functional/parallel_pipeline.py .
İş akışları içindeki etmenleri çağırma
Ajan çağrıları, @workflow içinde basit işlev çağrıları olarak çalışır.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Sonuçların HITL özgeçmişlerinde veya denetim noktası geri yüklemelerinde önbelleğe alınmasını istediğinizde aracı çağırma işlevlerine @step ekleyin.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Tam bir örnek için bkz python/samples/03-workflows/functional/agent_integration.py .
.as_agent() — Bir iş akışını aracı olarak kullanma
FunctionalWorkflow öğesini .as_agent() ile bir aracı uyumlu nesne olarak sarın.
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent(), diğer aracı nesneleriyle aynı FunctionalWorkflowAgent arabirimi kullanıma sunan ve işlevsel iş akışlarını aracı kabul eden herhangi bir sistemle birleştirilebilir hale getiren bir run() döndürür.
| Parametre | Tür | Description |
|---|---|---|
name |
str | None |
Temsilcinin görünen adı. Varsayılan olarak iş akışı adını kullanır. |
Örnek için bkz python/samples/03-workflows/functional/agent_integration.py .
Samples
Çalıştırılabilir örnekler aşağıdaki örnek klasörlerde verilmiştir:
-
python/samples/01-get-started/— giriş@workflowörnekleri -
python/samples/03-workflows/functional/— tam özellikli işlevsel iş akışı örnekleri
Sonraki Adımlar
İlgili konular:
- Yürütücüler — graf tabanlı API'de işlem birimleri
- Döngüde insan - Graf tabanlı iş akışlarında HITL
- Kontrol noktaları — kontrol noktası depolama ve devam ettirme
- Olaylar — iş akışı olay türleri
- İş Akışlarını Aracı Olarak Kullanma
İşlevsel iş akışı API'si şu anda C# için kullanılamıyor.