Condividi tramite


Guida all'aggiornamento: API del flusso di lavoro e sistema di Request-Response

Questa guida consente di aggiornare i flussi di lavoro Python alle modifiche api più recenti introdotte nella versione 1.0.0b251104.

Panoramica delle modifiche

Questa versione include due importanti miglioramenti al sistema del flusso di lavoro:

1. API di esecuzione del flusso di lavoro consolidate

I metodi di esecuzione del flusso di lavoro sono stati unificati per semplicità:

  • Metodi unificati run_stream() erun(): sostituire metodi specifici del checkpoint separati (run_stream_from_checkpoint(), run_from_checkpoint())
  • Interfaccia singola: usare il checkpoint_id parametro per riprendere da checkpoint anziché metodi separati
  • Checkpoint flessibili: configurare la memorizzazione dei checkpoint in fase di compilazione o sovrascrivere in fase di esecuzione
  • Semantica più chiara: parametri mutuamente esclusivi message (nuova esecuzione) e checkpoint_id (ripresa)

2. Sistema Request-Response semplificato

Il sistema di richiesta-risposta è stato semplificato:

  • Non più RequestInfoExecutor: gli executor possono ora inviare richieste direttamente
  • Nuovo @response_handler decorator: sostituire RequestResponse i gestori di eventi
  • Tipi di richiesta semplificati: nessuna ereditarietà da RequestInfoMessage richiesta
  • Funzionalità predefinite: tutti gli executor supportano automaticamente la funzionalità request-response
  • Grafici del flusso di lavoro più puliti: rimuovere RequestInfoExecutor nodi dai flussi di lavoro

Parte 1: API di esecuzione unificata del flusso di lavoro

È consigliabile eseguire prima la migrazione alle API del flusso di lavoro consolidate, perché costituisce la base per tutti i modelli di esecuzione del flusso di lavoro.

Riprendere dai checkpoint

Prima (API precedente):

# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Event: {event}")

Dopo (nuova API):

# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
):
    print(f"Event: {event}")

Differenze principali:

  • Usare checkpoint_id il parametro anziché un metodo separato
  • Non può fornire sia message che checkpoint_id (che si escludono a vicenda)
  • Deve fornire message (nuova esecuzione) o checkpoint_id (ripresa)
  • checkpoint_storage è facoltativo se il checkpoint è stato configurato in fase di compilazione

API non di streaming

Il metodo non di streaming run() segue lo stesso modello:

Vecchio:

result = await workflow.run_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)

Nuovo:

result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
)

Ripristina punto di controllo con richieste in sospeso

Modifica importante che causa un'interruzione: quando si riprende da un checkpoint con oggetti in sospeso RequestInfoEvent , la nuova API genera nuovamente questi eventi automaticamente. Devi catturarli e rispondere.

Prima (comportamento passato):

# OLD: Could provide responses directly during resume
responses = {
    "request-id-1": "user response data",
    "request-id-2": "another response"
}

async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
    responses=responses  # No longer supported
):
    print(f"Event: {event}")

Dopo (nuovo comportamento):

# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}

async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
    if event.type == "request_info":
        # Pending requests are automatically re-emitted
        print(f"Pending request re-emitted: {event.request_id}")
        requests[event.request_id] = event.data

# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
    response = handle_request(request_data)  # Your logic here
    responses[request_id] = response

# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
    if event.type == "output":
        print(f"Workflow output: {event.data}")

Esempio completo di human-in-the-loop

Ecco un esempio completo che mostra la ripresa del checkpoint con l'approvazione umana in attesa.

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowBuilder,
    handler,
    response_handler,
)

# ... (Executor definitions omitted for brevity)

async def run_interactive_session(
    workflow: Workflow,
    initial_message: str | None = None,
    checkpoint_id: str | None = None,
) -> str:
    """Run workflow until completion, handling human input interactively."""

    requests: dict[str, HumanApprovalRequest] = {}
    responses: dict[str, str] | None = None
    completed_output: str | None = None

    while True:
        # Determine which API to call
        if responses:
            # Send responses from previous iteration
            event_stream = workflow.send_responses_streaming(responses)
            requests.clear()
            responses = None
        else:
            # Start new run or resume from checkpoint
            if initial_message:
                event_stream = workflow.run_stream(initial_message)
            elif checkpoint_id:
                event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
            else:
                raise ValueError("Either initial_message or checkpoint_id required")

        # Process events
        async for event in event_stream:
            if event.type == "status":
                print(event)
            if event.type == "output":
                completed_output = event.data
            if event.type == "request_info":
                if isinstance(event.data, HumanApprovalRequest):
                    requests[event.request_id] = event.data

        # Check completion
        if completed_output:
            break

        # Prompt for user input if we have pending requests
        if requests:
            responses = prompt_for_responses(requests)
            continue

        raise RuntimeError("Workflow stopped without completing or requesting input")

    return completed_output

