Condividi tramite


Flussi di lavoro con AG-UI

Annotazioni

Il supporto del flusso di lavoro per l'integrazione .NET AG-UI sarà presto disponibile.

Questa esercitazione illustra come esporre i flussi di lavoro di Agent Framework tramite un endpoint AG-UI. I flussi di lavoro orchestrano più agenti e strumenti in un grafo di esecuzione definito, e l'integrazione AG-UI trasmette eventi avanzati dei flussi di lavoro — come il tracciamento dei passi, gli snapshot delle attività, le interruzioni e gli eventi personalizzati — ai client web in tempo reale.

Prerequisiti

Prima di iniziare, assicurati di avere:

  • Python 3.10 o versione successiva
  • agent-framework-ag-ui installato
  • Familiarità con l'esercitazione introduttiva
  • Conoscenza di base dei flussi di lavoro di Agent Framework

Quando usare flussi di lavoro con AG-UI

Usare un flusso di lavoro anziché un singolo agente quando necessario:

  • Orchestrazione multi-agente: Smistare le attività fra agenti specializzati (ad esempio, valutazione → rimborso → ordine)
  • Passaggi di esecuzione strutturati: tenere traccia dello stato di avanzamento nelle fasi definite con STEP_STARTED / STEP_FINISHED eventi
  • Interrompere/riprendere i flussi: sospendere l'esecuzione per raccogliere l'input umano o le approvazioni, quindi riprendere
  • Streaming di eventi personalizzati: generare eventi specifici del dominio (request_info, status, workflow_output) al client

Avvolgimento di un flusso di lavoro con AgentFrameworkWorkflow

AgentFrameworkWorkflow è un wrapper leggero che adatta un elemento nativo Workflow al protocollo AG-UI. È possibile fornire un'istanza del flusso di lavoro predefinita o una factory che crea un nuovo flusso di lavoro per ogni thread.

Istanza diretta

Usare un'istanza diretta quando un singolo oggetto di flusso di lavoro può gestire in modo sicuro tutte le richieste, ad esempio pipeline stateless:

from agent_framework import Workflow
from agent_framework.ag_ui import AgentFrameworkWorkflow

workflow = build_my_workflow()  # returns a Workflow

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow=workflow,
    name="my-workflow",
    description="Single-instance workflow.",
)

Factory con ambito thread

Usare workflow_factory quando ogni thread di conversazione necessita del proprio stato del flusso di lavoro. La fabbrica riceve thread_id e restituisce un nuovo Workflow.

from agent_framework.ag_ui import AgentFrameworkWorkflow

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda thread_id: build_my_workflow(),
    name="my-workflow",
    description="Thread-scoped workflow.",
)

Importante

È necessario passare oworkflowoppureworkflow_factory, non entrambi. Il wrapper genera un oggetto ValueError se vengono forniti entrambi.

Registrazione dell'endpoint

Registrare il flusso di lavoro con add_agent_framework_fastapi_endpoint lo stesso modo in cui si registra un singolo agente:

from fastapi import FastAPI
from agent_framework.ag_ui import (
    AgentFrameworkWorkflow,
    add_agent_framework_fastapi_endpoint,
)

app = FastAPI(title="Workflow AG-UI Server")

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda thread_id: build_my_workflow(),
    name="handoff-demo",
    description="Multi-agent handoff workflow.",
)

add_agent_framework_fastapi_endpoint(
    app=app,
    agent=ag_ui_workflow,
    path="/workflow",
)

È anche possibile passare direttamente un semplice Workflow — l'endpoint lo avvolge automaticamente in AgentFrameworkWorkflow:

add_agent_framework_fastapi_endpoint(app, my_workflow, "/workflow")

eventi AG-UI generati dai flussi di lavoro

Le esecuzioni del flusso di lavoro generano un set più completo di eventi AG-UI rispetto alle esecuzioni con agente singolo:

Event Quando viene emesso Descrizione
RUN_STARTED Inizio del processo Contrassegna l'inizio dell'esecuzione del flusso di lavoro
STEP_STARTED Inizia un executor o un superstep step_name identifica l'agente o il passaggio (ad esempio, "triage_agent")
TEXT_MESSAGE_* L'agente produce testo Eventi standard di streaming di testo
TOOL_CALL_* Agent richiama uno strumento Eventi di chiamata agli strumenti standard
STEP_FINISHED Completa un executor o un superstep Chiude il passaggio per il rilevamento dello stato dell'interfaccia utente
CUSTOM (status) Modifiche dello stato del flusso di lavoro Contiene {"state": "<value>"} nel valore dell'evento
CUSTOM (request_info) Il flusso di lavoro richiede l'input umano Contiene il payload della richiesta affinché il client possa visualizzare un prompt
CUSTOM (workflow_output) Il flusso di lavoro produce risultati Contiene i dati di output finali o intermedi
RUN_FINISHED Esecuzione completata Può includere interrupts se il flusso di lavoro è in attesa di input

I client possono usare gli eventi STEP_STARTED / STEP_FINISHED per mostrare gli indicatori di avanzamento che mostrano quale agente è attualmente attivo.

Interrompi e riprendi

I flussi di lavoro possono sospendere l'esecuzione per raccogliere l'input umano o le approvazioni degli strumenti. L'integrazione AG-UI gestisce questa operazione tramite il protocollo interrupt/resume.

