Hello ASHLIN GABRIEL RAJAN,
Welcome to the Microsoft Q&A and thank you for posting your questions here.
I understand that you would like to know how you can handle the Human in the loop for concurrent agents and topic-subscription based scenarios.
To effectively integrate Human in the Loop (HIL) in concurrent and topic subscription based AutoGen solutions, consider the following approach:
- Work on your design or architecture by:
- Implement a dedicated queue or decision manager that routes requests to humans asynchronously.
- Instead of a simple event bus , consider Azure Service Bus, RabbitMQ, or Kafka for better handling of concurrent agent communication. - https://learn.microsoft.com/en us/azure/service bus messaging/service bus messaging overview
- Use state management solutions (Redis, CosmosDB) to track pending requests and avoid conflicts.
- Let your APIs follow a callback based model to handle human intervention efficiently. For an example API Flow for Asynchronous Human Review
- Agent publishes a task needing human validation to a queue and human receives a notification (via Teams, Email, Slack, or a UI ).
- If human doesn't respond within X minutes , agent takes a default action.
- Agents receive the response and proceed with execution . This is a REST API code snippet example with Webhooks for HIL:
from flask import Flask, request, jsonify from threading import Thread import time app = Flask(__name__) Simulated Decision Storage (Can be replaced with Redis or a DB) pending_decisions = {} @app.route('/request_decision', methods=['POST']) def request_decision(): data = request.json task_id = data['task_id'] Store pending decision (Simulate async request to human) pending_decisions[task_id] = "Pending" print(f"Human review needed for task {task_id}") return jsonify({"message": "Decision request sent", "task_id": task_id}) @app.route('/submit_decision', methods=['POST']) def submit_decision(): data = request.json task_id = data['task_id'] decision = data['decision'] if task_id in pending_decisions: pending_decisions[task_id] = decision return jsonify({"message": "Decision received", "task_id": task_id}) return jsonify({"error": "Task ID not found"}), 400 Background process to check decision status def monitor_decisions(): while True: for task_id, status in pending_decisions.items(): if status == "Pending": print(f"Task {task_id} still awaiting human decision...") time.sleep(5) Thread(target=monitor_decisions, daemon=True).start() if __name__ == '__main__': app.run(debug=True)
- The next is to enhance AutoGen Implementation, instead of the basic event bus, integrate a proper queue system:
The above shows, you can use a queue for pending human decisions, instead of assuming immediate response which supports scaling to multiple human reviewers if needed.from autogen import Agent import queue Simulated Human Decision Queue human_review_queue = queue.Queue() class TaskAgent(Agent): def on_event(self, event): if event.topic == 'task_requires_human_review': human_review_queue.put(event.data) print(f"Task {event.data['task_id']} awaiting human review.") class HumanAgent(Agent): def on_event(self, event): if event.topic == 'human_review_completed': decision = event.data.get('decision') print(f"Human made decision: {decision}")
- Check out the below resources and references to add for more details:
- https://microsoft.github.io/autogen/stable/api reference/index.html
- https://learn.microsoft.com/en us/azure/architecture/best practices/api design
- https://learn.microsoft.com/en us/azure/event grid/event handlers
- https://www.typhoon hil.com/documentation/typhoon hil api documentation/hil_api.html
- https://learn.microsoft.com/en us/azure/service bus messaging/service bus messaging overview
- https://developer.mozilla.org/en US/docs/Web/API/WebSockets_API
I hope this is helpful! Do not hesitate to let me know if you have any other questions or clarifications.
Please don't forget to close up the thread here by upvoting and accept it as an answer if it is helpful.