From da2cdc167372dd0a22bf04951cb650f37079d1e8 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Tue, 16 Dec 2025 15:43:43 +0530 Subject: [PATCH 1/7] Added the openAImiddleware and openai endpoint --- src/middlewares/openAImiddleware.py | 144 ++++++++++++++++++++++++++++ src/routes/v2/modelRouter.py | 7 ++ 2 files changed, 151 insertions(+) create mode 100644 src/middlewares/openAImiddleware.py diff --git a/src/middlewares/openAImiddleware.py b/src/middlewares/openAImiddleware.py new file mode 100644 index 00000000..8f03258e --- /dev/null +++ b/src/middlewares/openAImiddleware.py @@ -0,0 +1,144 @@ +import json +from typing import Any, Dict, List, Optional +from fastapi import HTTPException, Request +from .middleware import jwt_middleware +from .ratelimitMiddleware import rate_limit + + +def _extract_pauthkey_from_authorization(request: Request) -> str: + authorization = request.headers.get("Authorization") + if not authorization or not authorization.lower().startswith("bearer "): + raise HTTPException( + status_code=401, + detail="Authorization header with Bearer pauthkey is required.", + ) + + token = authorization.split(" ", 1)[1].strip() + if not token: + raise HTTPException(status_code=401, detail="Bearer token cannot be empty.") + return token + + +def _normalize_message_content(content: Any) -> Optional[str]: + if isinstance(content, str): + content = content.strip() + return content or None + + if isinstance(content, list): + text_parts: List[str] = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_value = (item.get("text") or "").strip() + if text_value: + text_parts.append(text_value) + merged = "\n".join(text_parts).strip() + return merged or None + + return None + + +def _extract_latest_user_message(messages: List[Dict[str, Any]]) -> Optional[str]: + for message in reversed(messages or []): + if message.get("role") != "user": + continue + normalized = _normalize_message_content(message.get("content")) + if normalized: + return normalized + return None + + +def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: + model = payload.get("model") + if not isinstance(model, str): + raise HTTPException(status_code=400, detail="`model` must be provided.") + + if ":" not in model: + raise HTTPException( + status_code=400, + detail="`model` must include the agent identifier in the form gtwy-agent:.", + ) + + _, agent_id = model.split(":", 1) + agent_id = agent_id.strip() + if not agent_id: + raise HTTPException(status_code=400, detail="Invalid model identifier.") + + user_message = _extract_latest_user_message(payload.get("messages", [])) + if not user_message: + fallback = payload.get("input") or payload.get("prompt") + if isinstance(fallback, str) and fallback.strip(): + user_message = fallback.strip() + if not user_message: + raise HTTPException(status_code=400, detail="No user message found in payload.") + + metadata = payload.get("metadata") or {} + if not isinstance(metadata, dict): + metadata = {} + + configuration = payload.get("configuration") or {} + if not isinstance(configuration, dict): + configuration = {} + + response_format = payload.get("response_format") + if response_format: + if isinstance(response_format, dict): + configuration.setdefault("response_format", response_format) + elif isinstance(response_format, str): + configuration.setdefault( + "response_format", + { + "type": response_format, + "cred": {}, + }, + ) + + internal_body: Dict[str, Any] = { + "agent_id": agent_id, + "bridge_id": agent_id, + "user": user_message, + "messages": payload.get("messages", []), + "thread_id": payload.get("conversation_id") + or payload.get("thread_id") + or metadata.get("thread_id"), + "sub_thread_id": payload.get("sub_thread_id") or metadata.get("sub_thread_id"), + "variables": payload.get("variables") or metadata.get("variables") or {}, + "configuration": configuration, + "attachments": payload.get("attachments", []), + } + + return internal_body + + +def _override_request_body(request: Request, body: Dict[str, Any]) -> None: + body_bytes = json.dumps(body).encode("utf-8") + request._body = body_bytes # type: ignore[attr-defined] + request._json = body # type: ignore[attr-defined] + request._stream_consumed = True # type: ignore[attr-defined] + if "_form" in request.__dict__: + request.__dict__.pop("_form") + + +def _set_pauthkey_header(request: Request, token: str) -> None: + raw_headers = list(request.scope.get("headers", [])) + filtered_headers = [ + (name, value) + for name, value in raw_headers + if name.lower() != b"authorization" + ] + filtered_headers.append((b"pauthkey", token.encode("utf-8"))) + request.scope["headers"] = filtered_headers + if "_headers" in request.__dict__: + del request.__dict__["_headers"] + + +async def openai_middleware(request: Request): + payload = await request.json() + internal_body = _build_internal_body(payload) + token = _extract_pauthkey_from_authorization(request) + + _override_request_body(request, internal_body) + _set_pauthkey_header(request, token) + + await jwt_middleware(request) + await rate_limit(request, key_path="body.bridge_id", points=100) + await rate_limit(request, key_path="body.thread_id", points=20) \ No newline at end of file diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index cfba4525..a435931e 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -6,6 +6,7 @@ from src.services.commonServices.baseService.utils import make_request_data from ...middlewares.middleware import jwt_middleware from ...middlewares.getDataUsingBridgeId import add_configuration_data_to_body +from ...middlewares.openAImiddleware import openai_middleware from concurrent.futures import ThreadPoolExecutor from config import Config from src.services.commonServices.queueService.queueService import queue_obj @@ -57,6 +58,12 @@ async def chat_completion(request: Request, db_config: dict = Depends(add_config result = await chat_multiple_agents(data_to_send) return result +@router.post('/openai/v1/chat/completions', dependencies=[Depends(openai_middleware)]) +async def openai_sdk_chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): + """ + OpenAI SDK-compatible entrypoint that reuses the standard chat completion flow. + """ + return await chat_completion(request, db_config=db_config) @router.post('/playground/chat/completion/{bridge_id}', dependencies=[Depends(auth_and_rate_limit)]) async def playground_chat_completion_bridge(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): From b398c8c7f0bfee67a046bf4bdaafc9f7637ab027 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Tue, 16 Dec 2025 18:11:01 +0530 Subject: [PATCH 2/7] Added the OpenAI response formatter --- src/middlewares/openAImiddleware.py | 1 + src/routes/v2/modelRouter.py | 59 ++++++++++++++++++++- src/services/utils/update_and_check_cost.py | 14 ++--- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/src/middlewares/openAImiddleware.py b/src/middlewares/openAImiddleware.py index 8f03258e..656aceb9 100644 --- a/src/middlewares/openAImiddleware.py +++ b/src/middlewares/openAImiddleware.py @@ -138,6 +138,7 @@ async def openai_middleware(request: Request): _override_request_body(request, internal_body) _set_pauthkey_header(request, token) + request.state.openai_payload = payload await jwt_middleware(request) await rate_limit(request, key_path="body.bridge_id", points=100) diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index a435931e..5f48a145 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -1,6 +1,9 @@ from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import JSONResponse import asyncio +import uuid +import time +import json from src.services.commonServices.common import chat_multiple_agents, embedding, batch, run_testcases, image, orchestrator_chat from src.services.commonServices.baseService.utils import make_request_data @@ -23,6 +26,44 @@ async def auth_and_rate_limit(request: Request): await rate_limit(request,key_path='body.bridge_id' , points=100) await rate_limit(request,key_path='body.thread_id', points=20) +def _format_openai_response(chat_response: dict, original_payload: dict | None) -> dict: + response_data = chat_response.get("response", {}).get("data", {}) + usage_data = chat_response.get("response", {}).get("usage", {}) or {} + + message_content = response_data.get("content") + if isinstance(message_content, list): + message_content = "\n".join( + chunk.get("text", "") if isinstance(chunk, dict) else str(chunk) + for chunk in message_content + ).strip() + + finish_reason = response_data.get("finish_reason") or usage_data.get("finish_reason") + model = original_payload.get("model") if isinstance(original_payload, dict) else None + + return { + "id": f"chatcmpl-{uuid.uuid4().hex}", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": message_content, + }, + "finish_reason": finish_reason or "stop", + "logprobs": None, + } + ], + "usage": { + "prompt_tokens": usage_data.get("input_tokens") or usage_data.get("prompt_tokens"), + "completion_tokens": usage_data.get("output_tokens") or usage_data.get("completion_tokens"), + "total_tokens": usage_data.get("total_tokens"), + }, + "system_fingerprint": None, + } + @router.post('/chat/completion', dependencies=[Depends(auth_and_rate_limit)]) async def chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): request.state.is_playground = False @@ -63,7 +104,23 @@ async def openai_sdk_chat_completion(request: Request, db_config: dict = Depends """ OpenAI SDK-compatible entrypoint that reuses the standard chat completion flow. """ - return await chat_completion(request, db_config=db_config) + internal_response = await chat_completion(request, db_config=db_config) + + if isinstance(internal_response, JSONResponse): + content = internal_response.body + try: + content_dict = json.loads(content) + except Exception: + content_dict = {} + if not content_dict.get("success", True): + raise HTTPException(status_code=500, detail=content_dict) + chat_response = content_dict + else: + chat_response = internal_response + + openai_payload = getattr(request.state, "openai_payload", {}) + formatted = _format_openai_response(chat_response, openai_payload) + return formatted @router.post('/playground/chat/completion/{bridge_id}', dependencies=[Depends(auth_and_rate_limit)]) async def playground_chat_completion_bridge(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): diff --git a/src/services/utils/update_and_check_cost.py b/src/services/utils/update_and_check_cost.py index 76955b6c..b3220a36 100644 --- a/src/services/utils/update_and_check_cost.py +++ b/src/services/utils/update_and_check_cost.py @@ -127,9 +127,11 @@ async def check_bridge_api_folder_limits(result, bridge_data,version_id): if not isinstance(bridge_data, dict): return None - folder_identifier = result.get('folder_id') + result_data = result if isinstance(result, dict) else {} + + folder_identifier = result_data.get('folder_id') if folder_identifier: - folder_error = await _check_limit(limit_types['folder'],data=result,version_id=version_id) + folder_error = await _check_limit(limit_types['folder'],data=result_data,version_id=version_id) if folder_error: return folder_error @@ -137,12 +139,12 @@ async def check_bridge_api_folder_limits(result, bridge_data,version_id): if bridge_error: return bridge_error - service_identifier = result.get('service') + service_identifier = result_data.get('service') if service_identifier and ( - (result.get('apikeys') and service_identifier in result.get('apikeys', {})) or - (result.get('folder_apikeys') and service_identifier in result.get('folder_apikeys', {})) + (result_data.get('apikeys') and service_identifier in result_data.get('apikeys', {})) or + (result_data.get('folder_apikeys') and service_identifier in result_data.get('folder_apikeys', {})) ): - api_error = await _check_limit(limit_types['apikey'], data=result,version_id=version_id) + api_error = await _check_limit(limit_types['apikey'], data=result_data,version_id=version_id) if api_error: return api_error From e5f4422d21cdb51981738e23a5436c4136e277ee Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Tue, 16 Dec 2025 18:47:09 +0530 Subject: [PATCH 3/7] Implement OpenAI-compatible chat completion adapter --- src/middlewares/openAImiddleware.py | 145 ++++++++++++++++++++ src/routes/v2/modelRouter.py | 64 +++++++++ src/services/utils/update_and_check_cost.py | 14 +- 3 files changed, 217 insertions(+), 6 deletions(-) create mode 100644 src/middlewares/openAImiddleware.py diff --git a/src/middlewares/openAImiddleware.py b/src/middlewares/openAImiddleware.py new file mode 100644 index 00000000..656aceb9 --- /dev/null +++ b/src/middlewares/openAImiddleware.py @@ -0,0 +1,145 @@ +import json +from typing import Any, Dict, List, Optional +from fastapi import HTTPException, Request +from .middleware import jwt_middleware +from .ratelimitMiddleware import rate_limit + + +def _extract_pauthkey_from_authorization(request: Request) -> str: + authorization = request.headers.get("Authorization") + if not authorization or not authorization.lower().startswith("bearer "): + raise HTTPException( + status_code=401, + detail="Authorization header with Bearer pauthkey is required.", + ) + + token = authorization.split(" ", 1)[1].strip() + if not token: + raise HTTPException(status_code=401, detail="Bearer token cannot be empty.") + return token + + +def _normalize_message_content(content: Any) -> Optional[str]: + if isinstance(content, str): + content = content.strip() + return content or None + + if isinstance(content, list): + text_parts: List[str] = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_value = (item.get("text") or "").strip() + if text_value: + text_parts.append(text_value) + merged = "\n".join(text_parts).strip() + return merged or None + + return None + + +def _extract_latest_user_message(messages: List[Dict[str, Any]]) -> Optional[str]: + for message in reversed(messages or []): + if message.get("role") != "user": + continue + normalized = _normalize_message_content(message.get("content")) + if normalized: + return normalized + return None + + +def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: + model = payload.get("model") + if not isinstance(model, str): + raise HTTPException(status_code=400, detail="`model` must be provided.") + + if ":" not in model: + raise HTTPException( + status_code=400, + detail="`model` must include the agent identifier in the form gtwy-agent:.", + ) + + _, agent_id = model.split(":", 1) + agent_id = agent_id.strip() + if not agent_id: + raise HTTPException(status_code=400, detail="Invalid model identifier.") + + user_message = _extract_latest_user_message(payload.get("messages", [])) + if not user_message: + fallback = payload.get("input") or payload.get("prompt") + if isinstance(fallback, str) and fallback.strip(): + user_message = fallback.strip() + if not user_message: + raise HTTPException(status_code=400, detail="No user message found in payload.") + + metadata = payload.get("metadata") or {} + if not isinstance(metadata, dict): + metadata = {} + + configuration = payload.get("configuration") or {} + if not isinstance(configuration, dict): + configuration = {} + + response_format = payload.get("response_format") + if response_format: + if isinstance(response_format, dict): + configuration.setdefault("response_format", response_format) + elif isinstance(response_format, str): + configuration.setdefault( + "response_format", + { + "type": response_format, + "cred": {}, + }, + ) + + internal_body: Dict[str, Any] = { + "agent_id": agent_id, + "bridge_id": agent_id, + "user": user_message, + "messages": payload.get("messages", []), + "thread_id": payload.get("conversation_id") + or payload.get("thread_id") + or metadata.get("thread_id"), + "sub_thread_id": payload.get("sub_thread_id") or metadata.get("sub_thread_id"), + "variables": payload.get("variables") or metadata.get("variables") or {}, + "configuration": configuration, + "attachments": payload.get("attachments", []), + } + + return internal_body + + +def _override_request_body(request: Request, body: Dict[str, Any]) -> None: + body_bytes = json.dumps(body).encode("utf-8") + request._body = body_bytes # type: ignore[attr-defined] + request._json = body # type: ignore[attr-defined] + request._stream_consumed = True # type: ignore[attr-defined] + if "_form" in request.__dict__: + request.__dict__.pop("_form") + + +def _set_pauthkey_header(request: Request, token: str) -> None: + raw_headers = list(request.scope.get("headers", [])) + filtered_headers = [ + (name, value) + for name, value in raw_headers + if name.lower() != b"authorization" + ] + filtered_headers.append((b"pauthkey", token.encode("utf-8"))) + request.scope["headers"] = filtered_headers + if "_headers" in request.__dict__: + del request.__dict__["_headers"] + + +async def openai_middleware(request: Request): + payload = await request.json() + internal_body = _build_internal_body(payload) + token = _extract_pauthkey_from_authorization(request) + + _override_request_body(request, internal_body) + _set_pauthkey_header(request, token) + request.state.openai_payload = payload + + await jwt_middleware(request) + await rate_limit(request, key_path="body.bridge_id", points=100) + await rate_limit(request, key_path="body.thread_id", points=20) \ No newline at end of file diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index cfba4525..5f48a145 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -1,11 +1,15 @@ from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import JSONResponse import asyncio +import uuid +import time +import json from src.services.commonServices.common import chat_multiple_agents, embedding, batch, run_testcases, image, orchestrator_chat from src.services.commonServices.baseService.utils import make_request_data from ...middlewares.middleware import jwt_middleware from ...middlewares.getDataUsingBridgeId import add_configuration_data_to_body +from ...middlewares.openAImiddleware import openai_middleware from concurrent.futures import ThreadPoolExecutor from config import Config from src.services.commonServices.queueService.queueService import queue_obj @@ -22,6 +26,44 @@ async def auth_and_rate_limit(request: Request): await rate_limit(request,key_path='body.bridge_id' , points=100) await rate_limit(request,key_path='body.thread_id', points=20) +def _format_openai_response(chat_response: dict, original_payload: dict | None) -> dict: + response_data = chat_response.get("response", {}).get("data", {}) + usage_data = chat_response.get("response", {}).get("usage", {}) or {} + + message_content = response_data.get("content") + if isinstance(message_content, list): + message_content = "\n".join( + chunk.get("text", "") if isinstance(chunk, dict) else str(chunk) + for chunk in message_content + ).strip() + + finish_reason = response_data.get("finish_reason") or usage_data.get("finish_reason") + model = original_payload.get("model") if isinstance(original_payload, dict) else None + + return { + "id": f"chatcmpl-{uuid.uuid4().hex}", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": message_content, + }, + "finish_reason": finish_reason or "stop", + "logprobs": None, + } + ], + "usage": { + "prompt_tokens": usage_data.get("input_tokens") or usage_data.get("prompt_tokens"), + "completion_tokens": usage_data.get("output_tokens") or usage_data.get("completion_tokens"), + "total_tokens": usage_data.get("total_tokens"), + }, + "system_fingerprint": None, + } + @router.post('/chat/completion', dependencies=[Depends(auth_and_rate_limit)]) async def chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): request.state.is_playground = False @@ -57,6 +99,28 @@ async def chat_completion(request: Request, db_config: dict = Depends(add_config result = await chat_multiple_agents(data_to_send) return result +@router.post('/openai/v1/chat/completions', dependencies=[Depends(openai_middleware)]) +async def openai_sdk_chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): + """ + OpenAI SDK-compatible entrypoint that reuses the standard chat completion flow. + """ + internal_response = await chat_completion(request, db_config=db_config) + + if isinstance(internal_response, JSONResponse): + content = internal_response.body + try: + content_dict = json.loads(content) + except Exception: + content_dict = {} + if not content_dict.get("success", True): + raise HTTPException(status_code=500, detail=content_dict) + chat_response = content_dict + else: + chat_response = internal_response + + openai_payload = getattr(request.state, "openai_payload", {}) + formatted = _format_openai_response(chat_response, openai_payload) + return formatted @router.post('/playground/chat/completion/{bridge_id}', dependencies=[Depends(auth_and_rate_limit)]) async def playground_chat_completion_bridge(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): diff --git a/src/services/utils/update_and_check_cost.py b/src/services/utils/update_and_check_cost.py index 76955b6c..b3220a36 100644 --- a/src/services/utils/update_and_check_cost.py +++ b/src/services/utils/update_and_check_cost.py @@ -127,9 +127,11 @@ async def check_bridge_api_folder_limits(result, bridge_data,version_id): if not isinstance(bridge_data, dict): return None - folder_identifier = result.get('folder_id') + result_data = result if isinstance(result, dict) else {} + + folder_identifier = result_data.get('folder_id') if folder_identifier: - folder_error = await _check_limit(limit_types['folder'],data=result,version_id=version_id) + folder_error = await _check_limit(limit_types['folder'],data=result_data,version_id=version_id) if folder_error: return folder_error @@ -137,12 +139,12 @@ async def check_bridge_api_folder_limits(result, bridge_data,version_id): if bridge_error: return bridge_error - service_identifier = result.get('service') + service_identifier = result_data.get('service') if service_identifier and ( - (result.get('apikeys') and service_identifier in result.get('apikeys', {})) or - (result.get('folder_apikeys') and service_identifier in result.get('folder_apikeys', {})) + (result_data.get('apikeys') and service_identifier in result_data.get('apikeys', {})) or + (result_data.get('folder_apikeys') and service_identifier in result_data.get('folder_apikeys', {})) ): - api_error = await _check_limit(limit_types['apikey'], data=result,version_id=version_id) + api_error = await _check_limit(limit_types['apikey'], data=result_data,version_id=version_id) if api_error: return api_error From 8cd296f7976c00ec0cbd3e460f10d2017a8f0786 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Wed, 17 Dec 2025 16:13:22 +0530 Subject: [PATCH 4/7] refactored approach --- ...AImiddleware.py => openaiSDKmiddleware.py} | 71 +++++++++++++++---- src/routes/v2/modelRouter.py | 67 ++--------------- src/services/utils/openai_sdk_utils.py | 69 ++++++++++++++++++ 3 files changed, 133 insertions(+), 74 deletions(-) rename src/middlewares/{openAImiddleware.py => openaiSDKmiddleware.py} (68%) create mode 100644 src/services/utils/openai_sdk_utils.py diff --git a/src/middlewares/openAImiddleware.py b/src/middlewares/openaiSDKmiddleware.py similarity index 68% rename from src/middlewares/openAImiddleware.py rename to src/middlewares/openaiSDKmiddleware.py index 656aceb9..18664226 100644 --- a/src/middlewares/openAImiddleware.py +++ b/src/middlewares/openaiSDKmiddleware.py @@ -1,5 +1,7 @@ import json +import re from typing import Any, Dict, List, Optional + from fastapi import HTTPException, Request from .middleware import jwt_middleware from .ratelimitMiddleware import rate_limit @@ -27,10 +29,12 @@ def _normalize_message_content(content: Any) -> Optional[str]: if isinstance(content, list): text_parts: List[str] = [] for item in content: - if isinstance(item, dict) and item.get("type") == "text": - text_value = (item.get("text") or "").strip() - if text_value: - text_parts.append(text_value) + if isinstance(item, dict): + item_type = (item.get("type") or "").lower() + if item_type in {"text", "input_text", "output_text"}: + text_value = (item.get("text") or "").strip() + if text_value: + text_parts.append(text_value) merged = "\n".join(text_parts).strip() return merged or None @@ -47,9 +51,39 @@ def _extract_latest_user_message(messages: List[Dict[str, Any]]) -> Optional[str return None -def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: - model = payload.get("model") - if not isinstance(model, str): +def _extract_text_from_input(input_value: Any) -> Optional[str]: + if isinstance(input_value, str): + text = input_value.strip() + return text or None + + if isinstance(input_value, dict): + return _normalize_message_content(input_value.get("content")) + + if isinstance(input_value, list): + segments: List[str] = [] + for chunk in input_value: + if isinstance(chunk, dict): + # Prefer nested content array but fall back to text directly + content = chunk.get("content") + extracted = _normalize_message_content(content) + if extracted: + segments.append(extracted) + elif isinstance(chunk.get("text"), str): + text_value = chunk["text"].strip() + if text_value: + segments.append(text_value) + merged = "\n".join(segments).strip() + return merged or None + + return None + + +_AGENT_MODEL_PREFIX = "gtwy-agent" +_AGENT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9._-]+$") + + +def _parse_agent_identifier(model: Any) -> str: + if not isinstance(model, str) or not model.strip(): raise HTTPException(status_code=400, detail="`model` must be provided.") if ":" not in model: @@ -58,16 +92,27 @@ def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: detail="`model` must include the agent identifier in the form gtwy-agent:.", ) - _, agent_id = model.split(":", 1) + prefix, agent_id = model.split(":", 1) + if prefix.strip().lower() != _AGENT_MODEL_PREFIX: + raise HTTPException( + status_code=400, + detail="`model` must start with gtwy-agent: to reference a gateway agent.", + ) + agent_id = agent_id.strip() - if not agent_id: - raise HTTPException(status_code=400, detail="Invalid model identifier.") + if not agent_id or not _AGENT_ID_PATTERN.match(agent_id): + raise HTTPException(status_code=400, detail="Invalid agent identifier supplied.") + + return agent_id + + +def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: + agent_id = _parse_agent_identifier(payload.get("model")) user_message = _extract_latest_user_message(payload.get("messages", [])) if not user_message: fallback = payload.get("input") or payload.get("prompt") - if isinstance(fallback, str) and fallback.strip(): - user_message = fallback.strip() + user_message = _extract_text_from_input(fallback) if not user_message: raise HTTPException(status_code=400, detail="No user message found in payload.") @@ -131,7 +176,7 @@ def _set_pauthkey_header(request: Request, token: str) -> None: del request.__dict__["_headers"] -async def openai_middleware(request: Request): +async def openai_sdk_middleware(request: Request): payload = await request.json() internal_body = _build_internal_body(payload) token = _extract_pauthkey_from_authorization(request) diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index 5f48a145..c1cb890e 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -1,21 +1,21 @@ from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import JSONResponse import asyncio -import uuid -import time import json from src.services.commonServices.common import chat_multiple_agents, embedding, batch, run_testcases, image, orchestrator_chat from src.services.commonServices.baseService.utils import make_request_data from ...middlewares.middleware import jwt_middleware from ...middlewares.getDataUsingBridgeId import add_configuration_data_to_body -from ...middlewares.openAImiddleware import openai_middleware +from ...middlewares.openaiSDKmiddleware import openai_sdk_middleware + from concurrent.futures import ThreadPoolExecutor from config import Config from src.services.commonServices.queueService.queueService import queue_obj from src.middlewares.ratelimitMiddleware import rate_limit from models.mongo_connection import db from globals import * +from src.services.utils.openai_sdk_utils import run_openai_chat_and_format router = APIRouter() @@ -26,44 +26,6 @@ async def auth_and_rate_limit(request: Request): await rate_limit(request,key_path='body.bridge_id' , points=100) await rate_limit(request,key_path='body.thread_id', points=20) -def _format_openai_response(chat_response: dict, original_payload: dict | None) -> dict: - response_data = chat_response.get("response", {}).get("data", {}) - usage_data = chat_response.get("response", {}).get("usage", {}) or {} - - message_content = response_data.get("content") - if isinstance(message_content, list): - message_content = "\n".join( - chunk.get("text", "") if isinstance(chunk, dict) else str(chunk) - for chunk in message_content - ).strip() - - finish_reason = response_data.get("finish_reason") or usage_data.get("finish_reason") - model = original_payload.get("model") if isinstance(original_payload, dict) else None - - return { - "id": f"chatcmpl-{uuid.uuid4().hex}", - "object": "chat.completion", - "created": int(time.time()), - "model": model, - "choices": [ - { - "index": 0, - "message": { - "role": "assistant", - "content": message_content, - }, - "finish_reason": finish_reason or "stop", - "logprobs": None, - } - ], - "usage": { - "prompt_tokens": usage_data.get("input_tokens") or usage_data.get("prompt_tokens"), - "completion_tokens": usage_data.get("output_tokens") or usage_data.get("completion_tokens"), - "total_tokens": usage_data.get("total_tokens"), - }, - "system_fingerprint": None, - } - @router.post('/chat/completion', dependencies=[Depends(auth_and_rate_limit)]) async def chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): request.state.is_playground = False @@ -99,28 +61,11 @@ async def chat_completion(request: Request, db_config: dict = Depends(add_config result = await chat_multiple_agents(data_to_send) return result -@router.post('/openai/v1/chat/completions', dependencies=[Depends(openai_middleware)]) -async def openai_sdk_chat_completion(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): - """ - OpenAI SDK-compatible entrypoint that reuses the standard chat completion flow. - """ - internal_response = await chat_completion(request, db_config=db_config) - if isinstance(internal_response, JSONResponse): - content = internal_response.body - try: - content_dict = json.loads(content) - except Exception: - content_dict = {} - if not content_dict.get("success", True): - raise HTTPException(status_code=500, detail=content_dict) - chat_response = content_dict - else: - chat_response = internal_response +@router.post('/openai/v1/responses', dependencies=[Depends(openai_sdk_middleware)]) +async def openai_sdk_responses(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): + return await run_openai_chat_and_format(request, db_config, chat_completion) - openai_payload = getattr(request.state, "openai_payload", {}) - formatted = _format_openai_response(chat_response, openai_payload) - return formatted @router.post('/playground/chat/completion/{bridge_id}', dependencies=[Depends(auth_and_rate_limit)]) async def playground_chat_completion_bridge(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): diff --git a/src/services/utils/openai_sdk_utils.py b/src/services/utils/openai_sdk_utils.py new file mode 100644 index 00000000..e192356f --- /dev/null +++ b/src/services/utils/openai_sdk_utils.py @@ -0,0 +1,69 @@ +import json +import time +import uuid +from typing import Any, Awaitable, Callable, Dict + +from fastapi import HTTPException, Request +from fastapi.responses import JSONResponse + + +def format_openai_response(chat_response: Dict[str, Any], original_payload: Dict[str, Any] | None) -> Dict[str, Any]: + response_data = chat_response.get("response", {}).get("data", {}) + usage_data = chat_response.get("response", {}).get("usage", {}) or {} + + message_content = response_data.get("content") + if isinstance(message_content, list): + message_content = "\n".join( + chunk.get("text", "") if isinstance(chunk, dict) else str(chunk) + for chunk in message_content + ).strip() + + finish_reason = response_data.get("finish_reason") or usage_data.get("finish_reason") + model = original_payload.get("model") if isinstance(original_payload, dict) else None + + return { + "id": f"chatcmpl-{uuid.uuid4().hex}", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": message_content, + }, + "finish_reason": finish_reason or "stop", + "logprobs": None, + } + ], + "usage": { + "prompt_tokens": usage_data.get("input_tokens") or usage_data.get("prompt_tokens"), + "completion_tokens": usage_data.get("output_tokens") or usage_data.get("completion_tokens"), + "total_tokens": usage_data.get("total_tokens"), + }, + "system_fingerprint": None, + } + + +async def run_openai_chat_and_format( + request: Request, + db_config: Dict[str, Any], + chat_handler: Callable[[Request, Dict[str, Any]], Awaitable[Any]], +) -> Dict[str, Any]: + internal_response = await chat_handler(request, db_config) + + if isinstance(internal_response, JSONResponse): + content = internal_response.body + try: + content_dict = json.loads(content) + except Exception: + content_dict = {} + if not content_dict.get("success", True): + raise HTTPException(status_code=500, detail=content_dict) + chat_response = content_dict + else: + chat_response = internal_response + + openai_payload = getattr(request.state, "openai_payload", {}) + return format_openai_response(chat_response, openai_payload) From 1afa026fc46201d0572f00de0ecdeb4e2bee3e99 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Wed, 17 Dec 2025 19:51:28 +0530 Subject: [PATCH 5/7] Fixed the model key problem --- src/middlewares/openaiSDKmiddleware.py | 38 +++---- src/services/utils/common_utils.py | 12 ++- src/services/utils/openai_sdk_utils.py | 142 +++++++++++++++++++++---- 3 files changed, 144 insertions(+), 48 deletions(-) diff --git a/src/middlewares/openaiSDKmiddleware.py b/src/middlewares/openaiSDKmiddleware.py index 18664226..f23c8840 100644 --- a/src/middlewares/openaiSDKmiddleware.py +++ b/src/middlewares/openaiSDKmiddleware.py @@ -1,7 +1,5 @@ import json -import re from typing import Any, Dict, List, Optional - from fastapi import HTTPException, Request from .middleware import jwt_middleware from .ratelimitMiddleware import rate_limit @@ -63,7 +61,6 @@ def _extract_text_from_input(input_value: Any) -> Optional[str]: segments: List[str] = [] for chunk in input_value: if isinstance(chunk, dict): - # Prefer nested content array but fall back to text directly content = chunk.get("content") extracted = _normalize_message_content(content) if extracted: @@ -78,36 +75,22 @@ def _extract_text_from_input(input_value: Any) -> Optional[str]: return None -_AGENT_MODEL_PREFIX = "gtwy-agent" -_AGENT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9._-]+$") - - -def _parse_agent_identifier(model: Any) -> str: - if not isinstance(model, str) or not model.strip(): - raise HTTPException(status_code=400, detail="`model` must be provided.") - - if ":" not in model: - raise HTTPException( - status_code=400, - detail="`model` must include the agent identifier in the form gtwy-agent:.", - ) +def _extract_agent_identifier(payload: Dict[str, Any]) -> str: + agent_id = payload.get("agent_id") or payload.get("bridge_id") - prefix, agent_id = model.split(":", 1) - if prefix.strip().lower() != _AGENT_MODEL_PREFIX: + if isinstance(agent_id, str): + agent_id = agent_id.strip() + if not agent_id: raise HTTPException( status_code=400, - detail="`model` must start with gtwy-agent: to reference a gateway agent.", + detail="`agent_id` must be included in the request body.", ) - - agent_id = agent_id.strip() - if not agent_id or not _AGENT_ID_PATTERN.match(agent_id): - raise HTTPException(status_code=400, detail="Invalid agent identifier supplied.") - - return agent_id + return str(agent_id) def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: - agent_id = _parse_agent_identifier(payload.get("model")) + agent_id = _extract_agent_identifier(payload) + llm_model = payload.get("model") user_message = _extract_latest_user_message(payload.get("messages", [])) if not user_message: @@ -124,6 +107,9 @@ def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(configuration, dict): configuration = {} + if isinstance(llm_model, str) and llm_model.strip(): + configuration.setdefault("model", llm_model.strip()) + response_format = payload.get("response_format") if response_format: if isinstance(response_format, dict): diff --git a/src/services/utils/common_utils.py b/src/services/utils/common_utils.py index 796c31ea..fa02b6f0 100644 --- a/src/services/utils/common_utils.py +++ b/src/services/utils/common_utils.py @@ -52,11 +52,19 @@ def setup_agent_pre_tools(parsed_data, bridge_configurations): # Get required params from pre_tools_data required_params = pre_tools_data.get('required_params', []) + # Get variables_path mapping for the current agent + variables_path = current_config.get('variables_path', {}).get(pre_tools_data.get('function_name')) + # Build args from agent's own variables args = {} for param in required_params: - if param in agent_variables: - args[param] = agent_variables[param] + # Check if there's a mapping in variables_path for this param + if param in variables_path: + # Get the mapped variable name + mapped_variable = variables_path[param] + # Use the mapped variable to get value from agent_variables + if mapped_variable in agent_variables: + args[param] = agent_variables[mapped_variable] # Update the pre_tools args with agent-specific variables parsed_data['pre_tools']['args'] = args diff --git a/src/services/utils/openai_sdk_utils.py b/src/services/utils/openai_sdk_utils.py index e192356f..5480a55e 100644 --- a/src/services/utils/openai_sdk_utils.py +++ b/src/services/utils/openai_sdk_utils.py @@ -1,12 +1,35 @@ import json import time import uuid -from typing import Any, Awaitable, Callable, Dict +from typing import Any, Awaitable, Callable, Dict, List, Optional from fastapi import HTTPException, Request from fastapi.responses import JSONResponse +def _build_output_blocks(message_content: str) -> List[Dict[str, Any]]: + reasoning_block = { + "id": f"rs_{uuid.uuid4().hex}", + "type": "reasoning", + "summary": [], + } + + message_block = { + "id": f"msg_{uuid.uuid4().hex}", + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": message_content, + } + ], + } + + return [reasoning_block, message_block] + + def format_openai_response(chat_response: Dict[str, Any], original_payload: Dict[str, Any] | None) -> Dict[str, Any]: response_data = chat_response.get("response", {}).get("data", {}) usage_data = chat_response.get("response", {}).get("usage", {}) or {} @@ -17,40 +40,115 @@ def format_openai_response(chat_response: Dict[str, Any], original_payload: Dict chunk.get("text", "") if isinstance(chunk, dict) else str(chunk) for chunk in message_content ).strip() + elif not isinstance(message_content, str): + message_content = str(message_content or "") + message_content = message_content.strip() finish_reason = response_data.get("finish_reason") or usage_data.get("finish_reason") model = original_payload.get("model") if isinstance(original_payload, dict) else None + response_id = f"resp_{uuid.uuid4().hex}" + created_at = int(time.time()) + return { - "id": f"chatcmpl-{uuid.uuid4().hex}", - "object": "chat.completion", - "created": int(time.time()), + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "completed", + "background": False, + "billing": {"payer": "developer"}, + "error": None, + "incomplete_details": None, + "instructions": None, + "max_output_tokens": None, + "max_tool_calls": None, "model": model, - "choices": [ - { - "index": 0, - "message": { - "role": "assistant", - "content": message_content, - }, - "finish_reason": finish_reason or "stop", - "logprobs": None, - } - ], + "output": _build_output_blocks(message_content), + "parallel_tool_calls": True, + "previous_response_id": None, + "prompt_cache_key": None, + "prompt_cache_retention": None, + "reasoning": {"effort": "medium", "summary": None}, + "safety_identifier": None, + "service_tier": "default", + "store": True, + "temperature": original_payload.get("temperature") if isinstance(original_payload, dict) else None, + "text": {"format": {"type": "text"}, "verbosity": "medium"}, + "tool_choice": original_payload.get("tool_choice") if isinstance(original_payload, dict) else "auto", + "tools": original_payload.get("tools") if isinstance(original_payload, dict) else [], + "top_logprobs": 0, + "top_p": original_payload.get("top_p") if isinstance(original_payload, dict) else 1, + "truncation": "disabled", "usage": { - "prompt_tokens": usage_data.get("input_tokens") or usage_data.get("prompt_tokens"), - "completion_tokens": usage_data.get("output_tokens") or usage_data.get("completion_tokens"), + "input_tokens": usage_data.get("input_tokens"), + "input_tokens_details": { + "cached_tokens": usage_data.get("cached_input_tokens", 0), + }, + "output_tokens": usage_data.get("output_tokens"), + "output_tokens_details": { + "reasoning_tokens": usage_data.get("reasoning_tokens"), + }, "total_tokens": usage_data.get("total_tokens"), }, - "system_fingerprint": None, + "user": original_payload.get("user") if isinstance(original_payload, dict) else None, + "metadata": original_payload.get("metadata") if isinstance(original_payload, dict) else {}, + "output_text": message_content, + "finish_reason": finish_reason or "stop", + } + + +def _format_error_detail( + message: str, + error_type: str = "invalid_request_error", + code: Optional[str] = None, + param: Optional[str] = None, +) -> Dict[str, Any]: + return { + "error": { + "message": message, + "type": error_type, + "param": param, + "code": code, + } } +def _extract_error_message(error_payload: Dict[str, Any]) -> str: + error_value = error_payload.get("error") + + if isinstance(error_value, str): + return error_value + + if isinstance(error_value, dict): + return error_value.get("message") or error_value.get("detail") or json.dumps(error_value) + + if isinstance(error_payload.get("detail"), str): + return error_payload["detail"] + + return json.dumps(error_payload) + + +def _handle_failed_response( + response_payload: Dict[str, Any], + status_code: int = 400, +) -> None: + message = _extract_error_message(response_payload) + error_type = response_payload.get("error_type") or "invalid_request_error" + code = response_payload.get("error_code") + param = response_payload.get("error_param") + + raise HTTPException( + status_code=status_code, + detail=_format_error_detail(message, error_type=error_type, code=code, param=param), + ) + + async def run_openai_chat_and_format( request: Request, db_config: Dict[str, Any], chat_handler: Callable[[Request, Dict[str, Any]], Awaitable[Any]], ) -> Dict[str, Any]: + openai_payload = getattr(request.state, "openai_payload", {}) internal_response = await chat_handler(request, db_config) if isinstance(internal_response, JSONResponse): @@ -60,10 +158,14 @@ async def run_openai_chat_and_format( except Exception: content_dict = {} if not content_dict.get("success", True): - raise HTTPException(status_code=500, detail=content_dict) + status_code = content_dict.get("status_code") or 400 + _handle_failed_response(content_dict, status_code=status_code) chat_response = content_dict else: chat_response = internal_response - openai_payload = getattr(request.state, "openai_payload", {}) + if isinstance(chat_response, dict) and not chat_response.get("success", True): + status_code = chat_response.get("status_code") or 400 + _handle_failed_response(chat_response, status_code=status_code) + return format_openai_response(chat_response, openai_payload) From caa9dda47ef2fbb7dd243db837194aad7c5fd916 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Thu, 18 Dec 2025 15:25:52 +0530 Subject: [PATCH 6/7] Simplified payload handling and strict payload validation --- src/middlewares/openaiSDKmiddleware.py | 45 ++++---------------------- src/services/utils/openai_sdk_utils.py | 10 +++--- 2 files changed, 11 insertions(+), 44 deletions(-) diff --git a/src/middlewares/openaiSDKmiddleware.py b/src/middlewares/openaiSDKmiddleware.py index f23c8840..6a3ec1d7 100644 --- a/src/middlewares/openaiSDKmiddleware.py +++ b/src/middlewares/openaiSDKmiddleware.py @@ -29,7 +29,7 @@ def _normalize_message_content(content: Any) -> Optional[str]: for item in content: if isinstance(item, dict): item_type = (item.get("type") or "").lower() - if item_type in {"text", "input_text", "output_text"}: + if item_type in {"text", "input_text"}: text_value = (item.get("text") or "").strip() if text_value: text_parts.append(text_value) @@ -39,16 +39,6 @@ def _normalize_message_content(content: Any) -> Optional[str]: return None -def _extract_latest_user_message(messages: List[Dict[str, Any]]) -> Optional[str]: - for message in reversed(messages or []): - if message.get("role") != "user": - continue - normalized = _normalize_message_content(message.get("content")) - if normalized: - return normalized - return None - - def _extract_text_from_input(input_value: Any) -> Optional[str]: if isinstance(input_value, str): text = input_value.strip() @@ -92,47 +82,26 @@ def _build_internal_body(payload: Dict[str, Any]) -> Dict[str, Any]: agent_id = _extract_agent_identifier(payload) llm_model = payload.get("model") - user_message = _extract_latest_user_message(payload.get("messages", [])) - if not user_message: - fallback = payload.get("input") or payload.get("prompt") - user_message = _extract_text_from_input(fallback) + user_message = _extract_text_from_input(payload.get("input")) + if not user_message: raise HTTPException(status_code=400, detail="No user message found in payload.") - metadata = payload.get("metadata") or {} - if not isinstance(metadata, dict): - metadata = {} - configuration = payload.get("configuration") or {} - if not isinstance(configuration, dict): - configuration = {} if isinstance(llm_model, str) and llm_model.strip(): configuration.setdefault("model", llm_model.strip()) - response_format = payload.get("response_format") - if response_format: - if isinstance(response_format, dict): - configuration.setdefault("response_format", response_format) - elif isinstance(response_format, str): - configuration.setdefault( - "response_format", - { - "type": response_format, - "cred": {}, - }, - ) - internal_body: Dict[str, Any] = { "agent_id": agent_id, "bridge_id": agent_id, + "user": user_message, "messages": payload.get("messages", []), "thread_id": payload.get("conversation_id") - or payload.get("thread_id") - or metadata.get("thread_id"), - "sub_thread_id": payload.get("sub_thread_id") or metadata.get("sub_thread_id"), - "variables": payload.get("variables") or metadata.get("variables") or {}, + or payload.get("thread_id") or None, + "sub_thread_id": payload.get("sub_thread_id") or None, + "variables": payload.get("variables") or {}, "configuration": configuration, "attachments": payload.get("attachments", []), } diff --git a/src/services/utils/openai_sdk_utils.py b/src/services/utils/openai_sdk_utils.py index 5480a55e..49dfa5ec 100644 --- a/src/services/utils/openai_sdk_utils.py +++ b/src/services/utils/openai_sdk_utils.py @@ -104,12 +104,10 @@ def _format_error_detail( param: Optional[str] = None, ) -> Dict[str, Any]: return { - "error": { - "message": message, - "type": error_type, - "param": param, - "code": code, - } + "message": message, + "type": error_type, + "param": param, + "code": code } From 8fd48db7aff8db0218b03e3ff182d887ea753b15 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Thu, 18 Dec 2025 15:30:42 +0530 Subject: [PATCH 7/7] removed v1 from endpoint URL --- src/routes/v2/modelRouter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/routes/v2/modelRouter.py b/src/routes/v2/modelRouter.py index c1cb890e..d5e715b3 100644 --- a/src/routes/v2/modelRouter.py +++ b/src/routes/v2/modelRouter.py @@ -62,7 +62,7 @@ async def chat_completion(request: Request, db_config: dict = Depends(add_config return result -@router.post('/openai/v1/responses', dependencies=[Depends(openai_sdk_middleware)]) +@router.post('/openai/responses', dependencies=[Depends(openai_sdk_middleware)]) async def openai_sdk_responses(request: Request, db_config: dict = Depends(add_configuration_data_to_body)): return await run_openai_chat_and_format(request, db_config, chat_completion)