diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index 16c06870..a12f4c5a 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -24,11 +24,25 @@ async def auth_and_rate_limit(request: Request): async def chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): request.state.is_playground = False request.state.version = 2 - if db_config.get('orchestrator_id'): - result = await orchestrator_chat(request) - return result data_to_send = await make_request_data(request) response_format = data_to_send.get('body',{}).get('configuration', {}).get('response_format', {}) + + if db_config.get('orchestrator_id'): + # If orchestrator_id exists and response_format is non-default, use queue + if response_format and response_format.get('type') != 'default': + try: + # Publish the orchestrator message to the queue + await queue_obj.publish_message(data_to_send) + return {"success": True, "message": "Your response will be sent through configured means."} + except Exception as e: + # Log the error and return a meaningful error response + logger.error(f"Failed to publish orchestrator message: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to publish orchestrator message.") + else: + # Direct orchestrator call for default response format + result = await orchestrator_chat(request) + return result + if response_format and response_format.get('type') != 'default': try: # Publish the message to the queue diff --git a/src/services/commonServices/common.py b/src/services/commonServices/common.py index 9d6a56e5..5fc31280 100644 --- a/src/services/commonServices/common.py +++ b/src/services/commonServices/common.py @@ -151,7 +151,11 @@ async def chat(request_body): @handle_exceptions async def orchestrator_chat(request_body): try: - body = await request_body.json() + # Handle both API request (with .json() method) and queue message (dict) + if hasattr(request_body, 'json'): + body = await request_body.json() + else: + body = request_body.get('body', {}) # Extract user query from the request user = body.get('user') thread_id = body.get('thread_id') diff --git a/src/services/commonServices/queueService/queueService.py b/src/services/commonServices/queueService/queueService.py index e4d7a1cb..32885cf9 100644 --- a/src/services/commonServices/queueService/queueService.py +++ b/src/services/commonServices/queueService/queueService.py @@ -1,7 +1,7 @@ import asyncio import json from config import Config -from src.services.commonServices.common import chat, image +from src.services.commonServices.common import chat, image, orchestrator_chat from aio_pika.abc import AbstractIncomingMessage from src.services.utils.logger import logger from src.services.utils.common_utils import process_background_tasks @@ -24,7 +24,21 @@ def __init__(self): async def process_messages(self, messages): """Implement your batch processing logic here.""" - type = messages.get("body",{}).get('configuration',{}).get('type') + # Check if this is an orchestrator request + body = messages.get("body", {}) + + # Check for orchestrator indicators in the request + has_orchestrator_id = body.get('master_agent_id') or body.get('orchestrator_id') + has_agent_configurations = body.get('agent_configurations') or body.get('master_agent_config') + + # If it looks like an orchestrator request, handle it accordingly + if has_orchestrator_id and has_agent_configurations: + # Call orchestrator_chat same as chat - both expect messages format + await orchestrator_chat(messages) + return + + # Handle regular chat/image requests + type = body.get('configuration', {}).get('type') if type == 'image': await image(messages) return