API de Fluxo de Trabalho Funcional

Warning

A API funcional do fluxo de trabalho é experimental e sujeita a alterações ou remoção em versões futuras sem aviso prévio.

A API de workflow funcional permite-lhe escrever fluxos de trabalho como funções simples em Python assíncronas. Em vez de definir classes executor, ligar arestas e usar WorkflowBuilder, decorar uma função async com @workflow e utilizar o fluxo de controlo nativo do Python — if/else, for loops, asyncio.gather — para exprimir a sua lógica.

Para uma comparação lado-a-lado com a API de gráficos, consulte APIs de Workflows na visão geral de Workflows.

@workflow decorador

Aplique @workflow a uma async função para a converter num FunctionalWorkflow objeto:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

O @workflow decorador suporta uma forma parametrizada com argumentos opcionais:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parâmetros

Parâmetro Tipo Description
name str | None Nome de exibição para o fluxo de trabalho. Por defeito, a função __name__.
description str | None Descrição opcional legível por humanos.
checkpoint_storage CheckpointStorage | None O armazenamento padrão para a persistência dos resultados das etapas entre execuções. Pode ser anulado por chamada em run().

Assinatura de função do fluxo de trabalho

O primeiro parâmetro da função de workflow recebe a entrada passada para .run(). Adicione um ctx: WorkflowRunContext parâmetro apenas quando precisar de HITL, estado chave/valor ou eventos personalizados — caso contrário, é opcional:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext é detetado primeiro pela anotação do tipo, depois pelo nome ctxdo parâmetro , pelo que tanto ctx: WorkflowRunContext como um parâmetro nu ctx funcionam.

Execução de um fluxo de trabalho

Chamada .run() ao FunctionalWorkflow objeto devolvido por @workflow:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parâmetros

Parâmetro Tipo Description
message Any | None A entrada é passada para a função de fluxo de trabalho como seu primeiro argumento.
stream bool Se True, devolve um ResponseStream, que gera WorkflowEvent objetos. O padrão é False.
responses dict[str, Any] | None Respostas HITL associadas a request_id. Usado para retomar um fluxo de trabalho suspenso.
checkpoint_id str | None Ponto de restauro. É necessário que checkpoint_storage seja configurado.
checkpoint_storage CheckpointStorage | None Substitui o armazenamento padrão definido no decorador para esta execução.
include_status_events bool Inclua eventos de alteração de estado no resultado não transmitido por streaming.

Exatamente uma de message, responses ou checkpoint_id deve ser fornecida por chamada.

WorkflowRunResult

run() (não em fluxo) retorna um WorkflowRunResult. Métodos-chave:

