Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa guida consente di aggiornare i flussi di lavoro Python alle modifiche api più recenti introdotte nella versione 1.0.0b251104.
Panoramica delle modifiche
Questa versione include due importanti miglioramenti al sistema del flusso di lavoro:
1. API di esecuzione del flusso di lavoro consolidate
I metodi di esecuzione del flusso di lavoro sono stati unificati per semplicità:
-
Metodi unificati
run_stream()erun(): sostituire metodi specifici del checkpoint separati (run_stream_from_checkpoint(),run_from_checkpoint()) -
Interfaccia singola: usare il
checkpoint_idparametro per riprendere da checkpoint anziché metodi separati - Checkpoint flessibili: configurare la memorizzazione dei checkpoint in fase di compilazione o sovrascrivere in fase di esecuzione
-
Semantica più chiara: parametri mutuamente esclusivi
message(nuova esecuzione) echeckpoint_id(ripresa)
2. Sistema Request-Response semplificato
Il sistema di richiesta-risposta è stato semplificato:
-
Non più
RequestInfoExecutor: gli executor possono ora inviare richieste direttamente -
Nuovo
@response_handlerdecorator: sostituireRequestResponsei gestori di eventi -
Tipi di richiesta semplificati: nessuna ereditarietà da
RequestInfoMessagerichiesta - Funzionalità predefinite: tutti gli executor supportano automaticamente la funzionalità request-response
-
Grafici del flusso di lavoro più puliti: rimuovere
RequestInfoExecutornodi dai flussi di lavoro
Parte 1: API di esecuzione unificata del flusso di lavoro
È consigliabile eseguire prima la migrazione alle API del flusso di lavoro consolidate, perché costituisce la base per tutti i modelli di esecuzione del flusso di lavoro.
Riprendere dai checkpoint
Prima (API precedente):
# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Event: {event}")
Dopo (nuova API):
# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
):
print(f"Event: {event}")
Differenze principali:
- Usare
checkpoint_idil parametro anziché un metodo separato - Non può fornire sia
messagechecheckpoint_id(che si escludono a vicenda) - Deve fornire
message(nuova esecuzione) ocheckpoint_id(ripresa) -
checkpoint_storageè facoltativo se il checkpoint è stato configurato in fase di compilazione
API non di streaming
Il metodo non di streaming run() segue lo stesso modello:
Vecchio:
result = await workflow.run_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
Nuovo:
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
)
Ripristina punto di controllo con richieste in sospeso
Modifica importante che causa un'interruzione: quando si riprende da un checkpoint con oggetti in sospeso RequestInfoEvent , la nuova API genera nuovamente questi eventi automaticamente. Devi catturarli e rispondere.
Prima (comportamento passato):
# OLD: Could provide responses directly during resume
responses = {
"request-id-1": "user response data",
"request-id-2": "another response"
}
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage,
responses=responses # No longer supported
):
print(f"Event: {event}")
Dopo (nuovo comportamento):
# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}
async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
if event.type == "request_info":
# Pending requests are automatically re-emitted
print(f"Pending request re-emitted: {event.request_id}")
requests[event.request_id] = event.data
# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
response = handle_request(request_data) # Your logic here
responses[request_id] = response
# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
if event.type == "output":
print(f"Workflow output: {event.data}")
Esempio completo di human-in-the-loop
Ecco un esempio completo che mostra la ripresa del checkpoint con l'approvazione umana in attesa.
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowBuilder,
handler,
response_handler,
)
# ... (Executor definitions omitted for brevity)
async def run_interactive_session(
workflow: Workflow,
initial_message: str | None = None,
checkpoint_id: str | None = None,
) -> str:
"""Run workflow until completion, handling human input interactively."""
requests: dict[str, HumanApprovalRequest] = {}
responses: dict[str, str] | None = None
completed_output: str | None = None
while True:
# Determine which API to call
if responses:
# Send responses from previous iteration
event_stream = workflow.send_responses_streaming(responses)
requests.clear()
responses = None
else:
# Start new run or resume from checkpoint
if initial_message:
event_stream = workflow.run_stream(initial_message)
elif checkpoint_id:
event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
else:
raise ValueError("Either initial_message or checkpoint_id required")
# Process events
async for event in event_stream:
if event.type == "status":
print(event)
if event.type == "output":
completed_output = event.data
if event.type == "request_info":
if isinstance(event.data, HumanApprovalRequest):
requests[event.request_id] = event.data
# Check completion
if completed_output:
break
# Prompt for user input if we have pending requests
if requests:
responses = prompt_for_responses(requests)
continue
raise RuntimeError("Workflow stopped without completing or requesting input")
return completed_output
Parte 2: Sistema Request-Response semplificato
Dopo la migrazione alle API del flusso di lavoro unificato, aggiornare i modelli di richiesta-risposta per usare il nuovo sistema integrato.
1. Aggiornare le importazioni
Before:
from agent_framework import (
RequestInfoExecutor,
RequestInfoMessage,
RequestResponse,
# ... other imports
)
After:
from agent_framework import (
response_handler,
# ... other imports
# Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
2. Aggiornare i tipi di richiesta
Before:
from dataclasses import dataclass
from agent_framework import RequestInfoMessage
@dataclass
class UserApprovalRequest(RequestInfoMessage):
"""Request for user approval."""
prompt: str = ""
context: str = ""
After:
from dataclasses import dataclass
@dataclass
class UserApprovalRequest:
"""Request for user approval."""
prompt: str = ""
context: str = ""
3. Aggiornare il grafico del flusso di lavoro
Before:
# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.add_edge(approval_executor, request_info_executor)
.add_edge(request_info_executor, approval_executor)
.build()
)
After:
# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.build()
)
4. Aggiornare l'invio di richieste
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.send_message(request)
After:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.request_info(request_data=request, response_type=bool)
5. Aggiornare la gestione delle risposte
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def handle_approval(
self,
response: RequestResponse[UserApprovalRequest, bool],
ctx: WorkflowContext[Never, str]
) -> None:
if response.data:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
After:
class ApprovalRequiredExecutor(Executor):
@response_handler
async def handle_approval(
self,
original_request: UserApprovalRequest,
approved: bool,
ctx: WorkflowContext
) -> None:
if approved:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
Riepilogo dei vantaggi
API del flusso di lavoro unificato
- Interfaccia semplificata: metodo singolo per le esecuzioni iniziali e la ripresa del checkpoint
- Semantica più chiara: i parametri che si escludono a vicenda rendono esplicita la finalità
- Punto di controllo flessibile: configurare in fase di compilazione o sovrascrivere in fase di esecuzione
- Riduzione del carico cognitivo: meno metodi da ricordare e gestire
sistema Richiesta-Risposta
-
Architettura semplificata: nessuna necessità di componenti separati
RequestInfoExecutor -
Sicurezza dei tipi: specificazione diretta del tipo nelle chiamate
request_info() - Codice più pulito: meno importazioni e grafici del flusso di lavoro più semplici
- Prestazioni migliori: riduzione del sovraccarico di routing dei messaggi
- Debug avanzato: flusso di esecuzione più chiaro e gestione degli errori
Test della migrazione
Elenco di controllo parte 1: API del flusso di lavoro
-
Aggiornare le chiamate API: sostituire
run_stream_from_checkpoint()conrun_stream(checkpoint_id=...) -
Aggiornare le chiamate API: sostituire
run_from_checkpoint()conrun(checkpoint_id=...) -
Rimuovi
responsesparametro: eliminare tuttiresponsesgli argomenti dalle chiamate di ripristino del checkpoint -
Aggiungere l'acquisizione di eventi: implementare la logica per acquisire eventi di request_info generati nuovamente (
event.type == "request_info") - Ripresa del checkpoint di test: verificare che le richieste in sospeso vengano ricreate e gestite correttamente
Elenco di controllo parte 2: sistema Request-Response
-
Verificare le importazioni: assicurarsi che nessuna importazione precedente rimanga (
RequestInfoExecutor,RequestInfoMessage,RequestResponse) -
Controllare i tipi di richiesta: confermare la rimozione dell'ereditarietà
RequestInfoMessage -
Grafo del flusso di lavoro di test: verificare la rimozione dei
RequestInfoExecutornodi -
Convalida gestori: assicurarsi che
@response_handlergli elementi Decorator siano applicati - Test end-to-end: eseguire scenari di flusso di lavoro completi
Passaggi successivi
Al termine della migrazione:
- Esaminare l'esercitazione sulle richieste e le risposte aggiornate
- Esplorare i modelli avanzati nella Guida per l'utente
- Vedere gli esempi aggiornati nel repository
Per altre informazioni, vedere la documentazione di Agent Framework o contattare il team e la community.