Funzionamento degli interrupt

  1. Durante l'esecuzione, il flusso di lavoro genera una richiesta in sospeso( ad esempio, una HandoffAgentUserRequest richiesta di altri dettagli o uno strumento con approval_mode="always_require").

  2. Il bridge AG-UI genera un CUSTOM evento contenente name="request_info" i dati della richiesta.

  3. L'esecuzione termina con un evento RUN_FINISHED il cui campo interrupts contiene un elenco di oggetti di richiesta in sospeso.

    {
      "type": "RUN_FINISHED",
      "threadId": "abc123",
      "runId": "run_xyz",
      "interrupts": [
        {
          "id": "request-id-1",
          "value": { "request_type": "HandoffAgentUserRequest", "data": "..." }
        }
      ]
    }
    
  4. Il client esegue il rendering dell'interfaccia utente per consentire all'utente di rispondere (un input di testo, un pulsante di approvazione e così via).

Come funziona il curriculum

Il client invia una nuova richiesta con il payload contenente le resume risposte dell'utente identificate dall'ID dell'interrupt:

{
  "threadId": "abc123",
  "messages": [],
  "resume": {
    "interrupts": [
      {
        "id": "request-id-1",
        "value": "User's response text or approval decision"
      }
    ]
  }
}

Il server converte il payload resume in risposte del flusso di lavoro e continua l'esecuzione da dove è stata sospesa.

Esempio completo: Flusso di lavoro di trasferimento multi-agente

Questo esempio mostra un flusso di lavoro di supporto clienti con tre agenti che si consegnano l'uno all'altro, usano strumenti che richiedono l'approvazione e richiedono l'input umano quando necessario.

Definire gli agenti e gli strumenti

"""AG-UI workflow server with multi-agent handoff."""

import os

from agent_framework import Agent, Message, Workflow, tool
from agent_framework.ag_ui import (
    AgentFrameworkWorkflow,
    add_agent_framework_fastapi_endpoint,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import HandoffBuilder
from azure.identity import AzureCliCredential
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware


@tool(approval_mode="always_require")
def submit_refund(refund_description: str, amount: str, order_id: str) -> str:
    """Capture a refund request for manual review before processing."""
    return f"Refund recorded for order {order_id} (amount: {amount}): {refund_description}"


@tool(approval_mode="always_require")
def submit_replacement(order_id: str, shipping_preference: str, replacement_note: str) -> str:
    """Capture a replacement request for manual review before processing."""
    return f"Replacement recorded for order {order_id} (shipping: {shipping_preference}): {replacement_note}"


@tool(approval_mode="never_require")
def lookup_order_details(order_id: str) -> dict[str, str]:
    """Return order details for a given order ID."""
    return {
        "order_id": order_id,
        "item_name": "Wireless Headphones",
        "amount": "$129.99",
        "status": "delivered",
    }

Costruire il flusso di lavoro

def create_handoff_workflow() -> Workflow:
    """Build a handoff workflow with triage, refund, and order agents."""
    client = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    )

    triage = Agent(id="triage_agent", name="triage_agent", instructions="...", client=client)
    refund = Agent(id="refund_agent", name="refund_agent", instructions="...", client=client,
                   tools=[lookup_order_details, submit_refund])
    order = Agent(id="order_agent", name="order_agent", instructions="...", client=client,
                  tools=[lookup_order_details, submit_replacement])

    def termination_condition(conversation: list[Message]) -> bool:
        for msg in reversed(conversation):
            if msg.role == "assistant" and (msg.text or "").strip().lower().endswith("case complete."):
                return True
        return False

    builder = HandoffBuilder(
        name="support_workflow",
        participants=[triage, refund, order],
        termination_condition=termination_condition,
    )
    builder.add_handoff(triage, [refund], description="Route refund requests.")
    builder.add_handoff(triage, [order], description="Route replacement requests.")
    builder.add_handoff(refund, [order], description="Route to order after refund.")
    builder.add_handoff(order, [triage], description="Route back after completion.")

    return builder.with_start_agent(triage).build()

Creare l'app FastAPI

app = FastAPI(title="Workflow AG-UI Demo")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda _thread_id: create_handoff_workflow(),
    name="support_workflow",
    description="Customer support handoff workflow.",
)

add_agent_framework_fastapi_endpoint(
    app=app,
    agent=ag_ui_workflow,
    path="/support",
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8888)

Sequenza di eventi

Un'interazione a più turni tipica produce eventi come:

RUN_STARTED           threadId=abc123
STEP_STARTED          stepName=triage_agent
TEXT_MESSAGE_START     role=assistant
TEXT_MESSAGE_CONTENT   delta="I'll look into your refund..."
TEXT_MESSAGE_END
STEP_FINISHED         stepName=triage_agent
STEP_STARTED          stepName=refund_agent
TOOL_CALL_START       toolCallName=lookup_order_details
TOOL_CALL_ARGS        delta='{"order_id":"12345"}'
TOOL_CALL_END
TOOL_CALL_START       toolCallName=submit_refund
TOOL_CALL_ARGS        delta='{"order_id":"12345","amount":"$129.99",...}'
TOOL_CALL_END
RUN_FINISHED          interrupts=[{id: "...", value: {function_approval_request}}]

Il client può quindi visualizzare una finestra di dialogo di approvazione e riprendere con la decisione dell'utente.

Passaggi successivi

Risorse aggiuntive