Método / propriedade Devoluções Description
.text str Primeira saída como uma cadeia de caracteres. String vazio se não houver string output.
.get_outputs() list[Any] Todas as saídas emitidas pelo fluxo de trabalho.
.get_final_state() WorkflowRunState Estado final de execução (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Pedidos HITL pendentes quando o estado é IDLE_WITH_PENDING_REQUESTS.

Streaming

Passe stream=True para receber eventos à medida que forem produzidos:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Veja python/samples/03-workflows/functional/basic_streaming_pipeline.py para um exemplo completo.

@step decorador

@step é um decorador opcional que adiciona cache de resultados, disparo de eventos e registo de pontos de verificação por passo a funções assíncronas individuais.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

O que @step faz dentro de um fluxo de trabalho

  • Armazena em cache os resultados — o resultado é armazenado por (step_name, call_index). No processo de retomar ou restaurar no HITL, um passo concluído devolve instantaneamente o seu resultado guardado, em vez de ser reexecutado.
  • Emite eventosexecutor_invoked / executor_completed / executor_failed são emitidos para observabilidade. Num acerto de cache, executor_bypassed é emitido em vez disso.
  • Guarda pontos de verificação — se o fluxo de trabalho tiver checkpoint_storage, um ponto de verificação é guardado após a conclusão de cada etapa.
  • Injeta WorkflowRunContext — se a função step declarar um ctx: WorkflowRunContext parâmetro, o contexto ativo é automaticamente injetado.

Fora de um fluxo de trabalho em execução, @step é transparente — a função comporta-se de forma idêntica à sua versão sem decoração, tornando-a totalmente testável isoladamente.

Quando usar @step

Use @step em funções que são caras de reexecutar: chamadas de agentes, pedidos de API externos, ou qualquer operação em que a reexecução ao reiniciar seria dispendiosa ou teria efeitos colaterais. As funções simples (sem @step) continuam a funcionar dentro de @workflow; simplesmente são reexecutadas quando o fluxo de trabalho recomeça.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step Aceita também um name parâmetro:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Veja python/samples/03-workflows/functional/steps_and_checkpointing.py para um exemplo completo.

WorkflowRunContext

WorkflowRunContext (pseudónimo curto: RunContext) é o contexto de execução injetado no fluxo de trabalho e nas funções de etapas. Só precisas quando usas HITL, estado chave/valor ou eventos personalizados.

Importe-o de agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Humano no ciclo

ctx.request_info() suspende o fluxo de trabalho para esperar por entrada externa:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parâmetros:

Parâmetro Tipo Description
request_data Any Carga útil que descreve que entrada é necessária (dict, modelo Pydantic, string, ...).
response_type type Esperava o tipo de resposta em Python.
request_id str | None Identificador estável para esta requisição. Um UUID aleatório é gerado se for omitido.

Semântica de repetição: Na primeira execução, request_info() gera um sinal interno (nunca visível para o teu código) que suspende o fluxo de trabalho. O chamador recebe um WorkflowRunResult com get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Retomar chamando .run(responses={request_id: value}) — o fluxo de trabalho é reexecutado a partir do topo e request_info() devolve imediatamente o valor fornecido.

@step-as funções decoradas que foram executadas antes da suspensão devolvem os seus resultados armazenados em cache ao serem retomadas ao invés de serem reexecutadas.

Como lidar com a resposta:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Veja python/samples/03-workflows/functional/hitl_review.py para um exemplo completo.

ctx.request_info() também é suportado dentro das @step funções.

ctx.add_event() — Eventos personalizados

Use ctx.add_event() para emitir eventos específicos da aplicação juntamente com eventos do ciclo de vida do framework. Para detalhes completos e exemplos, veja Emitir eventos personalizados.

ctx.get_state() / ctx.set_state() — Estado chave/valor

Utilizar ctx.get_state() e ctx.set_state() para armazenar valores que persistem através das interrupções HITL e que são incluídos nos Checkpoints. Para detalhes completos, veja Estado do fluxo de trabalho.

Os valores de estado devem ser serializáveis em JSON quando o armazenamento de checkpoint está configurado.

ctx.is_streaming()

Retorna True quando a execução atual foi iniciada com stream=True. Funções úteis dentro do passo que querem ajustar o seu comportamento com base no modo de streaming.

get_run_context()

Recupera o ativo WorkflowRunContext a partir de qualquer ponto de um fluxo de trabalho em execução — útil em funções de apoio que não declaram um parâmetro ctx.

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Retorna None quando chamado fora de um fluxo de trabalho em curso.

Paralelismo com asyncio.gather

Use concorrência padrão em Python para fan-out/fan-in — não são necessárias primitivas de framework:

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather também funciona quando as funções são decoradas com @step.

Veja python/samples/03-workflows/functional/parallel_pipeline.py para um exemplo completo.

Chamar agentes dentro de fluxos de trabalho

As chamadas de agente funcionam como chamadas de função simples dentro de @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Adicione @step funções de chamada de agentes quando quiser que os seus resultados sejam armazenados em cache em retomadas HITL ou restaurações de checkpoints:

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Veja python/samples/03-workflows/functional/agent_integration.py para um exemplo completo.

.as_agent() — Utilização de um fluxo de trabalho como agente

Envolva a FunctionalWorkflow como um objeto compatível com agente com .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() retorna um FunctionalWorkflowAgent que expõe a mesma run() interface que outros objetos de agente, tornando fluxos de trabalho funcionais componíveis com qualquer sistema que aceite agentes.

Parâmetro Tipo Description
name str | None Nome de exibição para o agente. Por defeito, o nome do fluxo de trabalho.

Veja python/samples/03-workflows/functional/agent_integration.py como um exemplo.

Samples

Exemplos executáveis encontram-se nas seguintes pastas de exemplo:

Passos seguintes

Tópicos relacionados:

A API funcional do fluxo de trabalho não está disponível para C# neste momento.