Parte 2: Sistema Request-Response semplificato

Dopo la migrazione alle API del flusso di lavoro unificato, aggiornare i modelli di richiesta-risposta per usare il nuovo sistema integrato.

1. Aggiornare le importazioni

Before:

from agent_framework import (
    RequestInfoExecutor,
    RequestInfoMessage,
    RequestResponse,
    # ... other imports
)

After:

from agent_framework import (
    response_handler,
    # ... other imports
    # Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)

2. Aggiornare i tipi di richiesta

Before:

from dataclasses import dataclass
from agent_framework import RequestInfoMessage

@dataclass
class UserApprovalRequest(RequestInfoMessage):
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

After:

from dataclasses import dataclass

@dataclass
class UserApprovalRequest:
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

3. Aggiornare il grafico del flusso di lavoro

Before:

# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .add_edge(approval_executor, request_info_executor)
    .add_edge(request_info_executor, approval_executor)
    .build()
)

After:

# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .build()
)

4. Aggiornare l'invio di richieste

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.send_message(request)

After:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.request_info(request_data=request, response_type=bool)

5. Aggiornare la gestione delle risposte

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def handle_approval(
        self,
        response: RequestResponse[UserApprovalRequest, bool],
        ctx: WorkflowContext[Never, str]
    ) -> None:
        if response.data:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

After:

class ApprovalRequiredExecutor(Executor):
    @response_handler
    async def handle_approval(
        self,
        original_request: UserApprovalRequest,
        approved: bool,
        ctx: WorkflowContext
    ) -> None:
        if approved:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

Riepilogo dei vantaggi

API del flusso di lavoro unificato

  1. Interfaccia semplificata: metodo singolo per le esecuzioni iniziali e la ripresa del checkpoint
  2. Semantica più chiara: i parametri che si escludono a vicenda rendono esplicita la finalità
  3. Punto di controllo flessibile: configurare in fase di compilazione o sovrascrivere in fase di esecuzione
  4. Riduzione del carico cognitivo: meno metodi da ricordare e gestire

sistema Richiesta-Risposta

  1. Architettura semplificata: nessuna necessità di componenti separati RequestInfoExecutor
  2. Sicurezza dei tipi: specificazione diretta del tipo nelle chiamate request_info()
  3. Codice più pulito: meno importazioni e grafici del flusso di lavoro più semplici
  4. Prestazioni migliori: riduzione del sovraccarico di routing dei messaggi
  5. Debug avanzato: flusso di esecuzione più chiaro e gestione degli errori

Test della migrazione

Elenco di controllo parte 1: API del flusso di lavoro

  1. Aggiornare le chiamate API: sostituire run_stream_from_checkpoint() con run_stream(checkpoint_id=...)
  2. Aggiornare le chiamate API: sostituire run_from_checkpoint() con run(checkpoint_id=...)
  3. Rimuovi responses parametro: eliminare tutti responses gli argomenti dalle chiamate di ripristino del checkpoint
  4. Aggiungere l'acquisizione di eventi: implementare la logica per acquisire eventi di request_info generati nuovamente (event.type == "request_info")
  5. Ripresa del checkpoint di test: verificare che le richieste in sospeso vengano ricreate e gestite correttamente

Elenco di controllo parte 2: sistema Request-Response

  1. Verificare le importazioni: assicurarsi che nessuna importazione precedente rimanga (RequestInfoExecutor, RequestInfoMessage, RequestResponse)
  2. Controllare i tipi di richiesta: confermare la rimozione dell'ereditarietà RequestInfoMessage
  3. Grafo del flusso di lavoro di test: verificare la rimozione dei RequestInfoExecutor nodi
  4. Convalida gestori: assicurarsi che @response_handler gli elementi Decorator siano applicati
  5. Test end-to-end: eseguire scenari di flusso di lavoro completi

Passaggi successivi

Al termine della migrazione:

  1. Esaminare l'esercitazione sulle richieste e le risposte aggiornate
  2. Esplorare i modelli avanzati nella Guida per l'utente
  3. Vedere gli esempi aggiornati nel repository

Per altre informazioni, vedere la documentazione di Agent Framework o contattare il team e la community.