From ffa609af1bcaf0e5f98b576b3db6cc6d0690246f Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Mon, 29 Dec 2025 17:04:45 +0530
Subject: [PATCH 01/16] fix: include message_id in error responses for chat function

---
 src/services/commonServices/common.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/services/commonServices/common.py b/src/services/commonServices/common.py
index f6f03ba3..7ec32498 100644
--- a/src/services/commonServices/common.py
+++ b/src/services/commonServices/common.py
@@ -409,6 +409,7 @@ async def chat(request_body):
             error_object = {
                 "success": False,
                 "error": combined_error_string,
+                "message_id": parsed_data.get('message_id')
             }
         else:
             # Single error case
@@ -416,6 +417,7 @@
             error_object = {
                 "success": False,
                 "error": error_string,
+                "message_id": parsed_data.get('message_id')
             }
         if parsed_data['is_playground'] and parsed_data['body']['bridge_configurations'].get('playground_response_format'):
             await sendResponse(parsed_data['body']['bridge_configurations']['playground_response_format'], error_object, success=False, variables=parsed_data.get('variables',{}))

From 333553e78c1d9fba01256fbcb7fc4919cea0c401 Mon Sep 17 00:00:00 2001
From: adityajunwal
Date: Mon, 29 Dec 2025 18:23:50 +0530
Subject: [PATCH 02/16] Bug Solved: Testcase Score not working

---
 src/services/testcase_service.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/services/testcase_service.py b/src/services/testcase_service.py
index 6e6c97a4..945a298a 100644
--- a/src/services/testcase_service.py
+++ b/src/services/testcase_service.py
@@ -182,8 +182,10 @@ async def get_testcase_configuration(
     if not db_config.get("success"):
         raise TestcaseValidationError(db_config.get("error", "Failed to get configuration"))

-    return db_config
+    primary_bridge_id = db_config.get('primary_bridge_id')
+    bridge_configurations = db_config.get('bridge_configurations', {})
+    return bridge_configurations[primary_bridge_id]


 async def process_single_testcase(testcase: Dict[str, Any], db_config: Dict[str, Any]) -> Dict[str, Any]:
     """
@@ -197,6 +199,10 @@
         Dictionary containing testcase result
     """
     try:
+        # Copy the configuration to avoid race conditions in parallel execution
+        if 'configuration' in db_config:
+            db_config['configuration'] = db_config['configuration'].copy()
+
         # Set conversation in db_config
         db_config['configuration']['conversation'] = testcase.get('conversation', [])


From d354ee16c950e5a605276e8255b3a7fb12060c03 Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Tue, 30 Dec 2025 13:35:26 +0530
Subject: [PATCH 03/16] feat: integrate Hippocampus API for saving chatbot conversations

---
 config.py | 5 +-
 .../commonServices/baseService/utils.py | 12 ++++
 .../queueService/queueLogService.py | 12 ++++
 src/services/utils/hippocampus_utils.py | 55 +++++++++++++++++++
 4 files changed, 83 insertions(+), 1 deletion(-)
 create mode 100644 src/services/utils/hippocampus_utils.py

diff --git a/config.py b/config.py
index 5a6e692f..08985296 100644
--- a/config.py
+++ b/config.py
@@ -56,4 +56,7 @@ class Config:
     DOCSTAR_COLLECTION_ID = os.getenv('DOCSTAR_COLLECTION_ID')
     AI_ML_APIKEY = os.getenv('AI_ML_APIKEY')
     AI_MIDDLEWARE_PAUTH_KEY = os.getenv('AI_MIDDLEWARE_PAUTH_KEY')
-    OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO')
\ No newline at end of file
+    OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO')
+    HIPPOCAMPUS_API_URL = 
os.getenv('HIPPOCAMPUS_API_URL') + HIPPOCAMPUS_API_KEY = os.getenv('HIPPOCAMPUS_API_KEY') + HIPPOCAMPUS_COLLECTION_ID = os.getenv('HIPPOCAMPUS_COLLECTION_ID') \ No newline at end of file diff --git a/src/services/commonServices/baseService/utils.py b/src/services/commonServices/baseService/utils.py index 8acb0bab..3e167795 100644 --- a/src/services/commonServices/baseService/utils.py +++ b/src/services/commonServices/baseService/utils.py @@ -405,6 +405,11 @@ async def make_request_data(request: Request): async def make_request_data_and_publish_sub_queue(parsed_data, result, params, thread_info=None): suggestion_content = {'data': {'content': {}}} suggestion_content['data']['content'] = result.get('historyParams', {}).get('message') + + # Extract user and assistant messages for Hippocampus + user_message = parsed_data.get('user', '') + assistant_message = result.get('historyParams', {}).get('message', '') + data = { "save_sub_thread_id_and_name" : { "org_id" : parsed_data.get('org_id'), @@ -456,6 +461,13 @@ async def make_request_data_and_publish_sub_queue(parsed_data, result, params, t "check_chatbot_suggestions" : { "bridgeType" : parsed_data.get('bridgeType'), }, + "save_to_hippocampus" : { + "bridgeType" : parsed_data.get('bridgeType'), + "user_message" : user_message, + "assistant_message" : assistant_message, + "bridge_id" : parsed_data.get('bridge_id'), + "bridge_name" : parsed_data.get('name', '') + }, "type" : parsed_data.get('type'), "save_files_to_redis" : { "thread_id" : parsed_data.get('thread_id'), diff --git a/src/services/commonServices/queueService/queueLogService.py b/src/services/commonServices/queueService/queueLogService.py index bdfba14d..c1b76f25 100644 --- a/src/services/commonServices/queueService/queueLogService.py +++ b/src/services/commonServices/queueService/queueLogService.py @@ -10,6 +10,7 @@ from src.services.commonServices.baseService.utils import total_token_calculation, save_files_to_redis from src.controllers.conversationController import save_sub_thread_id_and_name from src.services.commonServices.queueService.baseQueue import BaseQueue +from src.services.utils.hippocampus_utils import save_conversation_to_hippocampus class Queue2(BaseQueue): @@ -33,6 +34,17 @@ async def process_messages(self, messages): # If message type is 'image', only run save_sub_thread_id_and_name if messages.get('type') == 'image': return + + # Save conversation to Hippocampus for chatbot bridge types + hippocampus_data = messages.get('save_to_hippocampus', {}) + if hippocampus_data.get('bridgeType'): + await save_conversation_to_hippocampus( + user_message=hippocampus_data.get('user_message', ''), + assistant_message=hippocampus_data.get('assistant_message', ''), + bridge_id=hippocampus_data.get('bridge_id', ''), + bridge_name=hippocampus_data.get('bridge_name', '') + ) + # await create(**messages['metrics_service']) await validateResponse(**messages['validateResponse']) await total_token_calculation(**messages['total_token_calculation']) diff --git a/src/services/utils/hippocampus_utils.py b/src/services/utils/hippocampus_utils.py new file mode 100644 index 00000000..e83fb084 --- /dev/null +++ b/src/services/utils/hippocampus_utils.py @@ -0,0 +1,55 @@ +from config import Config +from src.services.utils.logger import logger +from src.services.utils.apiservice import fetch + + +async def save_conversation_to_hippocampus(user_message, assistant_message, bridge_id, bridge_name=''): + """ + Save conversation to Hippocampus API for chatbot bridge types. 
+ + Args: + user_message: The user's message content + assistant_message: The assistant's response content + bridge_id: The bridge/agent ID (used as ownerId) + bridge_name: The bridge/agent name (used as title) + """ + try: + if not Config.HIPPOCAMPUS_API_KEY or not Config.HIPPOCAMPUS_COLLECTION_ID: + logger.warning("Hippocampus API key or collection ID not configured") + return + + # Combine user and assistant messages as content + content = f"User: {user_message}\nAssistant: {assistant_message}" + + # Use bridge_name if available, otherwise fallback to bridge_id + title = bridge_name if bridge_name else bridge_id + + payload = { + "collectionId": Config.HIPPOCAMPUS_COLLECTION_ID, + "title": title, + "ownerId": bridge_id, + "content": content, + "settings": { + "strategy": "custom", + "chunkingUrl": "https://flow.sokt.io/func/scriQywSNndU", + "chunkSize": 1000 + } + } + + headers = { + "x-api-key": Config.HIPPOCAMPUS_API_KEY, + "Content-Type": "application/json" + } + + response_data, response_headers = await fetch( + url=Config.HIPPOCAMPUS_API_URL, + method="POST", + headers=headers, + json_body=payload + ) + + logger.info(f"Successfully saved conversation to Hippocampus for bridge_id: {bridge_id}") + + except Exception as e: + logger.error(f"Error saving conversation to Hippocampus: {str(e)}") + From 08c11fca0056796d85e3e6ce4bbea695c5543bd0 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Tue, 30 Dec 2025 19:12:01 +0530 Subject: [PATCH 04/16] batch api service --- docs/CHAT_COMPLETION_FLOW.md | 2 +- src/services/cache_service.py | 40 ++- .../commonServices/Google/gemini_batch.py | 107 ++++++++ .../commonServices/Google/gemini_run_batch.py | 215 ++++++++++++++++ .../commonServices/Mistral/mistral_batch.py | 104 ++++++++ .../Mistral/mistral_run_batch.py | 227 +++++++++++++++++ .../anthropicCall.py} | 2 +- .../anthropicModelRun.py} | 0 .../anthropic/anthropic_batch.py | 108 ++++++++ .../anthropic/anthropic_run_batch.py | 140 ++++++++++ .../commonServices/baseService/baseService.py | 2 +- .../commonServices/groq/groq_batch.py | 104 ++++++++ .../commonServices/groq/groq_run_batch.py | 239 ++++++++++++++++++ .../commonServices/openAI/openai_batch.py | 3 +- .../commonServices/openAI/openai_run_batch.py | 156 +++++++++++- src/services/utils/ai_middleware_format.py | 155 +++++++++++- src/services/utils/batch_script.py | 144 +++++------ src/services/utils/batch_script_utils.py | 19 ++ src/services/utils/helper.py | 22 +- 19 files changed, 1684 insertions(+), 105 deletions(-) create mode 100644 src/services/commonServices/Google/gemini_batch.py create mode 100644 src/services/commonServices/Google/gemini_run_batch.py create mode 100644 src/services/commonServices/Mistral/mistral_batch.py create mode 100644 src/services/commonServices/Mistral/mistral_run_batch.py rename src/services/commonServices/{anthrophic/antrophicCall.py => anthropic/anthropicCall.py} (99%) rename src/services/commonServices/{anthrophic/antrophicModelRun.py => anthropic/anthropicModelRun.py} (100%) create mode 100644 src/services/commonServices/anthropic/anthropic_batch.py create mode 100644 src/services/commonServices/anthropic/anthropic_run_batch.py create mode 100644 src/services/commonServices/groq/groq_batch.py create mode 100644 src/services/commonServices/groq/groq_run_batch.py create mode 100644 src/services/utils/batch_script_utils.py diff --git a/docs/CHAT_COMPLETION_FLOW.md b/docs/CHAT_COMPLETION_FLOW.md index 2fdc79f5..4b23ebce 100644 --- a/docs/CHAT_COMPLETION_FLOW.md +++ b/docs/CHAT_COMPLETION_FLOW.md @@ -197,7 
+197,7 @@ The `getConfiguration` function assembles: **Service Mapping:** - `openai` → `UnifiedOpenAICase` - `gemini` → `GeminiHandler` -- `anthropic` → `Antrophic` +- `anthropic` → `Anthropic` - `groq` → `Groq` - `openai_response` → `OpenaiResponse` - `open_router` → `OpenRouter` diff --git a/src/services/cache_service.py b/src/services/cache_service.py index 44111e5b..585b46d2 100644 --- a/src/services/cache_service.py +++ b/src/services/cache_service.py @@ -117,4 +117,42 @@ def make_json_serializable(data): except (TypeError, OverflowError): return str(data) -__all__ = ['delete_in_cache', 'store_in_cache', 'find_in_cache', 'find_in_cache_and_expire', 'store_in_cache_permanent_until_read', 'verify_ttl', 'clear_cache'] +async def acquire_lock(lock_key: str, ttl: int = 600) -> bool: + """ + Acquire a distributed lock using Redis SET NX EX pattern. + + Args: + lock_key: Unique identifier for the lock + ttl: Time-to-live in seconds (default: 600 seconds = 10 minutes) + + Returns: + True if lock was acquired, False otherwise + """ + try: + full_key = f"{REDIS_PREFIX}lock_{lock_key}" + # SET NX EX: Set if Not eXists with EXpiration + result = await client.set(full_key, "locked", nx=True, ex=ttl) + return result is not None + except Exception as e: + logger.error(f"Error acquiring lock for {lock_key}: {str(e)}") + return False + +async def release_lock(lock_key: str) -> bool: + """ + Release a distributed lock. + + Args: + lock_key: Unique identifier for the lock + + Returns: + True if lock was released, False otherwise + """ + try: + full_key = f"{REDIS_PREFIX}lock_{lock_key}" + result = await client.delete(full_key) + return result > 0 + except Exception as e: + logger.error(f"Error releasing lock for {lock_key}: {str(e)}") + return False + +__all__ = ['delete_in_cache', 'store_in_cache', 'find_in_cache', 'find_in_cache_and_expire', 'store_in_cache_permanent_until_read', 'verify_ttl', 'clear_cache', 'acquire_lock', 'release_lock'] diff --git a/src/services/commonServices/Google/gemini_batch.py b/src/services/commonServices/Google/gemini_batch.py new file mode 100644 index 00000000..a0acfc4c --- /dev/null +++ b/src/services/commonServices/Google/gemini_batch.py @@ -0,0 +1,107 @@ +import pydash as _ +from ..baseService.baseService import BaseService +from ..createConversations import ConversationService +from src.configs.constant import service_name +import json +import uuid +from ...cache_service import store_in_cache +from src.configs.constant import redis_keys +from src.services.commonServices.Google.gemini_run_batch import create_batch_file, process_batch_file + + +class GeminiBatch(BaseService): + async def batch_execute(self): + system_prompt = self.configuration.get('prompt', '') + batch_requests = [] + message_mappings = [] + + # Validate batch_variables if provided + batch_variables = self.batch_variables if hasattr(self, 'batch_variables') and self.batch_variables else None + if batch_variables is not None: + if not isinstance(batch_variables, list): + return { + "success": False, + "message": "batch_variables must be an array" + } + if len(batch_variables) != len(self.batch): + return { + "success": False, + "message": f"batch_variables array length ({len(batch_variables)}) must match batch array length ({len(self.batch)})" + } + + # Construct batch requests in Gemini JSONL format + for idx, message in enumerate(self.batch, start=1): + # Generate a unique key for each request + custom_id = str(uuid.uuid4()) + + # Construct Gemini native format request + request_content = { + 
+                'contents': [
+                    {
+                        'parts': [
+                            {'text': message}
+                        ]
+                    }
+                ]
+            }
+
+            # Add system instruction
+            request_content['config'] = {
+                'system_instruction': {
+                    'parts': [{'text': system_prompt}]
+                }
+            }
+
+            # Add other config from customConfig (like temperature, max_tokens, etc.)
+            if self.customConfig:
+                if 'config' not in request_content:
+                    request_content['config'] = {}
+                # Merge customConfig into config, excluding any messages/prompt fields
+                for key, value in self.customConfig.items():
+                    if key not in ['messages', 'prompt', 'model']:
+                        request_content['config'][key] = value
+
+            # Create JSONL entry with key and request
+            batch_entry = {
+                "key": custom_id,
+                "request": request_content
+            }
+            batch_requests.append(json.dumps(batch_entry))
+
+            # Store message mapping for response
+            mapping_item = {
+                "message": message,
+                "custom_id": custom_id
+            }
+
+            # Add batch_variables to mapping if provided (idx-1 because enumerate starts at 1)
+            if batch_variables is not None:
+                mapping_item["variables"] = batch_variables[idx - 1]
+
+            message_mappings.append(mapping_item)
+
+        # Upload batch file and create batch job
+        uploaded_file = await create_batch_file(batch_requests, self.apikey)
+        batch_job = await process_batch_file(uploaded_file, self.apikey, self.model)
+
+        batch_id = batch_job.name
+        batch_json = {
+            "id": batch_job.name,
+            "state": batch_job.state,
+            "create_time": batch_job.create_time,
+            "model": batch_job.model or self.model,
+            "apikey": self.apikey,
+            "webhook": self.webhook,
+            "batch_variables": batch_variables,
+            "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)},
+            "service": self.service,
+            "uploaded_file": uploaded_file.name
+        }
+        cache_key = f"{redis_keys['batch_']}{batch_job.name}"
+        await store_in_cache(cache_key, batch_json, ttl = 86400)
+        return {
+            "success": True,
+            "message": "Response will be successfully sent to the webhook within 24 hrs.",
+            "batch_id": batch_id,
+            "messages": message_mappings
+        }
diff --git a/src/services/commonServices/Google/gemini_run_batch.py b/src/services/commonServices/Google/gemini_run_batch.py
new file mode 100644
index 00000000..df5f8bab
--- /dev/null
+++ b/src/services/commonServices/Google/gemini_run_batch.py
@@ -0,0 +1,215 @@
+import json
+import uuid
+import tempfile
+import os
+from google import genai
+from google.genai import types
+
+async def create_batch_file(batch_requests, apiKey):
+    """
+    Creates a JSONL file and uploads it to Gemini File API.
+ + Args: + batch_requests: List of JSON strings (JSONL entries) + apiKey: Gemini API key + + Returns: + Uploaded file object from Gemini File API + """ + try: + # Initialize Gemini client + client = genai.Client(api_key=apiKey) + + # Create JSONL file content + jsonl_content = "\n".join(batch_requests) + + # Create a temporary file to upload + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False, encoding='utf-8') as temp_file: + temp_file.write(jsonl_content) + temp_file_path = temp_file.name + + try: + # Upload the JSONL file to Gemini File API + uploaded_file = client.files.upload( + file=temp_file_path, + config=types.UploadFileConfig( + display_name=f'batch-{uuid.uuid4()}', + mime_type='application/jsonl' + ) + ) + return uploaded_file + finally: + # Clean up temporary file + if os.path.exists(temp_file_path): + os.remove(temp_file_path) + except Exception as e: + print("Error in Gemini create_batch_file:", repr(e)) + print("Cause:", repr(getattr(e, "__cause__", None))) + raise + +async def process_batch_file(uploaded_file, apiKey, model): + """ + Creates a batch job using the uploaded file. + + Args: + uploaded_file: File object from create_batch_file + apiKey: Gemini API key + model: Model name to use for batch processing + + Returns: + Batch job object + """ + try: + # Initialize Gemini client + client = genai.Client(api_key=apiKey) + + # Create batch job with the uploaded file + batch_job = client.batches.create( + model=model, + src=uploaded_file.name, + config={ + 'display_name': f'batch-job-{uuid.uuid4()}', + } + ) + print(f"Created batch job: {batch_job.name}") + return batch_job + except Exception as e: + print(f"Error in Gemini process_batch_file: {e}") + raise + +async def retrieve_batch_status(batch_id, apiKey): + """ + Retrieves the status of a batch job. + + Args: + batch_id: Batch job name + apiKey: Gemini API key + + Returns: + Batch job object with current status + """ + try: + # Initialize Gemini client + client = genai.Client(api_key=apiKey) + + # Get batch job status + batch = client.batches.get(name=batch_id) + print(f"Batch status: {batch.state}") + return batch + except Exception as e: + print(f"Error in Gemini retrieve_batch_status: {e}") + raise + + +async def download_gemini_file(file_uri, apikey): + """ + Helper function to download and parse a Gemini batch result file. + + Args: + file_uri: The file URI to download + apikey: Gemini API key + + Returns: + List of parsed JSON lines, or empty list if file doesn't exist or fails to parse + """ + if not file_uri: + return [] + + try: + client = genai.Client(api_key=apikey) + file_content = client.files.get(name=file_uri).read() + + try: + results = [json.loads(line) for line in file_content.decode('utf-8').splitlines() if line.strip()] + return results + except json.JSONDecodeError as e: + print(f"JSON decoding error for file {file_uri}: {e}") + return [] + except Exception as e: + print(f"Error downloading file {file_uri}: {e}") + return [] + + +async def handle_batch_results(batch_id, apikey): + """ + Handle Gemini batch processing - retrieve status and process results. 
+ + Args: + batch_id: Batch ID + apikey: Gemini API key + + Returns: + Tuple of (results, is_completed) + - For succeeded batches: (results_list, True) + - For failed/expired/cancelled with partial results: (combined_results, True) + - For failed/expired/cancelled without results: (error_info, True) + - For in-progress: (None, False) - continues polling + """ + batch = await retrieve_batch_status(batch_id, apikey) + state = batch.state + + # In-progress states - continue polling + if state in [types.BatchState.STATE_PENDING, types.BatchState.STATE_RUNNING]: + return None, False + + # Terminal states - download results + output_uri = batch.output_uri + + # For Gemini, there's no separate error file like OpenAI + # All results (success and errors) are in the output file + output_results = await download_gemini_file(output_uri, apikey) + + if output_results: + # We have some results (partial or complete) + return output_results, True + + # No results available - return error based on state + if state == types.BatchState.STATE_SUCCEEDED: + # Succeeded but no output - unusual case + error_info = [{ + "error": { + "message": "Batch succeeded but no result file was generated", + "type": "no_results", + "batch_status": state.name + }, + "status_code": 400 + }] + elif state == types.BatchState.STATE_FAILED: + error_info = [{ + "error": { + "message": f"Batch failed. Error: {getattr(batch, 'error', 'No error details available')}", + "type": "batch_failed", + "batch_status": state.name + }, + "status_code": 400 + }] + elif state == types.BatchState.STATE_EXPIRED: + error_info = [{ + "error": { + "message": "Batch expired - running or pending for more than 48 hours and no partial results available", + "type": "batch_expired", + "batch_status": state.name + }, + "status_code": 400 + }] + elif state == types.BatchState.STATE_CANCELLED: + error_info = [{ + "error": { + "message": "Batch was cancelled and no partial results available", + "type": "batch_cancelled", + "batch_status": state.name + }, + "status_code": 400 + }] + else: + # Unknown terminal state + error_info = [{ + "error": { + "message": f"Batch reached unknown terminal state: {state.name}", + "type": "unknown_status", + "batch_status": state.name + }, + "status_code": 400 + }] + + return error_info, True diff --git a/src/services/commonServices/Mistral/mistral_batch.py b/src/services/commonServices/Mistral/mistral_batch.py new file mode 100644 index 00000000..0f1fad98 --- /dev/null +++ b/src/services/commonServices/Mistral/mistral_batch.py @@ -0,0 +1,104 @@ +import pydash as _ +from ..baseService.baseService import BaseService +from ..createConversations import ConversationService +from src.configs.constant import service_name +import json +import uuid +from ...cache_service import store_in_cache +from src.configs.constant import redis_keys +from src.services.commonServices.Mistral.mistral_run_batch import create_batch_file, process_batch_file + + +class MistralBatch(BaseService): + async def batch_execute(self): + system_prompt = self.configuration.get('prompt', '') + batch_requests = [] + message_mappings = [] + + # Validate batch_variables if provided + batch_variables = self.batch_variables if hasattr(self, 'batch_variables') and self.batch_variables else None + if batch_variables is not None: + if not isinstance(batch_variables, list): + return { + "success": False, + "message": "batch_variables must be an array" + } + if len(batch_variables) != len(self.batch): + return { + "success": False, + "message": f"batch_variables array length 
({len(batch_variables)}) must match batch array length ({len(self.batch)})" + } + + # Construct batch requests in Mistral JSONL format + for idx, message in enumerate(self.batch, start=1): + # Generate a unique custom_id for each request + custom_id = str(uuid.uuid4()) + + # Construct Mistral batch request body + request_body = { + "messages": [], + "max_tokens": self.customConfig.get("max_tokens", 1024) + } + + # Add system message + request_body["messages"].append({ + "role": "system", + "content": system_prompt + }) + + # Add user message + request_body["messages"].append({ + "role": "user", + "content": message + }) + + # Add other config from customConfig + if self.customConfig: + for key in ['temperature', 'top_p', 'random_seed', 'safe_prompt']: + if key in self.customConfig: + request_body[key] = self.customConfig[key] + + # Create JSONL entry with custom_id and body + batch_entry = { + "custom_id": custom_id, + "body": request_body + } + batch_requests.append(json.dumps(batch_entry)) + + # Store message mapping for response + mapping_item = { + "message": message, + "custom_id": custom_id + } + + # Add batch_variables to mapping if provided + if batch_variables is not None: + mapping_item["variables"] = batch_variables[idx - 1] + + message_mappings.append(mapping_item) + + # Upload batch file and create batch job + uploaded_file = await create_batch_file(batch_requests, self.apikey) + batch_job = await process_batch_file(uploaded_file, self.apikey, self.model) + + batch_id = batch_job.id + batch_json = { + "id": batch_job.id, + "status": batch_job.status, + "created_at": batch_job.created_at, + "model": self.model, + "apikey": self.apikey, + "webhook": self.webhook, + "batch_variables": batch_variables, + "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)}, + "service": self.service, + "uploaded_file_id": uploaded_file.id + } + cache_key = f"{redis_keys['batch_']}{batch_job.id}" + await store_in_cache(cache_key, batch_json, ttl = 86400) + return { + "success": True, + "message": "Response will be successfully sent to the webhook within 24 hrs.", + "batch_id": batch_id, + "messages": message_mappings + } diff --git a/src/services/commonServices/Mistral/mistral_run_batch.py b/src/services/commonServices/Mistral/mistral_run_batch.py new file mode 100644 index 00000000..220d24c5 --- /dev/null +++ b/src/services/commonServices/Mistral/mistral_run_batch.py @@ -0,0 +1,227 @@ +import json +import uuid +import tempfile +import os +from mistralai import Mistral + +async def create_batch_file(batch_requests, apiKey): + """ + Creates a JSONL file and uploads it to Mistral Files API. 
+ + Args: + batch_requests: List of JSON strings (JSONL entries) + apiKey: Mistral API key + + Returns: + Uploaded file object from Mistral Files API + """ + try: + # Initialize Mistral client + client = Mistral(api_key=apiKey) + + # Create JSONL file content + jsonl_content = "\n".join(batch_requests) + + # Create a temporary file to upload + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False, encoding='utf-8') as temp_file: + temp_file.write(jsonl_content) + temp_file_path = temp_file.name + + try: + # Open file and read content, then close it before upload + with open(temp_file_path, "rb") as f: + file_content = f.read() + + # Upload the JSONL file to Mistral Files API + uploaded_file = client.files.upload( + file={ + "file_name": f"batch-{uuid.uuid4()}.jsonl", + "content": file_content + }, + purpose="batch" + ) + return uploaded_file + finally: + # Clean up temporary file + if os.path.exists(temp_file_path): + os.remove(temp_file_path) + except Exception as e: + print("Error in Mistral create_batch_file:", repr(e)) + print("Cause:", repr(getattr(e, "__cause__", None))) + raise + +async def process_batch_file(uploaded_file, apiKey, model): + """ + Creates a batch job using the uploaded file. + + Args: + uploaded_file: File object from create_batch_file + apiKey: Mistral API key + model: Model name to use for batch processing + + Returns: + Batch job object + """ + try: + # Initialize Mistral client + client = Mistral(api_key=apiKey) + + # Create batch job with the uploaded file + batch_job = client.batch.jobs.create( + input_files=[uploaded_file.id], + model=model, + endpoint="/v1/chat/completions", + metadata={"source": "ai-middleware"} + ) + print(f"Created Mistral batch job: {batch_job.id}") + return batch_job + except Exception as e: + print(f"Error in Mistral process_batch_file: {e}") + raise + +async def retrieve_batch_status(batch_id, apiKey): + """ + Retrieves the status of a batch job. + + Args: + batch_id: Batch job ID + apiKey: Mistral API key + + Returns: + Batch job object with current status + """ + try: + # Initialize Mistral client + client = Mistral(api_key=apiKey) + + # Get batch job status + batch_job = client.batch.jobs.get(job_id=batch_id) + print(f"Mistral batch status: {batch_job.status}") + return batch_job + except Exception as e: + print(f"Error in Mistral retrieve_batch_status: {e}") + raise + + +async def download_mistral_file(file_id, apikey): + """ + Helper function to download and parse a Mistral batch result file. + + Args: + file_id: The file ID to download + apikey: Mistral API key + + Returns: + List of parsed JSON lines, or empty list if file doesn't exist or fails to parse + """ + if not file_id: + return [] + + try: + client = Mistral(api_key=apikey) + output_file_stream = client.files.download(file_id=file_id) + file_content_bytes = output_file_stream.read() + file_content_str = file_content_bytes.decode('utf-8') + + try: + results = [json.loads(line) for line in file_content_str.splitlines() if line.strip()] + return results + except json.JSONDecodeError as e: + print(f"JSON decoding error for file {file_id}: {e}") + return [] + except Exception as e: + print(f"Error downloading file {file_id}: {e}") + return [] + + +async def handle_batch_results(batch_id, apikey): + """ + Handle Mistral batch processing - retrieve status and process results. 
+ + Mistral batch statuses: + - QUEUED: Batch is queued for processing + - RUNNING: Batch is currently being processed + - SUCCESS: Batch completed successfully + - FAILED: Batch failed + - TIMEOUT_EXCEEDED: Batch exceeded timeout + - CANCELLATION_REQUESTED: Cancellation is in progress + - CANCELLED: Batch was cancelled + + Args: + batch_id: Batch ID + apikey: Mistral API key + + Returns: + Tuple of (results, is_completed) + - For SUCCESS: (results_list, True) + - For FAILED/TIMEOUT_EXCEEDED/CANCELLED with partial results: (results, True) + - For FAILED/TIMEOUT_EXCEEDED/CANCELLED without results: (error_info, True) + - For in-progress: (None, False) - continues polling + """ + batch_job = await retrieve_batch_status(batch_id, apikey) + status = batch_job.status + + # In-progress states - continue polling + if status in ["QUEUED", "RUNNING", "CANCELLATION_REQUESTED"]: + return None, False + + # Terminal states - try to download results + output_file_id = batch_job.output_file if hasattr(batch_job, 'output_file') else None + + # Try to get results even for failed/timeout/cancelled batches (partial results) + output_results = await download_mistral_file(output_file_id, apikey) + + if output_results: + # We have some results (partial or complete) + return output_results, True + + # No results available - return error based on status + if status == "SUCCESS": + # Success but no output - unusual case + error_info = [{ + "error": { + "message": "Batch succeeded but no result file was generated", + "type": "no_results", + "batch_status": status + }, + "status_code": 400 + }] + elif status == "FAILED": + error_info = [{ + "error": { + "message": f"Batch failed. Error: {getattr(batch_job, 'error', 'No error details available')}", + "type": "batch_failed", + "batch_status": status + }, + "status_code": 400 + }] + elif status == "TIMEOUT_EXCEEDED": + error_info = [{ + "error": { + "message": "Batch exceeded timeout and no partial results available", + "type": "batch_timeout", + "batch_status": status + }, + "status_code": 400 + }] + elif status == "CANCELLED": + error_info = [{ + "error": { + "message": "Batch was cancelled and no partial results available", + "type": "batch_cancelled", + "batch_status": status + }, + "status_code": 400 + }] + else: + # Unknown terminal status + error_info = [{ + "error": { + "message": f"Batch reached unknown terminal status: {status}", + "type": "unknown_status", + "batch_status": status + }, + "status_code": 400 + }] + + return error_info, True diff --git a/src/services/commonServices/anthrophic/antrophicCall.py b/src/services/commonServices/anthropic/anthropicCall.py similarity index 99% rename from src/services/commonServices/anthrophic/antrophicCall.py rename to src/services/commonServices/anthropic/anthropicCall.py index 1ac2dbc4..99a48e68 100644 --- a/src/services/commonServices/anthrophic/antrophicCall.py +++ b/src/services/commonServices/anthropic/anthropicCall.py @@ -7,7 +7,7 @@ from src.services.utils.ai_middleware_format import Response_formatter -class Antrophic(BaseService): +class Anthropic(BaseService): async def execute(self): historyParams = {} tools = {} diff --git a/src/services/commonServices/anthrophic/antrophicModelRun.py b/src/services/commonServices/anthropic/anthropicModelRun.py similarity index 100% rename from src/services/commonServices/anthrophic/antrophicModelRun.py rename to src/services/commonServices/anthropic/anthropicModelRun.py diff --git a/src/services/commonServices/anthropic/anthropic_batch.py 
b/src/services/commonServices/anthropic/anthropic_batch.py new file mode 100644 index 00000000..a0d34eb8 --- /dev/null +++ b/src/services/commonServices/anthropic/anthropic_batch.py @@ -0,0 +1,108 @@ +import pydash as _ +from ..baseService.baseService import BaseService +from ..createConversations import ConversationService +from src.configs.constant import service_name +import json +import uuid +from ...cache_service import store_in_cache +from src.configs.constant import redis_keys +from src.services.commonServices.anthropic.anthropicCall import Anthropic + + +class AnthropicBatch(BaseService): + async def batch_execute(self): + system_prompt = self.configuration.get('prompt', '') + batch_requests = [] + message_mappings = [] + + # Validate batch_variables if provided + batch_variables = self.batch_variables if hasattr(self, 'batch_variables') and self.batch_variables else None + if batch_variables is not None: + if not isinstance(batch_variables, list): + return { + "success": False, + "message": "batch_variables must be an array" + } + if len(batch_variables) != len(self.batch): + return { + "success": False, + "message": f"batch_variables array length ({len(batch_variables)}) must match batch array length ({len(self.batch)})" + } + + # Construct batch requests in Anthropic format + for idx, message in enumerate(self.batch, start=1): + # Generate a unique custom_id for each request + custom_id = str(uuid.uuid4()) + + # Construct Anthropic message format + request_params = { + "model": self.model, + "max_tokens": self.customConfig.get("max_tokens", 1024), + "messages": [ + { + "role": "user", + "content": message + } + ] + } + + # Add system prompt + request_params["system"] = system_prompt + + # Add other config from customConfig + if self.customConfig: + # Add temperature, top_p, etc. 
+ for key in ['temperature', 'top_p', 'top_k', 'stop_sequences']: + if key in self.customConfig: + request_params[key] = self.customConfig[key] + + # Create batch request entry with custom_id and params + batch_entry = { + "custom_id": custom_id, + "params": request_params + } + batch_requests.append(batch_entry) + + # Store message mapping for response + mapping_item = { + "message": message, + "custom_id": custom_id + } + + # Add batch_variables to mapping if provided + if batch_variables is not None: + mapping_item["variables"] = batch_variables[idx - 1] + + message_mappings.append(mapping_item) + + # Create batch using Anthropic API + message_batch = await create_batch_requests(batch_requests, self.apikey, self.model) + + batch_id = message_batch.id + batch_json = { + "id": message_batch.id, + "processing_status": message_batch.processing_status, + "request_counts": { + "processing": message_batch.request_counts.processing, + "succeeded": message_batch.request_counts.succeeded, + "errored": message_batch.request_counts.errored, + "canceled": message_batch.request_counts.canceled, + "expired": message_batch.request_counts.expired + }, + "created_at": message_batch.created_at, + "expires_at": message_batch.expires_at, + "apikey": self.apikey, + "webhook": self.webhook, + "batch_variables": batch_variables, + "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)}, + "service": self.service, + "model": self.model + } + cache_key = f"{redis_keys['batch_']}{message_batch.id}" + await store_in_cache(cache_key, batch_json, ttl = 86400) + return { + "success": True, + "message": "Response will be successfully sent to the webhook within 24 hrs.", + "batch_id": batch_id, + "messages": message_mappings + } diff --git a/src/services/commonServices/anthropic/anthropic_run_batch.py b/src/services/commonServices/anthropic/anthropic_run_batch.py new file mode 100644 index 00000000..22b04d54 --- /dev/null +++ b/src/services/commonServices/anthropic/anthropic_run_batch.py @@ -0,0 +1,140 @@ +import json +import uuid +from anthropic import Anthropic + +async def create_batch_requests(batch_requests, apiKey, model): + """ + Creates and submits a message batch to Anthropic API. + + Args: + batch_requests: List of request dictionaries + apiKey: Anthropic API key + model: Model name to use + + Returns: + Message batch object from Anthropic API + """ + try: + # Initialize Anthropic client + client = Anthropic(api_key=apiKey) + + # Create message batch + message_batch = client.messages.batches.create( + requests=batch_requests + ) + + print(f"Created Anthropic batch: {message_batch.id}") + return message_batch + + except Exception as e: + print("Error in Anthropic create_batch_requests:", repr(e)) + print("Cause:", repr(getattr(e, "__cause__", None))) + raise + +async def retrieve_batch_status(batch_id, apiKey): + """ + Retrieves the status of an Anthropic message batch. + + Args: + batch_id: Message batch ID + apiKey: Anthropic API key + + Returns: + Message batch object with current status + """ + try: + # Initialize Anthropic client + client = Anthropic(api_key=apiKey) + + # Get batch status + message_batch = client.messages.batches.retrieve(batch_id) + print(f"Anthropic batch status: {message_batch.processing_status}") + return message_batch + + except Exception as e: + print(f"Error in Anthropic retrieve_batch_status: {e}") + raise + +async def retrieve_batch_results(batch_id, apiKey): + """ + Retrieves the results of a completed Anthropic message batch. 
+ + Args: + batch_id: Message batch ID + apiKey: Anthropic API key + + Returns: + List of batch results + """ + try: + # Initialize Anthropic client + client = Anthropic(api_key=apiKey) + + # Iterate through results + results = [] + for result in client.messages.batches.results(batch_id): + results.append(result) + + print(f"Retrieved {len(results)} results from Anthropic batch {batch_id}") + return results + + except Exception as e: + print(f"Error in Anthropic retrieve_batch_results: {e}") + raise + + +async def handle_batch_results(batch_id, apikey): + """ + Handle Anthropic batch processing - retrieve status and process results. + + Anthropic batch processing statuses: + - in_progress: Batch is being processed + - canceling: Batch is being canceled + - ended: Batch has finished (may contain succeeded, errored, canceled, or expired results) + + Args: + batch_id: Batch ID + apikey: Anthropic API key + + Returns: + Tuple of (results, is_completed) + - For ended batches: (results_list, True) - includes all result types + - For in_progress/canceling: (None, False) - continues polling + """ + message_batch = await retrieve_batch_status(batch_id, apikey) + processing_status = message_batch.processing_status + + # In-progress states - continue polling + if processing_status in ["in_progress", "canceling"]: + return None, False + + # Terminal state - retrieve all results + if processing_status == "ended": + # Retrieve batch results (includes succeeded, errored, canceled, expired) + results = await retrieve_batch_results(batch_id, apikey) + + if results: + results_list = [result.model_dump() for result in results] + return results_list, True + else: + # Ended but no results - unusual case + error_info = [{ + "error": { + "message": "Batch ended but no results were returned", + "type": "no_results", + "batch_status": processing_status + }, + "status_code": 400 + }] + return error_info, True + + # Unknown status + error_info = [{ + "error": { + "message": f"Batch has unknown processing status: {processing_status}", + "type": "unknown_status", + "batch_status": processing_status + }, + "status_code": 400 + }] + return error_info, True diff --git a/src/services/commonServices/baseService/baseService.py b/src/services/commonServices/baseService/baseService.py index 9860b232..f9d33cb0 100644 --- a/src/services/commonServices/baseService/baseService.py +++ b/src/services/commonServices/baseService/baseService.py @@ -7,7 +7,7 @@ from .utils import validate_tool_call, tool_call_formatter, sendResponse, make_code_mapping_by_service, process_data_and_run_tools from src.configs.serviceKeys import ServiceKeys from ..openAI.runModel import openai_response_model, openai_completion -from ..anthrophic.antrophicModelRun import anthropic_runmodel +from ..anthropic.anthropicModelRun import anthropic_runmodel from ..Mistral.mistral_model_run import mistral_model_run from ....configs.constant import service_name from ..groq.groqModelRun import groq_runmodel diff --git a/src/services/commonServices/groq/groq_batch.py b/src/services/commonServices/groq/groq_batch.py new file mode 100644 index 00000000..292f2eb0 --- /dev/null +++ b/src/services/commonServices/groq/groq_batch.py @@ -0,0 +1,104 @@ +import pydash as _ +from ..baseService.baseService import BaseService +from ..createConversations import ConversationService +from src.configs.constant import service_name +import json +import uuid +from ...cache_service import store_in_cache +from src.configs.constant import redis_keys +from .groq_run_batch import 
create_batch_file, process_batch_file + + +class GroqBatch(BaseService): + async def batch_execute(self): + system_prompt = self.configuration.get('prompt', '') + results = [] + message_mappings = [] + + # Validate batch_variables if provided + batch_variables = self.batch_variables if hasattr(self, 'batch_variables') and self.batch_variables else None + if batch_variables is not None: + if not isinstance(batch_variables, list): + return { + "success": False, + "message": "batch_variables must be an array" + } + if len(batch_variables) != len(self.batch): + return { + "success": False, + "message": f"batch_variables array length ({len(batch_variables)}) must match batch array length ({len(self.batch)})" + } + + # Construct batch requests in OpenAI format (Groq is OpenAI-compatible) + for idx, message in enumerate(self.batch, start=1): + # Generate a unique ID for each request + custom_id = str(uuid.uuid4()) + + # Construct OpenAI-compatible request + request_obj = { + "custom_id": custom_id, + "method": "POST", + "url": "/v1/chat/completions", + "body": { + "model": self.model, + "messages": [] + } + } + + # Add system message + request_obj["body"]["messages"].append({ + "role": "system", + "content": system_prompt + }) + + # Add user message + request_obj["body"]["messages"].append({ + "role": "user", + "content": message + }) + + # Add other config from customConfig (like temperature, max_tokens, etc.) + if self.customConfig: + for key, value in self.customConfig.items(): + if key not in ['messages', 'prompt', 'model']: + request_obj["body"][key] = value + + # Serialize to JSON string (one line per request for JSONL) + results.append(json.dumps(request_obj)) + + # Store message mapping for response + mapping_item = { + "message": message, + "custom_id": custom_id + } + + # Add batch_variables to mapping if provided (idx-1 because enumerate starts at 1) + if batch_variables is not None: + mapping_item["variables"] = batch_variables[idx - 1] + + message_mappings.append(mapping_item) + + # Upload batch file and create batch job using Groq's native library + batch_input_file = await create_batch_file(results, self.apikey) + batch_file = await process_batch_file(batch_input_file, self.apikey) + + batch_id = batch_file.id + batch_json = { + "id": batch_file.id, + "status": batch_file.status, + "created_at": batch_file.created_at, + "model": self.model, + "apikey": self.apikey, + "webhook": self.webhook, + "batch_variables": batch_variables, + "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)}, + "service": self.service + } + cache_key = f"{redis_keys['batch_']}{batch_file.id}" + await store_in_cache(cache_key, batch_json, ttl = 86400) + return { + "success": True, + "message": "Response will be successfully sent to the webhook within 24 hrs.", + "batch_id": batch_id, + "messages": message_mappings + } diff --git a/src/services/commonServices/groq/groq_run_batch.py b/src/services/commonServices/groq/groq_run_batch.py new file mode 100644 index 00000000..6ec06a4d --- /dev/null +++ b/src/services/commonServices/groq/groq_run_batch.py @@ -0,0 +1,239 @@ +import json +import io +from groq import AsyncGroq +import httpx +import certifi +import asyncio + + +async def create_batch_file(data, apiKey): + """ + Creates a JSONL file and uploads it to Groq's Files API. 
+ + Args: + data: List of JSON strings (JSONL entries) + apiKey: Groq API key + + Returns: + Uploaded file object from Groq Files API + """ + try: + # Initialize Groq client + groq_client = AsyncGroq(api_key=apiKey) + + # Create JSONL file content + file_content = "\n".join(data) + filelike_obj = io.BytesIO(file_content.encode("utf-8")) + filelike_obj.name = "batch.jsonl" + filelike_obj.seek(0) + + # Upload the JSONL file to Groq Files API + batch_input_file = await groq_client.files.create( + file=filelike_obj, + purpose="batch" + ) + + print(f"Created Groq batch file: {batch_input_file.id}") + return batch_input_file + + except Exception as e: + print("Error in Groq create_batch_file:", repr(e)) + print("Cause:", repr(getattr(e, "__cause__", None))) + raise + + +async def process_batch_file(batch_input_file, apiKey): + """ + Creates a batch job using the uploaded file. + + Args: + batch_input_file: File object from create_batch_file + apiKey: Groq API key + + Returns: + Batch job object + """ + try: + # Initialize Groq client + groq_client = AsyncGroq(api_key=apiKey) + + batch_input_file_id = batch_input_file.id + + # Create batch job with the uploaded file + result = await groq_client.batches.create( + input_file_id=batch_input_file_id, + endpoint="/v1/chat/completions", + completion_window="24h" + ) + + print(f"Created Groq batch: {result.id}") + return result + + except Exception as e: + print(f"Error in Groq process_batch_file: {e}") + raise + + +async def retrieve_batch_status(batch_id, apiKey): + """ + Retrieves the status of a batch job. + + Args: + batch_id: Batch job ID + apiKey: Groq API key + + Returns: + Batch job object with current status + """ + try: + # Initialize Groq client + groq_client = AsyncGroq(api_key=apiKey) + + # Get batch status + batch = await groq_client.batches.retrieve(batch_id) + print(f"Groq batch status: {batch.status}") + return batch + + except Exception as e: + print(f"Error in Groq retrieve_batch_status: {e}") + raise + + +async def download_batch_file(file_id, apikey): + """ + Helper function to download and parse a Groq batch result file. + + Args: + file_id: The file ID to download + apikey: Groq API key + + Returns: + List of parsed JSON lines, or empty list if file doesn't exist or fails to parse + """ + if not file_id: + return [] + + limits = httpx.Limits( + max_keepalive_connections=5, + max_connections=10, + keepalive_expiry=30.0 + ) + + http_client = httpx.AsyncClient( + timeout=httpx.Timeout(60.0, connect=10.0), + transport=httpx.AsyncHTTPTransport( + retries=3, + verify=certifi.where() + ), + limits=limits, + follow_redirects=True + ) + + try: + groq_client = AsyncGroq(api_key=apikey, http_client=http_client) + + file_response = await groq_client.files.content(file_id) + file_content = await asyncio.to_thread(file_response.read) + + try: + results = [json.loads(line) for line in file_content.decode('utf-8').splitlines() if line.strip()] + return results + except json.JSONDecodeError as e: + print(f"JSON decoding error for file {file_id}: {e}") + return [] + except Exception as e: + print(f"Error downloading file {file_id}: {e}") + return [] + finally: + await http_client.aclose() + + +async def handle_batch_results(batch_id, apikey): + """ + Handle Groq batch processing - retrieve status and process results. 
+ + Args: + batch_id: Batch ID + apikey: Groq API key + + Returns: + Tuple of (results, is_completed) + - For completed batches: (results_list, True) + - For failed/expired/cancelled with partial results: (combined_results, True) + - For failed/expired/cancelled without results: (error_info, True) + - For in-progress: (None, False) - continues polling + """ + batch = await retrieve_batch_status(batch_id, apikey) + status = batch.status + + # In-progress states - continue polling + if status in ['validating', 'in_progress', 'finalizing', 'cancelling']: + return None, False + + # Terminal states - download results (both success and error files) + output_file_id = batch.output_file_id + error_file_id = batch.error_file_id + + # Download both output and error files in parallel + output_results, error_results = await asyncio.gather( + download_batch_file(output_file_id, apikey), + download_batch_file(error_file_id, apikey) + ) + + # Combine results + all_results = output_results + error_results + + if all_results: + # We have some results (partial or complete) + return all_results, True + + # No results available - return error based on status + if status == 'completed': + # Completed but no files - unusual case + error_info = [{ + "error": { + "message": "Batch completed but no result files were generated", + "type": "no_results", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'failed': + error_info = [{ + "error": { + "message": f"Batch failed validation or processing. Errors: {getattr(batch, 'errors', 'No error details available')}", + "type": "batch_failed", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'expired': + error_info = [{ + "error": { + "message": "Batch expired - not completed within processing window and no partial results available", + "type": "batch_expired", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'cancelled': + error_info = [{ + "error": { + "message": "Batch was cancelled and no partial results available", + "type": "batch_cancelled", + "batch_status": status + }, + "status_code": 400 + }] + else: + # Unknown terminal status + error_info = [{ + "error": { + "message": f"Batch reached unknown terminal status: {status}", + "type": "unknown_status", + "batch_status": status + }, + "status_code": 400 + }] + + return error_info, True diff --git a/src/services/commonServices/openAI/openai_batch.py b/src/services/commonServices/openAI/openai_batch.py index d4da14ee..f835c595 100644 --- a/src/services/commonServices/openAI/openai_batch.py +++ b/src/services/commonServices/openAI/openai_batch.py @@ -91,7 +91,8 @@ async def batch_execute(self): "apikey": self.apikey, "webhook" : self.webhook, "batch_variables": batch_variables, - "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)} + "custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)}, + "service": self.service } cache_key = f"{redis_keys['batch_']}{batch_file.id}" await store_in_cache(cache_key, batch_json, ttl = 86400) diff --git a/src/services/commonServices/openAI/openai_run_batch.py b/src/services/commonServices/openAI/openai_run_batch.py index 205dbc8c..1404af54 100644 --- a/src/services/commonServices/openAI/openai_run_batch.py +++ b/src/services/commonServices/openAI/openai_run_batch.py @@ -23,12 +23,13 @@ async def create_batch_file(data, apiKey): timeout=timeout, limits=limits, follow_redirects=True, - verify=certifi.where(), # ✅ put verify here - 
transport=httpx.AsyncHTTPTransport(retries=3), # ✅ retries here only + verify=certifi.where(), + transport=httpx.AsyncHTTPTransport(retries=3), ) try: openAI = AsyncOpenAI(api_key=apiKey, http_client=http_client) + batch_input_file = await openAI.files.create( file=filelike_obj, purpose="batch" @@ -37,8 +38,7 @@ async def create_batch_file(data, apiKey): finally: await http_client.aclose() except Exception as e: - # More useful debug than just {e} - print("Error in create_batch_file:", repr(e)) + print("Error in OpenAI create_batch_file:", repr(e)) print("Cause:", repr(getattr(e, "__cause__", None))) raise @@ -65,6 +65,7 @@ async def process_batch_file(batch_input_file, apiKey): try: openAI = AsyncOpenAI(api_key=apiKey, http_client=http_client) + result = await openAI.batches.create( input_file_id=batch_input_file_id, endpoint="/v1/chat/completions", @@ -75,7 +76,7 @@ async def process_batch_file(batch_input_file, apiKey): finally: await http_client.aclose() except Exception as e: - print(f"Error in process_batch_file: {e}") + print(f"Error in OpenAI process_batch_file: {e}") raise @@ -106,5 +107,148 @@ async def retrieve_batch_status(batch_id, apiKey): finally: await http_client.aclose() except Exception as e: - print(f"Error in retrieve_batch_status: {e}") + print(f"Error in OpenAI retrieve_batch_status: {e}") raise + + +async def download_batch_file(file_id, apikey): + """ + Helper function to download and parse a batch result file. + + Args: + file_id: The file ID to download + apikey: OpenAI API key + + Returns: + List of parsed JSON lines, or empty list if file doesn't exist or fails to parse + """ + if not file_id: + return [] + + limits = httpx.Limits( + max_keepalive_connections=5, + max_connections=10, + keepalive_expiry=30.0 + ) + + http_client = httpx.AsyncClient( + timeout=httpx.Timeout(60.0, connect=10.0), + transport=httpx.AsyncHTTPTransport( + retries=3, + verify=certifi.where() + ), + limits=limits, + follow_redirects=True + ) + + try: + import asyncio + openAI = AsyncOpenAI(api_key=apikey, http_client=http_client) + + file_response = await openAI.files.content(file_id) + file_content = await asyncio.to_thread(file_response.read) + + try: + results = [json.loads(line) for line in file_content.decode('utf-8').splitlines() if line.strip()] + return results + except json.JSONDecodeError as e: + print(f"JSON decoding error for file {file_id}: {e}") + return [] + except Exception as e: + print(f"Error downloading file {file_id}: {e}") + return [] + finally: + await http_client.aclose() + + +async def handle_batch_results(batch_id, apikey): + """ + Handle OpenAI batch processing - retrieve status and process results. 
+ + Args: + batch_id: Batch ID + apikey: OpenAI API key + + Returns: + Tuple of (results, is_completed) + - For completed batches: (results_list, True) + - For failed/expired/cancelled with partial results: (combined_results, True) + - For failed/expired/cancelled without results: (error_info, True) + - For in-progress: (None, False) - continues polling + """ + batch = await retrieve_batch_status(batch_id, apikey) + status = batch.status + + # In-progress states - continue polling + if status in ['validating', 'in_progress', 'finalizing', 'cancelling']: + return None, False + + # Terminal states - download results (both success and error files) + output_file_id = batch.output_file_id + error_file_id = batch.error_file_id + + # Download both output and error files in parallel + import asyncio + output_results, error_results = await asyncio.gather( + download_batch_file(output_file_id, apikey), + download_batch_file(error_file_id, apikey) + ) + + # Combine results + all_results = output_results + error_results + + if all_results: + # We have some results (partial or complete) + return all_results, True + + # No results available - return error based on status + if status == 'completed': + # Completed but no files - unusual case + error_info = [{ + "error": { + "message": "Batch completed but no result files were generated", + "type": "no_results", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'failed': + error_info = [{ + "error": { + "message": f"Batch failed validation or processing. Errors: {getattr(batch, 'errors', 'No error details available')}", + "type": "batch_failed", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'expired': + error_info = [{ + "error": { + "message": "Batch expired - not completed within 24-hour window and no partial results available", + "type": "batch_expired", + "batch_status": status + }, + "status_code": 400 + }] + elif status == 'cancelled': + error_info = [{ + "error": { + "message": "Batch was cancelled and no partial results available", + "type": "batch_cancelled", + "batch_status": status + }, + "status_code": 400 + }] + else: + # Unknown terminal status + error_info = [{ + "error": { + "message": f"Batch reached unknown terminal status: {status}", + "type": "unknown_status", + "batch_status": status + }, + "status_code": 400 + }] + + return error_info, True + diff --git a/src/services/utils/ai_middleware_format.py b/src/services/utils/ai_middleware_format.py index 00799485..d7f9bcb4 100644 --- a/src/services/utils/ai_middleware_format.py +++ b/src/services/utils/ai_middleware_format.py @@ -3,7 +3,7 @@ from src.services.utils.apiservice import fetch from src.configs.constant import service_name -async def Response_formatter(response = {}, service = None, tools={}, type='chat', images = None): +async def Response_formatter(response = {}, service = None, tools={}, type='chat', images = None, isBatch = False): tools_data = tools if isinstance(tools_data, dict): for key, value in tools_data.items(): @@ -12,7 +12,8 @@ async def Response_formatter(response = {}, service = None, tools={}, type='chat tools_data[key] = json.loads(value) except json.JSONDecodeError: pass - if service == 'openai_batch': + if (service == 'openai' or service == 'groq' or service == 'mistral') and isBatch: + # Batch responses for OpenAI-compatible services (OpenAI, Groq, Mistral) return { "data" : { "id" : response.get("id", None), @@ -34,6 +35,60 @@ async def Response_formatter(response = {}, service = None, tools={}, type='chat } } 
+ elif service == 'gemini' and isBatch: + # Gemini batch responses have a different structure + candidates = response.get("candidates", [{}]) + content_parts = candidates[0].get("content", {}).get("parts", [{}]) if candidates else [{}] + + return { + "data" : { + "id" : response.get("key", None), # Use the key from batch response as ID + "content" : content_parts[0].get("text", None) if content_parts else None, + "model" : response.get("modelVersion", None), + "role" : candidates[0].get("content", {}).get("role", "model") if candidates else "model", + "tools_data": tools_data or {}, + "images" : images, + "annotations" : None, + "fallback" : response.get('fallback') or False, + "firstAttemptError" : response.get('firstAttemptError') or '', + "finish_reason" : finish_reason_mapping(candidates[0].get("finishReason", "") if candidates else "") + }, + "usage" : { + "input_tokens" : response.get("usageMetadata", {}).get("promptTokenCount", None), + "output_tokens" : response.get("usageMetadata", {}).get("candidatesTokenCount", None), + "total_tokens" : response.get("usageMetadata", {}).get("totalTokenCount", None), + "cached_tokens" : response.get("usageMetadata", {}).get("cachedContentTokenCount", None) + } + } + elif service == 'anthropic' and isBatch: + # Anthropic batch responses follow standard Anthropic message format + content_blocks = response.get("content", []) + text_content = next((block.get("text") for block in content_blocks if block.get("type") == "text"), None) + + return { + "data" : { + "id" : response.get("id", None), + "content" : text_content, + "model" : response.get("model", None), + "role" : response.get("role", "assistant"), + "tools_data": tools_data or {}, + "images" : images, + "annotations" : None, + "fallback" : response.get('fallback') or False, + "firstAttemptError" : response.get('firstAttemptError') or '', + "finish_reason" : finish_reason_mapping(response.get("stop_reason", "")) + }, + "usage" : { + "input_tokens" : response.get("usage", {}).get("input_tokens", None), + "output_tokens" : response.get("usage", {}).get("output_tokens", None), + "total_tokens" : ( + response.get("usage", {}).get("input_tokens", 0) + + response.get("usage", {}).get("output_tokens", 0) + ), + "cache_read_input_tokens" : response.get("usage", {}).get("cache_read_input_tokens", None), + "cache_creation_input_tokens" : response.get("usage", {}).get("cache_creation_input_tokens", None) + } + } elif service == service_name['openai'] and (type != 'image' and type != 'embedding'): return { "data": { @@ -348,15 +403,103 @@ def finish_reason_mapping(finish_reason): } return finish_reason_mapping.get(finish_reason, "other") -async def Batch_Response_formatter(response = {}, service = None, tools={}, type='chat', images = None, batch_id = None, custom_id = None): +async def Batch_Response_formatter(response = {}, service = None, tools={}, type='chat', images = None, batch_id = None, custom_id = None, isBatch = True): """ Formatter specifically for batch responses that includes batch_id and custom_id for easy mapping + + Args: + isBatch: Boolean flag to indicate this is a batch response (default: True) """ - # Get the base formatted response - formatted_response = await Response_formatter(response=response, service=service, tools=tools, type=type, images=images) + # Get the base formatted response with isBatch flag + formatted_response = await Response_formatter(response=response, service=service, tools=tools, type=type, images=images, isBatch=isBatch) print(formatted_response) # Add batch_id 
and custom_id to the response for mapping formatted_response["batch_id"] = batch_id formatted_response["custom_id"] = custom_id + formatted_response["isBatch"] = isBatch + + return formatted_response + +async def process_batch_results(results, service, batch_id, batch_variables, custom_id_mapping): + """ + Common function to process batch results for all services. + + Args: + results: List of result items + service: Service name (e.g., 'gemini', 'anthropic') + batch_id: Batch ID + batch_variables: Optional batch variables + custom_id_mapping: Mapping of custom_id to index + + Returns: + List of formatted results + """ + formatted_results = [] + + for index, result_item in enumerate(results): + # Check if this is a pre-formatted error from terminal batch failure (failed, expired, cancelled) + # These come directly from the batch handler and are already formatted + if "error" in result_item and "status_code" in result_item and "custom_id" not in result_item: + # This is a terminal batch error (not an individual request error) + # Pass it through as-is, just add batch_id + result_item["batch_id"] = batch_id + formatted_results.append(result_item) + continue + + # Extract custom_id and result data (format varies by service) + if service == 'gemini': + custom_id = result_item.get("key", None) + result_data = result_item.get("response", {}) + result_data = custom_id + elif service == 'anthropic': + custom_id = result_item.get("custom_id", None) + result_data = result_item.get("result", {}) + if result_data.get("type") != "error": + result_data = result_data.get("message", {}) + elif service in ['openai', 'groq']: + custom_id = result_item.get("custom_id", None) + response = result_item.get("response", {}) + result_data = response.get("body", {}) + status_code = response.get("status_code", 200) + elif service == 'mistral': + custom_id = result_item.get("custom_id", None) + result_data = result_item.get("response", {}) + + # Check for errors + has_error = False + if service in {'openai', 'groq'}: + has_error = status_code >= 400 or "error" in result_data + elif service == 'anthropic': + has_error = result_data.get("type") == "error" + else: + has_error = result_data.get("error") + + if has_error: + formatted_content = { + "custom_id": custom_id, + "batch_id": batch_id, + "error": result_data.get("error", result_data), + "status_code": status_code if service in ['openai', 'groq'] else 400 + } + else: + # Format successful response + formatted_content = await Batch_Response_formatter( + response=result_data, + service=service, + tools={}, + type='chat', + images=None, + batch_id=batch_id, + custom_id=custom_id, + isBatch=True + ) + + # Add batch_variables to response if available + if batch_variables is not None and custom_id in custom_id_mapping: + variable_index = custom_id_mapping[custom_id] + if variable_index < len(batch_variables): + formatted_content["variables"] = batch_variables[variable_index] + + formatted_results.append(formatted_content) - return formatted_response \ No newline at end of file + return formatted_results diff --git a/src/services/utils/batch_script.py b/src/services/utils/batch_script.py index 40b70f36..5af62594 100644 --- a/src/services/utils/batch_script.py +++ b/src/services/utils/batch_script.py @@ -1,108 +1,92 @@ -from ..cache_service import find_in_cache_with_prefix, delete_in_cache -from openai import AsyncOpenAI +from ..cache_service import find_in_cache_with_prefix, delete_in_cache, acquire_lock, release_lock from ..utils.send_error_webhook import 
create_response_format from ..commonServices.baseService.baseService import sendResponse import asyncio -import json -import httpx -import certifi -from .ai_middleware_format import Batch_Response_formatter +from .ai_middleware_format import process_batch_results from src.configs.constant import redis_keys +from .batch_script_utils import get_batch_result_handler from globals import * + async def repeat_function(): while True: await check_batch_status() await asyncio.sleep(900) - async def check_batch_status(): try: print("Batch Script running...") batch_ids = await find_in_cache_with_prefix('batch_') if batch_ids is None: return - for id in batch_ids: - apikey = id.get('apikey') - webhook = id.get('webhook') - batch_id = id.get('id') - batch_variables = id.get('batch_variables') # Retrieve batch_variables from cache - custom_id_mapping = id.get('custom_id_mapping', {}) # Get mapping of custom_id to index - - if webhook.get('url') is not None: - response_format = create_response_format(webhook.get('url'), webhook.get('headers')) - - # Create httpx client with proper production configuration - limits = httpx.Limits( - max_keepalive_connections=5, - max_connections=10, - keepalive_expiry=30.0 - ) + + for batch_data in batch_ids: + apikey = batch_data.get('apikey') + webhook = batch_data.get('webhook') + batch_id = batch_data.get('id') + batch_variables = batch_data.get('batch_variables') + custom_id_mapping = batch_data.get('custom_id_mapping', {}) + service = batch_data.get('service') - http_client = httpx.AsyncClient( - timeout=httpx.Timeout(60.0, connect=10.0), - transport=httpx.AsyncHTTPTransport( - retries=3, - verify=certifi.where() - ), - limits=limits, - follow_redirects=True - ) + # Try to acquire lock for this batch + lock_acquired = await acquire_lock(batch_id) + if not lock_acquired: + logger.info(f"Batch {batch_id} is already being processed by another server, skipping...") + continue try: - openAI = AsyncOpenAI(api_key=apikey, http_client=http_client) - batch = await openAI.batches.retrieve(batch_id) - if batch.status == "completed": - file = batch.output_file_id or batch.error_file_id - file_response = None - if file is not None: - file_response = await openAI.files.content(file) - file_content = await asyncio.to_thread(file_response.read) - try: - # Split the data by newline and parse each JSON object separately - file_content = [json.loads(line) for line in file_content.decode('utf-8').splitlines() if line.strip()] - except json.JSONDecodeError as e: - print(f"JSON decoding error: {e}") - file_content = None - for index, content in enumerate(file_content): - response = content.get("response", {}) - response_body = response.get("body", {}) - status_code = response.get("status_code", 200) - custom_id = content.get("custom_id", None) + if webhook.get('url') is not None: + response_format = create_response_format(webhook.get('url'), webhook.get('headers')) + + try: + # Get the appropriate handler for this service + batch_result_handler = get_batch_result_handler(service) + + # Call the service-specific handler + results, is_completed = await batch_result_handler(batch_id, apikey) + + if is_completed: + # Batch has reached a terminal state (completed, failed, expired, cancelled) + if results: + # Process and format the results (could be success or error results) + formatted_results = await process_batch_results( + results, service, batch_id, batch_variables, custom_id_mapping + ) - # Check if response contains an error (status_code >= 400 or error in body) - if status_code >= 400 
or "error" in response_body: - # Handle error response - formatted_content = { - "custom_id": custom_id, - "batch_id": batch_id, - "error": response_body.get("error", response_body), - "status_code": status_code - } - else: - # Handle successful response - formatted_content = await Batch_Response_formatter(response=response_body, service='openai_batch', tools={}, type='chat', images=None, batch_id=batch_id, custom_id=custom_id) + # Check if all responses are errors + has_success = any( + item.get("status_code") is None or item.get("status_code", 200) < 400 + for item in formatted_results + ) - # Add batch_variables to response if available - if batch_variables is not None and custom_id in custom_id_mapping: - variable_index = custom_id_mapping[custom_id] - if variable_index < len(batch_variables): - formatted_content["variables"] = batch_variables[variable_index] - - file_content[index] = formatted_content + await sendResponse(response_format, data=formatted_results, success=has_success) + else: + # No results but marked as completed - send generic error + error_response = [{ + "batch_id": batch_id, + "error": { + "message": "Batch completed but no results were returned", + "type": "no_results" + }, + "status_code": 500 + }] + await sendResponse(response_format, data=error_response, success=False) - # Check if all responses are errors - has_success = any(item.get("status_code") is None or item.get("status_code", 200) < 400 for item in file_content) + # Delete from cache after sending webhook + cache_key = f"{redis_keys['batch_']}{batch_id}" + await delete_in_cache(cache_key) + logger.info(f"Batch {batch_id} completed and removed from cache") + else: + # Batch still in progress, will check again on next poll + logger.info(f"Batch {batch_id} still in progress") - await sendResponse(response_format, data=file_content, success=has_success) - cache_key = f"{redis_keys['batch_']}{batch_id}" - await delete_in_cache(cache_key) + except Exception as error: + logger.error(f"Error processing batch {batch_id}: {str(error)}") finally: - # Ensure http_client is properly closed - await http_client.aclose() + # Always release the lock, even if an error occurred + await release_lock(batch_id) + + except Exception as error: logger.error(f"An error occurred while checking the batch status: {str(error)}") - - - \ No newline at end of file diff --git a/src/services/utils/batch_script_utils.py b/src/services/utils/batch_script_utils.py new file mode 100644 index 00000000..4c1cbeb1 --- /dev/null +++ b/src/services/utils/batch_script_utils.py @@ -0,0 +1,19 @@ +# Import service-specific batch handlers +from ..commonServices.openAI.openai_run_batch import handle_batch_results as openai_handle_batch +from ..commonServices.Mistral.mistral_run_batch import handle_batch_results as mistral_handle_batch +from ..commonServices.Google.gemini_run_batch import handle_batch_results as gemini_handle_batch +from ..commonServices.anthropic.anthropic_run_batch import handle_batch_results as anthropic_handle_batch +from ..commonServices.groq.groq_run_batch import handle_batch_results as groq_handle_batch + + + +BATCH_RESULT_HANDLERS = { + 'gemini': gemini_handle_batch, + 'anthropic': anthropic_handle_batch, + 'openai': openai_handle_batch, + 'groq': groq_handle_batch, + 'mistral': mistral_handle_batch, +} + +def get_batch_result_handler(service): + return BATCH_RESULT_HANDLERS.get(service) \ No newline at end of file diff --git a/src/services/utils/helper.py b/src/services/utils/helper.py index 31aa7bc6..1b2d7375 100644 --- 
a/src/services/utils/helper.py +++ b/src/services/utils/helper.py @@ -11,10 +11,14 @@ from src.configs.model_configuration import model_config_document import jwt from ..commonServices.openAI.openai_batch import OpenaiBatch +from ..commonServices.Google.gemini_batch import GeminiBatch +from ..commonServices.anthropic.anthropic_batch import AnthropicBatch +from ..commonServices.groq.groq_batch import GroqBatch +from ..commonServices.Mistral.mistral_batch import MistralBatch from ..commonServices.openAI.openai_response import OpenaiResponse from ..commonServices.groq.groqCall import Groq from ..commonServices.grok.grokCall import Grok -from ..commonServices.anthrophic.antrophicCall import Antrophic +from ..commonServices.anthropic.anthropicCall import Anthropic from ..commonServices.openRouter.openRouter_call import OpenRouter from ..commonServices.Mistral.mistral_call import Mistral from ...configs.constant import service_name @@ -230,7 +234,7 @@ async def create_service_handler(params, service): elif service == service_name['gemini']: class_obj = GeminiHandler(params) elif service == service_name['anthropic']: - class_obj = Antrophic(params) + class_obj = Anthropic(params) elif service == service_name['groq']: class_obj = Groq(params) elif service == service_name['grok']: @@ -303,12 +307,14 @@ async def create_service_handler_for_batch(params, service): class_obj = None if service == service_name['openai']: class_obj = OpenaiBatch(params) - # elif service == service_name['gemini']: - # class_obj = GeminiHandler(params) - # elif service == service_name['anthropic']: - # class_obj = Antrophic(params) - # elif service == service_name['groq']: - # class_obj = Groq(params) + elif service == service_name['gemini']: + class_obj = GeminiBatch(params) + elif service == service_name['anthropic']: + class_obj = AnthropicBatch(params) + elif service == service_name['groq']: + class_obj = GroqBatch(params) + elif service == service_name['mistral']: + class_obj = MistralBatch(params) else: raise ValueError(f"Unsupported batch service: {service}") From b79159350f457ab7f589925cc9edc19c94808310 Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Wed, 31 Dec 2025 15:48:07 +0530 Subject: [PATCH 05/16] refactor: update conversation saving to Hippocampus API with JSON format and increase chunk size --- src/services/utils/hippocampus_utils.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/services/utils/hippocampus_utils.py b/src/services/utils/hippocampus_utils.py index e83fb084..ce0747af 100644 --- a/src/services/utils/hippocampus_utils.py +++ b/src/services/utils/hippocampus_utils.py @@ -1,3 +1,4 @@ +import json from config import Config from src.services.utils.logger import logger from src.services.utils.apiservice import fetch @@ -18,8 +19,12 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, brid logger.warning("Hippocampus API key or collection ID not configured") return - # Combine user and assistant messages as content - content = f"User: {user_message}\nAssistant: {assistant_message}" + # Create content as stringified JSON with question and answer + content_obj = { + "question": user_message, + "answer": assistant_message + } + content = json.dumps(content_obj) # Use bridge_name if available, otherwise fallback to bridge_id title = bridge_name if bridge_name else bridge_id @@ -32,7 +37,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, brid "settings": { "strategy": "custom", "chunkingUrl": 
"https://flow.sokt.io/func/scriQywSNndU", - "chunkSize": 1000 + "chunkSize": 4000 } } From e05011cf64c65acb6d05a75c40d29e2895c4004e Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Wed, 31 Dec 2025 17:10:59 +0530 Subject: [PATCH 06/16] feat: Introduce `chatbot_auto_answers` configuration to conditionally save conversations to Hippocampus using `agent_id`. --- .../commonServices/baseService/utils.py | 3 ++- .../queueService/queueLogService.py | 4 ++-- src/services/utils/common_utils.py | 1 + src/services/utils/getConfiguration.py | 3 ++- src/services/utils/hippocampus_utils.py | 20 +++++++++++-------- 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/services/commonServices/baseService/utils.py b/src/services/commonServices/baseService/utils.py index 3e167795..199815d0 100644 --- a/src/services/commonServices/baseService/utils.py +++ b/src/services/commonServices/baseService/utils.py @@ -466,7 +466,8 @@ async def make_request_data_and_publish_sub_queue(parsed_data, result, params, t "user_message" : user_message, "assistant_message" : assistant_message, "bridge_id" : parsed_data.get('bridge_id'), - "bridge_name" : parsed_data.get('name', '') + "bridge_name" : parsed_data.get('name', ''), + "chatbot_auto_answers": parsed_data.get('chatbot_auto_answers') }, "type" : parsed_data.get('type'), "save_files_to_redis" : { diff --git a/src/services/commonServices/queueService/queueLogService.py b/src/services/commonServices/queueService/queueLogService.py index c1b76f25..b74ac9a7 100644 --- a/src/services/commonServices/queueService/queueLogService.py +++ b/src/services/commonServices/queueService/queueLogService.py @@ -37,11 +37,11 @@ async def process_messages(self, messages): # Save conversation to Hippocampus for chatbot bridge types hippocampus_data = messages.get('save_to_hippocampus', {}) - if hippocampus_data.get('bridgeType'): + if hippocampus_data.get('bridgeType') and hippocampus_data.get('chatbot_auto_answers'): await save_conversation_to_hippocampus( user_message=hippocampus_data.get('user_message', ''), assistant_message=hippocampus_data.get('assistant_message', ''), - bridge_id=hippocampus_data.get('bridge_id', ''), + agent_id=hippocampus_data.get('bridge_id', ''), bridge_name=hippocampus_data.get('bridge_name', '') ) diff --git a/src/services/utils/common_utils.py b/src/services/utils/common_utils.py index f7925d79..09942eee 100644 --- a/src/services/utils/common_utils.py +++ b/src/services/utils/common_utils.py @@ -192,6 +192,7 @@ def parse_request_body(request_body): "transfer_request_id": body.get('transfer_request_id'), "orchestrator_flag": body.get('orchestrator_flag'), "batch_variables": body.get('batch_variables'), + "chatbot_auto_answers": body.get('chatbot_auto_answers') } diff --git a/src/services/utils/getConfiguration.py b/src/services/utils/getConfiguration.py index b0fb754d..40f638e2 100644 --- a/src/services/utils/getConfiguration.py +++ b/src/services/utils/getConfiguration.py @@ -173,7 +173,8 @@ async def _prepare_configuration_response(configuration, service, bridge_id, api "is_embed": result.get('bridges', {}).get("folder_type") == 'embed', "user_id": result.get("bridges", {}).get("user_id"), 'folder_id': result.get('bridges', {}).get('folder_id'), - 'web_search_filters': web_search_filters_value + 'web_search_filters': web_search_filters_value, + 'chatbot_auto_answers': result.get('bridges', {}).get('chatbot_auto_answers') } return None, base_config, result, resolved_bridge_id diff --git a/src/services/utils/hippocampus_utils.py 
b/src/services/utils/hippocampus_utils.py index ce0747af..94f132c0 100644 --- a/src/services/utils/hippocampus_utils.py +++ b/src/services/utils/hippocampus_utils.py @@ -4,14 +4,14 @@ from src.services.utils.apiservice import fetch -async def save_conversation_to_hippocampus(user_message, assistant_message, bridge_id, bridge_name=''): +async def save_conversation_to_hippocampus(user_message, assistant_message, agent_id, bridge_name=''): """ Save conversation to Hippocampus API for chatbot bridge types. Args: user_message: The user's message content assistant_message: The assistant's response content - bridge_id: The bridge/agent ID (used as ownerId) + agent_id: The bridge/agent ID (used as ownerId) bridge_name: The bridge/agent name (used as title) """ try: @@ -24,16 +24,19 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, brid "question": user_message, "answer": assistant_message } - content = json.dumps(content_obj) + # content = json.dumps(content_obj) - # Use bridge_name if available, otherwise fallback to bridge_id - title = bridge_name if bridge_name else bridge_id + # Use bridge_name if available, otherwise fallback to agent_id + title = bridge_name if bridge_name else agent_id payload = { "collectionId": Config.HIPPOCAMPUS_COLLECTION_ID, "title": title, - "ownerId": bridge_id, - "content": content, + "ownerId": agent_id, + "content": content_obj, + + + "settings": { "strategy": "custom", "chunkingUrl": "https://flow.sokt.io/func/scriQywSNndU", @@ -46,6 +49,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, brid "Content-Type": "application/json" } + response_data, response_headers = await fetch( url=Config.HIPPOCAMPUS_API_URL, method="POST", @@ -53,7 +57,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, brid json_body=payload ) - logger.info(f"Successfully saved conversation to Hippocampus for bridge_id: {bridge_id}") + logger.info(f"Successfully saved conversation to Hippocampus for agent_id: {agent_id}") except Exception as e: logger.error(f"Error saving conversation to Hippocampus: {str(e)}") From a0e4316d2f766f20847216f3700202de7233f739 Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Wed, 31 Dec 2025 17:40:40 +0530 Subject: [PATCH 07/16] fix: Validate `HIPPOCAMPUS_API_URL` configuration and send stringified JSON content to Hippocampus. 
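For context, a minimal sketch of the request body this change ends up sending to Hippocampus, assuming the values visible in the surrounding diffs (the question/answer text, agent id, and bridge name below are made-up; the collection id comes from Config, and the auth header is omitted because its exact name is not shown here):

import json

# Hypothetical inputs; in the service these come from the sub-queue message.
user_message = "How do I rotate my API key?"
assistant_message = "Open Settings -> API keys and click Rotate."
agent_id = "brdg_123"        # used as ownerId and as the title fallback
bridge_name = "support-bot"

payload = {
    "collectionId": "<HIPPOCAMPUS_COLLECTION_ID>",  # Config.HIPPOCAMPUS_COLLECTION_ID
    "title": bridge_name or agent_id,
    "ownerId": agent_id,
    # content is sent as stringified JSON, not as a nested object
    "content": json.dumps({"question": user_message, "answer": assistant_message}),
    "settings": {
        "strategy": "custom",
        "chunkingUrl": "https://flow.sokt.io/func/scriQywSNndU",
        "chunkSize": 4000,
    },
}
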
--- src/services/utils/hippocampus_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/services/utils/hippocampus_utils.py b/src/services/utils/hippocampus_utils.py index 94f132c0..220db856 100644 --- a/src/services/utils/hippocampus_utils.py +++ b/src/services/utils/hippocampus_utils.py @@ -15,8 +15,8 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, agen bridge_name: The bridge/agent name (used as title) """ try: - if not Config.HIPPOCAMPUS_API_KEY or not Config.HIPPOCAMPUS_COLLECTION_ID: - logger.warning("Hippocampus API key or collection ID not configured") + if not Config.HIPPOCAMPUS_API_KEY or not Config.HIPPOCAMPUS_COLLECTION_ID or not Config.HIPPOCAMPUS_API_URL: + logger.warning("Hippocampus API key, collection ID, or API URL not configured") return # Create content as stringified JSON with question and answer @@ -24,7 +24,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, agen "question": user_message, "answer": assistant_message } - # content = json.dumps(content_obj) + content = json.dumps(content_obj) # Use bridge_name if available, otherwise fallback to agent_id title = bridge_name if bridge_name else agent_id @@ -33,7 +33,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, agen "collectionId": Config.HIPPOCAMPUS_COLLECTION_ID, "title": title, "ownerId": agent_id, - "content": content_obj, + "content": content, From 8ad71b8ad3c906f633a34013bac8a21dbaf5ff32 Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Wed, 31 Dec 2025 18:29:07 +0530 Subject: [PATCH 08/16] refactor: remove environment variable for Hippocampus API URL and use hardcoded value in utils --- config.py | 1 - src/services/utils/hippocampus_utils.py | 9 ++++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/config.py b/config.py index 08985296..ace406fb 100644 --- a/config.py +++ b/config.py @@ -57,6 +57,5 @@ class Config: AI_ML_APIKEY = os.getenv('AI_ML_APIKEY') AI_MIDDLEWARE_PAUTH_KEY = os.getenv('AI_MIDDLEWARE_PAUTH_KEY') OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO') - HIPPOCAMPUS_API_URL = os.getenv('HIPPOCAMPUS_API_URL') HIPPOCAMPUS_API_KEY = os.getenv('HIPPOCAMPUS_API_KEY') HIPPOCAMPUS_COLLECTION_ID = os.getenv('HIPPOCAMPUS_COLLECTION_ID') \ No newline at end of file diff --git a/src/services/utils/hippocampus_utils.py b/src/services/utils/hippocampus_utils.py index 220db856..d6a4829e 100644 --- a/src/services/utils/hippocampus_utils.py +++ b/src/services/utils/hippocampus_utils.py @@ -3,6 +3,9 @@ from src.services.utils.logger import logger from src.services.utils.apiservice import fetch +# Hardcoded Hippocampus API URL +HIPPOCAMPUS_API_URL = 'http://hippocampus.gtwy.ai/resource' + async def save_conversation_to_hippocampus(user_message, assistant_message, agent_id, bridge_name=''): """ @@ -15,8 +18,8 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, agen bridge_name: The bridge/agent name (used as title) """ try: - if not Config.HIPPOCAMPUS_API_KEY or not Config.HIPPOCAMPUS_COLLECTION_ID or not Config.HIPPOCAMPUS_API_URL: - logger.warning("Hippocampus API key, collection ID, or API URL not configured") + if not Config.HIPPOCAMPUS_API_KEY or not Config.HIPPOCAMPUS_COLLECTION_ID: + logger.warning("Hippocampus API key or collection ID not configured") return # Create content as stringified JSON with question and answer @@ -51,7 +54,7 @@ async def save_conversation_to_hippocampus(user_message, assistant_message, agen 
response_data, response_headers = await fetch( - url=Config.HIPPOCAMPUS_API_URL, + url=HIPPOCAMPUS_API_URL, method="POST", headers=headers, json_body=payload From 2401505ec22a414c1afd56847e28feacc112b673 Mon Sep 17 00:00:00 2001 From: Prayanshrajput Date: Wed, 31 Dec 2025 19:02:01 +0530 Subject: [PATCH 09/16] add gtwy web search inbuild tool --- config.py | 3 +- src/configs/constant.py | 4 ++ .../commonServices/baseService/utils.py | 7 ++- .../utils/built_in_tools/firecrawl.py | 61 +++++++++++++++++++ src/services/utils/getConfiguration.py | 5 +- src/services/utils/getConfiguration_utils.py | 42 +++++++++++++ 6 files changed, 117 insertions(+), 5 deletions(-) create mode 100644 src/services/utils/built_in_tools/firecrawl.py diff --git a/config.py b/config.py index ace406fb..14643216 100644 --- a/config.py +++ b/config.py @@ -58,4 +58,5 @@ class Config: AI_MIDDLEWARE_PAUTH_KEY = os.getenv('AI_MIDDLEWARE_PAUTH_KEY') OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO') HIPPOCAMPUS_API_KEY = os.getenv('HIPPOCAMPUS_API_KEY') - HIPPOCAMPUS_COLLECTION_ID = os.getenv('HIPPOCAMPUS_COLLECTION_ID') \ No newline at end of file + HIPPOCAMPUS_COLLECTION_ID = os.getenv('HIPPOCAMPUS_COLLECTION_ID') + FIRECRAWL_API_KEY = os.getenv('FIRECRAWL_API_KEY') \ No newline at end of file diff --git a/src/configs/constant.py b/src/configs/constant.py index 74d7b528..d4c392cb 100644 --- a/src/configs/constant.py +++ b/src/configs/constant.py @@ -71,4 +71,8 @@ "gemini":"gemini-2.5-flash", "ai_ml": "gpt-oss-20b", "grok": "grok-4-fast" +} + +inbuild_tools = { + "Gtwy_Web_Search":"Gtwy_Web_Search" } \ No newline at end of file diff --git a/src/services/commonServices/baseService/utils.py b/src/services/commonServices/baseService/utils.py index 199815d0..bee3ed4b 100644 --- a/src/services/commonServices/baseService/utils.py +++ b/src/services/commonServices/baseService/utils.py @@ -12,9 +12,10 @@ from globals import * from src.db_services.ConfigurationServices import get_bridges_without_tools, update_bridge from src.services.utils.ai_call_util import call_gtwy_agent +from src.services.utils.built_in_tools.firecrawl import call_firecrawl_scrape from globals import * from src.services.cache_service import store_in_cache, find_in_cache, client, REDIS_PREFIX -from src.configs.constant import redis_keys +from src.configs.constant import redis_keys,inbuild_tools def clean_json(data): """Recursively remove keys with empty string, empty list, or empty dictionary.""" @@ -252,6 +253,8 @@ async def process_data_and_run_tools(codes_mapping, self): agent_args["bridge_configurations"] = self.bridge_configurations task = call_gtwy_agent(agent_args) + elif self.tool_id_and_name_mapping[name].get('type') == inbuild_tools["Gtwy_Web_Search"]: + task = call_firecrawl_scrape(tool_data.get("args")) else: task = axios_work(tool_data.get("args"), self.tool_id_and_name_mapping[name]) tasks.append((tool_call_key, tool_data, task)) @@ -509,5 +512,3 @@ async def save_files_to_redis(thread_id, sub_thread_id, bridge_id, files): await store_in_cache(cache_key, files, 604800) else: await store_in_cache(cache_key, files, 604800) - - diff --git a/src/services/utils/built_in_tools/firecrawl.py b/src/services/utils/built_in_tools/firecrawl.py new file mode 100644 index 00000000..311b782f --- /dev/null +++ b/src/services/utils/built_in_tools/firecrawl.py @@ -0,0 +1,61 @@ +from config import Config +from src.services.utils.apiservice import fetch +from src.services.utils.logger import logger + + +async def call_firecrawl_scrape(args): + url = 
(args or {}).get('url') if isinstance(args, dict) else None + if not url: + return { + 'response': {'error': 'url is required for web_crawling tool'}, + 'metadata': {'type': 'function'}, + 'status': 0 + } + + api_key = Config.FIRECRAWL_API_KEY + if not api_key: + return { + 'response': {'error': 'web_crawling tool is not configured'}, + 'metadata': {'type': 'function'}, + 'status': 0 + } + + payload = {'url': url} + formats = args.get('formats') if isinstance(args, dict) else None + if formats: + if isinstance(formats, list): + payload['formats'] = formats + elif isinstance(formats, str): + payload['formats'] = [formats] + else: + payload['formats'] = [str(formats)] + + request_headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {api_key}' + } + + try: + response, headers = await fetch( + "https://api.firecrawl.dev/v2/scrape", + 'POST', + request_headers, + None, + payload + ) + data = response.get('data') if isinstance(response, dict) and 'data' in response else response + return { + 'response': data, + 'metadata': { + 'type': 'function', + 'flowHitId': headers.get('flowHitId') if isinstance(headers, dict) else None + }, + 'status': 1 + } + except Exception as exc: + logger.error(f"Firecrawl scrape failed: {exc}") + return { + 'response': {'error': str(exc)}, + 'metadata': {'type': 'function'}, + 'status': 0 + } diff --git a/src/services/utils/getConfiguration.py b/src/services/utils/getConfiguration.py index 40f638e2..57ada568 100644 --- a/src/services/utils/getConfiguration.py +++ b/src/services/utils/getConfiguration.py @@ -6,7 +6,7 @@ from .getConfiguration_utils import ( validate_bridge, get_bridge_data, setup_configuration, setup_tool_choice, setup_tools, setup_api_key, setup_pre_tools, add_rag_tool, - add_anthropic_json_schema, add_connected_agents + add_anthropic_json_schema, add_connected_agents, add_web_crawling_tool ) from .update_and_check_cost import check_bridge_api_folder_limits @@ -132,6 +132,9 @@ async def _prepare_configuration_response(configuration, service, bridge_id, api ) add_rag_tool(tools, tool_id_and_name_mapping, rag_data) + + gtwy_web_search_filters = web_search_filters or result.get('bridges', {}).get('gtwy_web_search_filters') or {} + add_web_crawling_tool(tools, tool_id_and_name_mapping, built_in_tools or result.get('bridges', {}).get('built_in_tools'), gtwy_web_search_filters) add_anthropic_json_schema(service, configuration, tools) if rag_data: diff --git a/src/services/utils/getConfiguration_utils.py b/src/services/utils/getConfiguration_utils.py index 8e4a8868..97d2559c 100644 --- a/src/services/utils/getConfiguration_utils.py +++ b/src/services/utils/getConfiguration_utils.py @@ -4,6 +4,7 @@ from src.services.commonServices.baseService.utils import makeFunctionName from src.services.utils.service_config_utils import tool_choice_function_name_formatter from config import Config +from src.configs.constant import inbuild_tools apiCallModel = db['apicalls'] from globals import * @@ -275,6 +276,47 @@ def add_rag_tool(tools, tool_id_and_name_mapping, rag_data): "type": "RAG" } +def _should_enable_web_crawling_tool(built_in_tools): + if not built_in_tools: + return False + return inbuild_tools["Gtwy_Web_Search"] in built_in_tools + +def add_web_crawling_tool(tools, tool_id_and_name_mapping, built_in_tools, gtwy_web_search_filters=None): + """Add Firecrawl-based web crawling tool when requested via built-in tools.""" + if not _should_enable_web_crawling_tool(built_in_tools): + return + + tools.append({ + 'type': 'function', + 'name': 
inbuild_tools["Gtwy_Web_Search"], + 'description': 'Search and extract content from any website URL. This tool scrapes web pages and returns their content in various formats. Use this when you need to: fetch real-time information from websites, extract article content, retrieve documentation, access public web data, or get current information not in your training data. If enum is provided for URL, only use URLs from those allowed domains.', + 'properties': { + 'url': { + 'description': 'The complete URL of the website to scrape (must start with http:// or https://). Example: https://example.com/page', + 'type': 'string', + 'enum': gtwy_web_search_filters if (gtwy_web_search_filters and len(gtwy_web_search_filters) > 0) else [], + 'required_params': [], + 'parameter': {} + }, + 'formats': { + 'description': 'Optional list of output formats. Available formats include: "markdown" (default, clean text), "html" (raw HTML), "screenshot" (visual capture), "links" (extracted URLs). If not specified, returns markdown format.', + 'type': 'array', + 'items': { + 'type': 'string' + }, + 'enum': [], + 'required_params': [], + 'parameter': {} + } + }, + 'required': ['url'] + }) + + tool_id_and_name_mapping[inbuild_tools["Gtwy_Web_Search"]] = { + 'type': inbuild_tools["Gtwy_Web_Search"], + 'name': inbuild_tools["Gtwy_Web_Search"] + } + def add_anthropic_json_schema(service, configuration, tools): """Add JSON schema response format for Anthropic service""" if (service != 'anthropic' or From e84859363ec7f0f627dd4b4fc45f708052ca3af9 Mon Sep 17 00:00:00 2001 From: adityajunwal Date: Fri, 2 Jan 2026 13:50:31 +0530 Subject: [PATCH 10/16] Fixed a Bug in Anthropic Service --- src/services/commonServices/anthropic/anthropic_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/commonServices/anthropic/anthropic_batch.py b/src/services/commonServices/anthropic/anthropic_batch.py index a0d34eb8..49e9cb4c 100644 --- a/src/services/commonServices/anthropic/anthropic_batch.py +++ b/src/services/commonServices/anthropic/anthropic_batch.py @@ -7,7 +7,7 @@ from ...cache_service import store_in_cache from src.configs.constant import redis_keys from src.services.commonServices.anthropic.anthropicCall import Anthropic - +from .anthropic_run_batch import create_batch_requests class AnthropicBatch(BaseService): async def batch_execute(self): From 4c01b436c727a8f53cdc570620190df5e1cfdb44 Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Fri, 2 Jan 2026 15:59:23 +0530 Subject: [PATCH 11/16] fix: Correctly retrieve 'chatbot_auto_answers' from bridge_data instead of result --- src/services/utils/getConfiguration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/utils/getConfiguration.py b/src/services/utils/getConfiguration.py index 57ada568..e569947f 100644 --- a/src/services/utils/getConfiguration.py +++ b/src/services/utils/getConfiguration.py @@ -177,7 +177,7 @@ async def _prepare_configuration_response(configuration, service, bridge_id, api "user_id": result.get("bridges", {}).get("user_id"), 'folder_id': result.get('bridges', {}).get('folder_id'), 'web_search_filters': web_search_filters_value, - 'chatbot_auto_answers': result.get('bridges', {}).get('chatbot_auto_answers') + 'chatbot_auto_answers': bridge_data.get('bridges', {}).get('chatbot_auto_answers') } return None, base_config, result, resolved_bridge_id From 9851643235ea79c42f0f8ead8686b04505c42cf4 Mon Sep 17 00:00:00 2001 From: Viasocket Interns Date: Sat, 3 Jan 2026 12:39:45 +0530 Subject: [PATCH 
12/16] openai-image generation support (#1139) --- src/services/commonServices/common.py | 6 +-- .../commonServices/openAI/image_model.py | 50 +++++++++++++------ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/services/commonServices/common.py b/src/services/commonServices/common.py index 7ec32498..6d1b5754 100644 --- a/src/services/commonServices/common.py +++ b/src/services/commonServices/common.py @@ -599,11 +599,11 @@ async def image(request_body): raise ValueError(result) # Create latency object using utility function + if result.get('response') and result['response'].get('data'): + result['response']['data']['id'] = parsed_data['message_id'] + await sendResponse(parsed_data['response_format'], result["response"], success=True, variables=parsed_data.get('variables',{})) latency = create_latency_object(timer, params) if not parsed_data['is_playground']: - if result.get('response') and result['response'].get('data'): - result['response']['data']['id'] = parsed_data['message_id'] - await sendResponse(parsed_data['response_format'], result["response"], success=True, variables=parsed_data.get('variables',{})) # Update usage metrics for successful API calls update_usage_metrics(parsed_data, params, latency, result=result, success=True) # Process background tasks (handles both transfer and non-transfer cases) diff --git a/src/services/commonServices/openAI/image_model.py b/src/services/commonServices/openAI/image_model.py index 25d9ec9c..52db266e 100644 --- a/src/services/commonServices/openAI/image_model.py +++ b/src/services/commonServices/openAI/image_model.py @@ -1,4 +1,5 @@ import traceback +import base64 from openai import AsyncOpenAI import uuid import asyncio @@ -13,23 +14,42 @@ async def OpenAIImageModel(configuration, apiKey, execution_time_logs, timer): chat_completion = await openai_config.images.generate(**configuration) execution_time_logs.append({"step": "OpenAI image Processing time", "time_taken": timer.stop("OpenAI image Processing time")}) response = chat_completion.to_dict() - # Process all images in the response data array for i, image_data in enumerate(response['data']): - # Get original OpenAI image URL - original_image_url = image_data['url'] - - # Generate predictable GCP URL immediately and start background upload - gcp_url = await uploadDoc( - file=original_image_url, - folder='generated-images', - real_time=False, - content_type='image/png' - ) - - # Add both URLs to response - response['data'][i]['original_url'] = original_image_url - response['data'][i]['url'] = gcp_url # Primary URL (GCP) + # Check if response contains URL or base64 data + if 'url' in image_data: + # URL format - download and upload to GCP + original_image_url = image_data['url'] + + # Generate predictable GCP URL immediately and start background upload + gcp_url = await uploadDoc( + file=original_image_url, + folder='generated-images', + real_time=False, + content_type='image/png' + ) + + # Add both URLs to response + response['data'][i]['original_url'] = original_image_url + response['data'][i]['url'] = gcp_url # Primary URL (GCP) + elif 'b64_json' in image_data: + # Base64 format - decode and upload to GCP + # Decode base64 to bytes + image_bytes = base64.b64decode(image_data['b64_json']) + + # Upload to GCP + gcp_url = await uploadDoc( + file=image_bytes, + folder='generated-images', + real_time=False, + content_type='image/png' + ) + + # Add GCP URL to response and keep original b64_json + response['data'][i]['url'] = gcp_url # Primary URL (GCP) + + else: + raise 
ValueError(f"Image data contains neither 'url' nor 'b64_json' key. Available keys: {list(image_data.keys())}") return { 'success': True, From e88ac6058677ac539da04d5b3ddaf7e391d74d4d Mon Sep 17 00:00:00 2001 From: Viasocket Interns Date: Sat, 3 Jan 2026 19:56:36 +0530 Subject: [PATCH 13/16] Removed untested services (#1142) --- src/services/utils/helper.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/services/utils/helper.py b/src/services/utils/helper.py index 1b2d7375..3c300945 100644 --- a/src/services/utils/helper.py +++ b/src/services/utils/helper.py @@ -304,17 +304,12 @@ def calculate_usage(model, model_response, service): return usage async def create_service_handler_for_batch(params, service): + # Currently only supports openai and anthropic class_obj = None if service == service_name['openai']: class_obj = OpenaiBatch(params) - elif service == service_name['gemini']: - class_obj = GeminiBatch(params) elif service == service_name['anthropic']: class_obj = AnthropicBatch(params) - elif service == service_name['groq']: - class_obj = GroqBatch(params) - elif service == service_name['mistral']: - class_obj = MistralBatch(params) else: raise ValueError(f"Unsupported batch service: {service}") From 5232cb27205bed92efd7e17eecad82927d5574e1 Mon Sep 17 00:00:00 2001 From: Husain Baghwala <89205542+Husainbw786@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:16:19 -0800 Subject: [PATCH 14/16] feat: Enhance logging for saving conversations to Hippocampus in Queue2 service (#1143) Co-authored-by: Husain Baghwala --- src/services/commonServices/queueService/queueLogService.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/services/commonServices/queueService/queueLogService.py b/src/services/commonServices/queueService/queueLogService.py index b74ac9a7..b6a640c0 100644 --- a/src/services/commonServices/queueService/queueLogService.py +++ b/src/services/commonServices/queueService/queueLogService.py @@ -36,8 +36,10 @@ async def process_messages(self, messages): return # Save conversation to Hippocampus for chatbot bridge types + logger.info(f"Saving conversation to Hippocampus for bridge type: {messages.get('save_to_hippocampus', {}).get('bridgeType')}") hippocampus_data = messages.get('save_to_hippocampus', {}) if hippocampus_data.get('bridgeType') and hippocampus_data.get('chatbot_auto_answers'): + logger.info(f"Saving conversation to Hippocampus for bridge type after if condition: {hippocampus_data.get('bridgeType')} and chatbot auto answers: {hippocampus_data.get('chatbot_auto_answers')}") await save_conversation_to_hippocampus( user_message=hippocampus_data.get('user_message', ''), assistant_message=hippocampus_data.get('assistant_message', ''), From b70bbf4841970759755f290539abc10dac1bc977 Mon Sep 17 00:00:00 2001 From: Husain Baghwala <89205542+Husainbw786@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:34:27 -0800 Subject: [PATCH 15/16] Logger added dev (#1144) * feat: Enhance logging for saving conversations to Hippocampus in Queue2 service * fix: Update logging message for saving conversations to Hippocampus in Queue2 service * fix: Correct logging message for saving conversations to Hippocampus in Queue2 service --------- Co-authored-by: Husain Baghwala --- src/services/commonServices/queueService/queueLogService.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/commonServices/queueService/queueLogService.py b/src/services/commonServices/queueService/queueLogService.py index b6a640c0..0b88486a 100644 --- 
a/src/services/commonServices/queueService/queueLogService.py +++ b/src/services/commonServices/queueService/queueLogService.py @@ -36,7 +36,7 @@ async def process_messages(self, messages): return # Save conversation to Hippocampus for chatbot bridge types - logger.info(f"Saving conversation to Hippocampus for bridge type: {messages.get('save_to_hippocampus', {}).get('bridgeType')}") + logger.info(f"Saving conversation to Hippocampus1 {messages['save_to_hippocampus']}.") hippocampus_data = messages.get('save_to_hippocampus', {}) if hippocampus_data.get('bridgeType') and hippocampus_data.get('chatbot_auto_answers'): logger.info(f"Saving conversation to Hippocampus for bridge type after if condition: {hippocampus_data.get('bridgeType')} and chatbot auto answers: {hippocampus_data.get('chatbot_auto_answers')}") From 3c66937d54fec4a23e4af1b5deef1840a903fe0d Mon Sep 17 00:00:00 2001 From: Husain Baghwala <89205542+Husainbw786@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:11:20 -0800 Subject: [PATCH 16/16] refactor: Remove redundant logging statements in Queue2 service for saving conversations to Hippocampus (#1145) Co-authored-by: Husain Baghwala --- src/services/commonServices/queueService/queueLogService.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/services/commonServices/queueService/queueLogService.py b/src/services/commonServices/queueService/queueLogService.py index 0b88486a..b74ac9a7 100644 --- a/src/services/commonServices/queueService/queueLogService.py +++ b/src/services/commonServices/queueService/queueLogService.py @@ -36,10 +36,8 @@ async def process_messages(self, messages): return # Save conversation to Hippocampus for chatbot bridge types - logger.info(f"Saving conversation to Hippocampus1 {messages['save_to_hippocampus']}.") hippocampus_data = messages.get('save_to_hippocampus', {}) if hippocampus_data.get('bridgeType') and hippocampus_data.get('chatbot_auto_answers'): - logger.info(f"Saving conversation to Hippocampus for bridge type after if condition: {hippocampus_data.get('bridgeType')} and chatbot auto answers: {hippocampus_data.get('chatbot_auto_answers')}") await save_conversation_to_hippocampus( user_message=hippocampus_data.get('user_message', ''), assistant_message=hippocampus_data.get('assistant_message', ''),
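
Taken together, the last three patches leave the Queue2 consumer with a simple gate before the Hippocampus save. A minimal sketch of that gate with a made-up queue message (the real message is assembled in make_request_data_and_publish_sub_queue, and the real branch awaits save_conversation_to_hippocampus with agent_id set to the bridge_id):

# Hypothetical message shape; only the keys used by the gate are shown.
message = {
    "type": "chat",
    "save_to_hippocampus": {
        "bridgeType": "chatbot",            # truthy only for chatbot bridges
        "chatbot_auto_answers": True,       # flag read from the bridge configuration
        "user_message": "What are your support hours?",
        "assistant_message": "We are available 9am-6pm IST.",
        "bridge_id": "brdg_123",
        "bridge_name": "support-bot",
    },
}

hippocampus_data = message.get("save_to_hippocampus", {})
if hippocampus_data.get("bridgeType") and hippocampus_data.get("chatbot_auto_answers"):
    # The service awaits save_conversation_to_hippocampus(...) here; printing
    # keeps the sketch self-contained.
    print("would save conversation for", hippocampus_data["bridge_id"])
else:
    print("skipped: not a chatbot bridge or auto answers disabled")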