This guide helps you upgrade your Python workflows to the latest API changes introduced in version 1.0.0b251104.
Overview of Changes
This release includes two major improvements to the workflow system:
1. Consolidated Workflow Execution APIs
The workflow execution methods have been unified for simplicity:
- Unified run_stream() and run() methods: Replace separate checkpoint-specific methods (run_stream_from_checkpoint(), run_from_checkpoint())
- Single interface: Use the checkpoint_id parameter to resume from checkpoints instead of separate methods
- Flexible checkpointing: Configure checkpoint storage at build time or override at runtime
- Clearer semantics: Mutually exclusive message (new run) and checkpoint_id (resume) parameters
2. Simplified Request-Response System
The request-response system has been streamlined:
- No more RequestInfoExecutor: Executors can now send requests directly
- New @response_handler decorator: Replaces RequestResponse message handlers
- Simplified request types: No inheritance from RequestInfoMessage required
- Built-in capabilities: All executors automatically support request-response functionality
- Cleaner workflow graphs: Remove RequestInfoExecutor nodes from your workflows
Part 1: Unified Workflow Execution APIs
We recommend migrating to the consolidated workflow APIs first, as this forms the foundation for all workflow execution patterns.
Resuming from Checkpoints
Before (Old API):
# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Event: {event}")
After (New API):
# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
):
    print(f"Event: {event}")
Key differences:
- Use the checkpoint_id parameter instead of a separate method
- Cannot provide both message and checkpoint_id (mutually exclusive)
- Must provide either message (new run) or checkpoint_id (resume)
- checkpoint_storage is optional if checkpointing was configured at build time
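The mutual-exclusion rule in the list above can be pictured with a small stand-alone check. This is only a sketch of the rule as described in this guide: resolve_run_mode is a hypothetical helper, not part of agent_framework, and the framework's own error type and wording may differ.

```python
from typing import Any, Optional


def resolve_run_mode(message: Optional[Any] = None, checkpoint_id: Optional[str] = None) -> str:
    """Hypothetical helper: classify a call as a new run or a checkpoint resume."""
    if message is not None and checkpoint_id is not None:
        # Both supplied: the intent is ambiguous, so reject the call.
        raise ValueError("Provide either message or checkpoint_id, not both")
    if message is None and checkpoint_id is None:
        # Neither supplied: there is nothing to start or resume.
        raise ValueError("Provide either message (new run) or checkpoint_id (resume)")
    return "new_run" if message is not None else "resume"


print(resolve_run_mode(message="hello"))
print(resolve_run_mode(checkpoint_id="checkpoint-id"))
```

Running a check like this in your own wrapper code before calling run() or run_stream() can surface a mixed-up call site early during migration.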
Non-Streaming API
The non-streaming run() method follows the same pattern:
Old:
result = await workflow.run_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)
New:
result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
)
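If many call sites still use the old non-streaming method, a thin forwarding function may ease an incremental migration. The sketch below is an assumption, not a framework feature: run_from_checkpoint_compat and FakeWorkflow are hypothetical names, and FakeWorkflow exists only so the example runs without the library installed.

```python
import asyncio
from typing import Any, Optional


async def run_from_checkpoint_compat(
    workflow: Any,
    checkpoint_id: str,
    checkpoint_storage: Optional[Any] = None,
) -> Any:
    """Hypothetical shim: forward an old-style resume call to the unified run()."""
    kwargs = {"checkpoint_id": checkpoint_id}
    if checkpoint_storage is not None:
        # Pass storage only when overriding what was configured at build time.
        kwargs["checkpoint_storage"] = checkpoint_storage
    return await workflow.run(**kwargs)


class FakeWorkflow:
    """Stand-in used only to exercise the shim; not the real Workflow class."""

    async def run(self, **kwargs: Any) -> Any:
        # Echo the keyword arguments so the forwarding can be inspected.
        return kwargs


result = asyncio.run(run_from_checkpoint_compat(FakeWorkflow(), "checkpoint-id"))
print(result)
```

A shim like this is best treated as temporary: once all callers use run(checkpoint_id=...) directly, it can be deleted.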
Checkpoint Resume with Pending Requests
Important Breaking Change: When resuming from a checkpoint that has pending RequestInfoEvent objects, the new API re-emits these events automatically. You must capture and respond to them.
Before (Old Behavior):
# OLD: Could provide responses directly during resume
responses = {
    "request-id-1": "user response data",
    "request-id-2": "another response"
}
async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
    responses=responses  # No longer supported
):
    print(f"Event: {event}")
After (New Behavior):
from typing import Any

# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}
async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
    if event.type == "request_info":
        # Pending requests are automatically re-emitted
        print(f"Pending request re-emitted: {event.request_id}")
        requests[event.request_id] = event.data

# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
    response = handle_request(request_data)  # Your logic here
    responses[request_id] = response

# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
    if event.type == "output":
        print(f"Workflow output: {event.data}")
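The capture step can be tried without the framework by feeding a simulated event stream into a small collector. Everything named here is an assumption made for illustration: FakeEvent, fake_resume_stream, and collect_pending_requests are hypothetical, and FakeEvent only mimics the type, data, and request_id attributes used in the snippet above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class FakeEvent:
    """Stand-in event with the attributes used above; not a framework class."""
    type: str
    data: Any = None
    request_id: str = ""


async def fake_resume_stream() -> AsyncIterator[FakeEvent]:
    """Simulate a resumed run that re-emits two pending requests."""
    yield FakeEvent(type="status", data="resumed")
    yield FakeEvent(type="request_info", data="approve A?", request_id="request-id-1")
    yield FakeEvent(type="request_info", data="approve B?", request_id="request-id-2")


async def collect_pending_requests(events: AsyncIterator[Any]) -> dict:
    """Hypothetical helper: gather re-emitted requests keyed by request_id."""
    pending = {}
    async for event in events:
        if event.type == "request_info":
            pending[event.request_id] = event.data
    return pending


pending = asyncio.run(collect_pending_requests(fake_resume_stream()))
print(pending)
```

Because the collector accepts any async iterable, the same function could in principle be pointed at a real resumed stream, provided its events expose the same attributes.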
Complete Human-in-the-Loop Example
Here's a complete example showing checkpoint resume with pending human approval:
from agent_framework import (
    Executor,
    FileCheckpointStorage,
    Workflow,
    WorkflowBuilder,
    handler,
    response_handler,
)

# ... (Executor definitions omitted for brevity)

async def run_interactive_session(
    workflow: Workflow,
    initial_message: str | None = None,
    checkpoint_id: str | None = None,
) -> str:
    """Run workflow until completion, handling human input interactively."""
    requests: dict[str, HumanApprovalRequest] = {}
    responses: dict[str, str] | None = None
    completed_output: str | None = None

    while True:
        # Determine which API to call
        if responses:
            # Send responses from previous iteration
            event_stream = workflow.send_responses_streaming(responses)
            requests.clear()
            responses = None
        else:
            # Start new run or resume from checkpoint
            if initial_message:
                event_stream = workflow.run_stream(initial_message)
            elif checkpoint_id:
                event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
            else:
                raise ValueError("Either initial_message or checkpoint_id required")

        # Process events
        async for event in event_stream:
            if event.type == "status":
                print(event)
            if event.type == "output":
                completed_output = event.data
            if event.type == "request_info":
                if isinstance(event.data, HumanApprovalRequest):
                    requests[event.request_id] = event.data

        # Check completion
        if completed_output:
            break

        # Prompt for user input if we have pending requests
        if requests:
            responses = prompt_for_responses(requests)
            continue

        raise RuntimeError("Workflow stopped without completing or requesting input")

    return completed_output
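The example above calls prompt_for_responses and uses HumanApprovalRequest without showing them. One possible shape is sketched below; both definitions are assumptions made for illustration, since the original ones are omitted, and the free-text string answers are a guess at what the workflow expects as a response. The ask parameter is a hypothetical addition so the function can be driven by scripted answers instead of input().

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class HumanApprovalRequest:
    """Hypothetical request shape; the original definition is omitted above."""
    prompt: str = ""


def prompt_for_responses(
    requests: dict,
    ask: Callable[[str], str] = input,
) -> dict:
    """Ask for one answer per pending request and key it by request ID."""
    responses = {}
    for request_id, request in requests.items():
        # Normalize the answer so " Reject " and "reject" are treated alike.
        answer = ask(f"{request.prompt} [approve/reject]: ").strip().lower()
        responses[request_id] = answer
    return responses


# Scripted answers replace input() so the sketch runs non-interactively.
scripted = iter(["approve", " Reject "])
demo = prompt_for_responses(
    {
        "r1": HumanApprovalRequest("Ship release?"),
        "r2": HumanApprovalRequest("Delete logs?"),
    },
    ask=lambda _prompt: next(scripted),
)
print(demo)
```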
Part 2: Simplified Request-Response System
After migrating to the unified workflow APIs, update your request-response patterns to use the new integrated system.
1. Update Imports
Before:
from agent_framework import (
    RequestInfoExecutor,
    RequestInfoMessage,
    RequestResponse,
    # ... other imports
)
After:
from agent_framework import (
    response_handler,
    # ... other imports
    # Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
2. Update Request Types
Before:
from dataclasses import dataclass
from agent_framework import RequestInfoMessage

@dataclass
class UserApprovalRequest(RequestInfoMessage):
    """Request for user approval."""
    prompt: str = ""
    context: str = ""
After:
from dataclasses import dataclass

@dataclass
class UserApprovalRequest:
    """Request for user approval."""
    prompt: str = ""
    context: str = ""
3. Update Workflow Graph
Before:
# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")
workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .add_edge(approval_executor, request_info_executor)
    .add_edge(request_info_executor, approval_executor)
    .build()
)
After:
# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")
workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .build()
)
4. Update Request Sending
Before:
class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.send_message(request)
After:
class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.request_info(request_data=request, response_type=bool)
5. Update Response Handling
Before:
class ApprovalRequiredExecutor(Executor):
    @handler
    async def handle_approval(
        self,
        response: RequestResponse[UserApprovalRequest, bool],
        ctx: WorkflowContext[Never, str]
    ) -> None:
        if response.data:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")
After:
class ApprovalRequiredExecutor(Executor):
    @response_handler
    async def handle_approval(
        self,
        original_request: UserApprovalRequest,
        approved: bool,
        ctx: WorkflowContext
    ) -> None:
        if approved:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")
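Because the new handler receives the original request and the response as plain arguments, its decision logic can be exercised in isolation. The sketch below is an illustration under stated assumptions: RecordingContext is a hypothetical test double that only implements yield_output, and handle_approval is rewritten as a free coroutine without the @response_handler decorator so the example runs without the framework.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class UserApprovalRequest:
    """Request for user approval (same fields as in the guide)."""
    prompt: str = ""
    context: str = ""


class RecordingContext:
    """Hypothetical test double that records yield_output calls."""

    def __init__(self) -> None:
        self.outputs = []

    async def yield_output(self, value: Any) -> None:
        self.outputs.append(value)


async def handle_approval(original_request: UserApprovalRequest, approved: bool, ctx: Any) -> None:
    """Same body as the handler above, written as a plain coroutine for testing."""
    if approved:
        await ctx.yield_output("Approved!")
    else:
        await ctx.yield_output("Rejected!")


ctx = RecordingContext()
asyncio.run(handle_approval(UserApprovalRequest(prompt="Please approve: deploy"), True, ctx))
asyncio.run(handle_approval(UserApprovalRequest(prompt="Please approve: delete"), False, ctx))
print(ctx.outputs)
```

This checks only the branch logic; how the framework routes a response to the decorated method is not modeled here.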
Summary of Benefits
Unified Workflow APIs
- Simplified Interface: Single method for initial runs and checkpoint resume
- Clearer Semantics: Mutually exclusive parameters make intent explicit
- Flexible Checkpointing: Configure at build time or override at runtime
- Reduced Cognitive Load: Fewer methods to remember and maintain
Request-Response System
- Simplified Architecture: No need for separate RequestInfoExecutor components
- Type Safety: Direct type specification in request_info() calls
- Cleaner Code: Fewer imports and simpler workflow graphs
- Better Performance: Reduced message routing overhead
- Enhanced Debugging: Clearer execution flow and error handling
Testing Your Migration
Part 1 Checklist: Workflow APIs
- Update API Calls: Replace run_stream_from_checkpoint() with run_stream(checkpoint_id=...)
- Update API Calls: Replace run_from_checkpoint() with run(checkpoint_id=...)
- Remove responses parameter: Delete any responses arguments from checkpoint resume calls
- Add event capture: Implement logic to capture re-emitted request_info events (event.type == "request_info")
- Test checkpoint resume: Verify pending requests are re-emitted and handled correctly
Part 2 Checklist: Request-Response System
- Verify Imports: Ensure no old imports remain (RequestInfoExecutor, RequestInfoMessage, RequestResponse)
- Check Request Types: Confirm removal of RequestInfoMessage inheritance
- Test Workflow Graph: Verify removal of RequestInfoExecutor nodes
- Validate Handlers: Ensure @response_handler decorators are applied
- Test End-to-End: Run complete workflow scenarios
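Part of both checklists can be automated with a simple text scan for the removed names. The following is a minimal sketch, not an official migration tool: find_legacy_usages is a hypothetical helper, the name list is taken from the checklists above, and a plain regex scan will also flag matches inside comments and strings.

```python
import re

# Removed names, taken from the checklists above.
LEGACY_NAMES = (
    "run_stream_from_checkpoint",
    "run_from_checkpoint",
    "RequestInfoExecutor",
    "RequestInfoMessage",
    "RequestResponse",
)
LEGACY_PATTERN = re.compile(r"\b(" + "|".join(LEGACY_NAMES) + r")\b")


def find_legacy_usages(source: str) -> list:
    """Return (line_number, name) pairs for each legacy identifier found."""
    hits = []
    for line_number, line in enumerate(source.splitlines(), start=1):
        for match in LEGACY_PATTERN.finditer(line):
            hits.append((line_number, match.group(1)))
    return hits


sample = """from agent_framework import RequestInfoExecutor, response_handler
result = await workflow.run_from_checkpoint(checkpoint_id="checkpoint-id")
result = await workflow.run(checkpoint_id="checkpoint-id")
"""
print(find_legacy_usages(sample))
```

To scan a project, the same function could be applied to the text of each .py file; an empty result for every file suggests the renamed methods and removed classes are no longer referenced.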
Next Steps
After completing the migration:
- Review the updated Requests and Responses Tutorial
- Explore advanced patterns in the User Guide
- Check out updated samples in the repository
For additional help, refer to the Agent Framework documentation or reach out to the team and community.