From b6e42ddc84518489deac8078525ef625a7949433 Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Mon, 8 Sep 2025 15:26:01 +0530
Subject: [PATCH 1/5] refactor: consolidate conversation and raw data models
 into unified schema

---
 models/postgres/pg_models.py                  |  41 +--
 src/controllers/conversationController.py     | 104 +-----
 src/db_services/conversationDbService.py      | 185 +++++-----
 src/db_services/metrics_service.py            | 220 ++++++------
 src/db_services/unifiedConversationService.py | 319 ++++++++++++++++++
 src/middlewares/interfaceMiddlewares.py       |  10 -
 src/services/utils/common_utils.py            |   2 +-
 7 files changed, 520 insertions(+), 361 deletions(-)
 create mode 100644 src/db_services/unifiedConversationService.py

diff --git a/models/postgres/pg_models.py b/models/postgres/pg_models.py
index a6932f22..a626b6bf 100644
--- a/models/postgres/pg_models.py
+++ b/models/postgres/pg_models.py
@@ -12,52 +12,37 @@ class Conversation(Base):
     id = Column(Integer, primary_key=True)
     org_id = Column(String)
     thread_id = Column(String)
-    model_name = Column(String)
     bridge_id = Column(String)
-    message = Column(Text)
-    message_by = Column(String)
-    function = Column(JSON)
-    type = Column(Enum('chat', 'completion', 'embedding', name='enum_conversations_type'), nullable=False)
-    createdAt = Column(DateTime, default=func.now())
-    updatedAt = Column(DateTime, default=func.now(), onupdate=func.now())
-    chatbot_message = Column(Text)
-    is_reset = Column(Boolean, default=False)
+    user_message = Column(Text)
+    response = Column(Text)
+    chatbot_response = Column(Text)
     tools_call_data = Column(ARRAY(JSON))
     user_feedback = Column(Integer)
-    message_id = Column(UUID(as_uuid=True), nullable=True)
+    response_id = Column(UUID(as_uuid=True), nullable=True)
     version_id = Column(String)
     sub_thread_id = Column(String, nullable=True)
-    revised_prompt = Column(Text, nullable=True)
+    revised_response = Column(Text, nullable=True)
     image_urls = Column(ARRAY(JSON), nullable=True)
     urls = Column(ARRAY(String), nullable=True)
-    AiConfig = Column(JSON, nullable=True)
-    annotations = Column(ARRAY(JSON), nullable=True)
     fallback_model = Column(String, nullable=True)
-
-class RawData(Base):
-    __tablename__ = 'raw_data'
-    __table_args__ = {'extend_existing': True}
-
-
-    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False)
-    org_id = Column(String)
+    error = Column(Text, nullable=True)
+    status = Column(Integer, nullable=True)  # 0: failed, 1: success, 2: second attempt success
+    createdAt = Column(DateTime, default=func.now())
+    updatedAt = Column(DateTime, default=func.now(), onupdate=func.now())
     authkey_name = Column(String)
     latency = Column(Float)
     service = Column(String)
-    status = Column(Boolean, nullable=False)
-    error = Column(Text, default='none')
-    model = Column(String)
     input_tokens = Column(Float)
     output_tokens = Column(Float)
     expected_cost = Column(Float)
     created_at = Column(DateTime, default=func.now())
-    chat_id = Column(Integer, ForeignKey('conversations.id'))
     message_id = Column(UUID(as_uuid=True), nullable=True)
     variables = Column(JSON)
-    is_present = Column(Boolean, default=False)
-    firstAttemptError = Column(Text, nullable=True)
-    # conversation = relationship("Conversation", back_populates="raw_data")
     finish_reason = Column(String, nullable=True)
+    model_name = Column(String)
+    type = Column(String, nullable=False)
+    AiConfig = Column(JSON, nullable=True)
+    annotations = Column(ARRAY(JSON), nullable=True)
 
 class system_prompt_versionings(Base):
     __tablename__ = 'system_prompt_versionings'
diff --git a/src/controllers/conversationController.py b/src/controllers/conversationController.py
index 981fa215..10da4b0a 100644
--- a/src/controllers/conversationController.py
+++ b/src/controllers/conversationController.py
@@ -1,6 +1,3 @@
-import copy
-from datetime import datetime
-from config import Config
 from ..db_services import conversationDbService as chatbotDbService
 import traceback
 from globals import *
@@ -11,112 +8,15 @@ from ..services.cache_service import find_in_cache, store_in_cache
 
-
-async def getThread(thread_id, sub_thread_id, org_id, bridge_id, bridgeType):
+async def getThread(thread_id, sub_thread_id, org_id, bridge_id):
     try:
         chats = await chatbotDbService.find(org_id, thread_id, sub_thread_id, bridge_id)
-        if bridgeType:
-            filtered_chats = []
-            for chat in chats:
-                if chat['is_reset']:
-                    filtered_chats = []
-                else:
-                    filtered_chats.append(chat)
-            chats = filtered_chats
         chats = await add_tool_call_data_in_history(chats)
         return chats
     except Exception as err:
         logger.error(f"Error in getting thread:, {str(err)}, {traceback.format_exc()}")
         raise err
 
-async def savehistory(thread_id, sub_thread_id, userMessage, botMessage, org_id, bridge_id, model_name, type, messageBy, userRole="user", tools={}, chatbot_message = "",tools_call_data = [],message_id = None, version_id = None, image_url = None, revised_prompt = None, urls = None, AiConfig = None, annotations = None, fallback_model = None):
-    try:
-        chatToSave = [{
-            'thread_id': thread_id,
-            'sub_thread_id': sub_thread_id,
-            'org_id': org_id,
-            'model_name': model_name,
-            'message': userMessage or "",
-            'message_by': userRole,
-            'type': type,
-            'bridge_id': bridge_id,
-            'message_id' : message_id,
-            'version_id': version_id,
-            'revised_prompt' : revised_prompt,
-            'urls' : urls,
-            'AiConfig' : AiConfig,
-            'fallback_model' : fallback_model
-        }]
-
-        if tools:
-            chatToSave.append({
-                'thread_id': thread_id,
-                'sub_thread_id': sub_thread_id,
-                'org_id': org_id,
-                'model_name': model_name,
-                'message': "",
-                'message_by': "tools_call",
-                'type': type,
-                'bridge_id': bridge_id,
-                'function': tools,
-                'tools_call_data': tools_call_data,
-                'message_id' : message_id,
-                'version_id': version_id
-            })
-
-        if botMessage is not None:
-            chatToSave.append({
-                'thread_id': thread_id,
-                'sub_thread_id': sub_thread_id,
-                'org_id': org_id,
-                'model_name': model_name,
-                'message': botMessage or "",
-                'message_by': messageBy,
-                'type': type,
-                'bridge_id': bridge_id,
-                'function': botMessage if messageBy == "tool_calls" else {},
-                'chatbot_message' : chatbot_message or "",
-                'message_id' : message_id,
-                'revised_prompt' : revised_prompt,
-                "image_urls" : image_url,
-                'version_id': version_id,
-                "annotations" : annotations,
-                "fallback_model" : fallback_model
-            })
-        # sending data through rt layer
-        chatbotSaveCopy = copy.deepcopy(chatToSave)
-        for item in chatbotSaveCopy:
-            item["role"] = item.pop("message_by")
-            item["content"] = item.pop("message")
-            if item.get('created_at') is None:
-                item['created_at'] = str(datetime.now())
-            if item.get('createdAt') is None:
-                item['createdAt'] = str(datetime.now())
-
-        response_format_copy = {
-            'cred' : {
-                'channel': org_id + bridge_id,
-                'apikey': Config.RTLAYER_AUTH,
-                'ttl': '1'
-            },
-            'type' : 'RTLayer'
-        }
-        dataToSend={
-            'Thread':{
-                "thread_id" : thread_id,
-                "sub_thread_id": sub_thread_id,
-                "bridge_id":bridge_id
-            },
-            "Messages":chatbotSaveCopy
-        }
-        await sendResponse(response_format_copy, dataToSend, True)
-
-        result = chatbotDbService.createBulk(chatToSave)
-        return list(result)
-    except Exception as error:
-        logger.error(f"saveconversation error=>, {str(error)}, {traceback.format_exc()}")
-        raise error
-
 async def add_tool_call_data_in_history(chats):
     tools_call_indices = []
@@ -188,4 +88,4 @@ async def save_sub_thread_id_and_name(thread_id, sub_thread_id, org_id, thread_f
         logger.error(f"Error in saving sub thread id and name:, {str(err)}")
         return { 'success': False, 'message': str(err) }
 # Exporting the functions
-__all__ = ['getAllThreads', 'savehistory', 'getThread', 'getThreadHistory', 'getChatData']
+__all__ = ['getAllThreads', 'getThread', 'getThreadHistory', 'getChatData']
diff --git a/src/db_services/conversationDbService.py b/src/db_services/conversationDbService.py
index 6d68e0d8..16de33f6 100644
--- a/src/db_services/conversationDbService.py
+++ b/src/db_services/conversationDbService.py
@@ -1,11 +1,9 @@
 import json
 from models.index import combined_models as models
-import sqlalchemy as sa
-from sqlalchemy import func, and_ , insert, delete, or_ , update, select
-from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy import func, and_ , or_ , update, select
 from ..services.cache_service import find_in_cache, store_in_cache
 from datetime import datetime
-from models.postgres.pg_models import Conversation, RawData, system_prompt_versionings, user_bridge_config_history
+from models.postgres.pg_models import Conversation, system_prompt_versionings, user_bridge_config_history
 from models.Timescale.timescale_models import Metrics_model
 from sqlalchemy.sql import text
 from globals import *
@@ -14,63 +12,73 @@
 pg = models['pg']
 timescale = models['timescale']
 
-
-def createBulk(conversations_data):
-    session = pg['session']()
-    try:
-        conversations = [Conversation(**data) for data in conversations_data]
-        session.add_all(conversations)
-        session.commit()
-        return [conversation.id for conversation in conversations]
-    except Exception as err :
-        logger.error(f"Error in creating bulk conversations: {str(err)}")
-        session.rollback()
-    finally :
-        session.close()
-
-
-async def insertRawData(raw_data) :
-    session = pg['session']()
-    try:
-        raws = [RawData(**data) for data in raw_data]
-        session.add_all(raws)
-        session.commit()
-    except Exception as e:
-        session.rollback()
-        raise e
-
 async def find(org_id, thread_id, sub_thread_id, bridge_id):
     try:
         session = pg['session']()
+
         conversations = (
-            session.query(
-                Conversation.message.label('content'),
-                Conversation.message_by.label('role'),
-                Conversation.createdAt,
-                Conversation.id,
-                Conversation.function,
-                Conversation.is_reset,
-                Conversation.tools_call_data,
-                RawData.error,
-                func.coalesce(Conversation.urls, []).label('urls')  # Updated to handle 'urls' as an array
-            )
-            .outerjoin(RawData, Conversation.id == RawData.chat_id)
+            session.query(Conversation)
             .filter(
                 and_(
                     Conversation.org_id == org_id,
                     Conversation.thread_id == thread_id,
                     Conversation.bridge_id == bridge_id,
                    Conversation.sub_thread_id == sub_thread_id,
-                    or_(RawData.error == '', RawData.error.is_(None)),
-                    Conversation.message_by.in_(["user", "tools_call", "assistant"])
+                    or_(Conversation.error == '', Conversation.error.is_(None))
                 )
            )
            .order_by(Conversation.id.desc())
            .limit(9)
            .all()
        )
-        conversations.reverse()
-        return [conversation._asdict() for conversation in conversations]
+
+        # Convert to format expected by existing code
+        result = []
+        for conv in reversed(conversations):
+            # Create user message entry
+            if conv.get('user_message'):
+                result.append({
+                    'content': conv.get('user_message'),
+                    'role': 'user',
+                    'createdAt': conv.get('createdAt'),
+                    'id': conv.get('id'),
+                    'function': {},
+                    'is_reset': False,
+                    'tools_call_data': conv.get('tools_call_data', []),
+                    'error': conv.get('error', ''),
+                    'urls': conv.get('urls', [])
+                })
+
+            # Create tools call entry if exists
+            if conv.get('tools_call_data') and len(conv.get('tools_call_data', [])) > 0:
+                result.append({
+                    'content': '',
+                    'role': 'tools_call',
+                    'createdAt': conv.get('createdAt'),
+                    'id': conv.get('id'),
+                    'function': conv.get('tools_call_data', [{}])[0],
+                    'is_reset': False,
+                    'tools_call_data': conv.get('tools_call_data'),
+                    'error': conv.get('error', ''),
+                    'urls': []
+                })
+
+            # Create assistant message entry
+            if conv.get('response') or conv.get('chatbot_response'):
+                result.append({
+                    'content': conv.get('response') or conv.get('chatbot_response'),
+                    'role': 'assistant',
+                    'createdAt': conv.get('createdAt'),
+                    'id': conv.get('id'),
+                    'function': {},
+                    'is_reset': False,
+                    'tools_call_data': None,
+                    'error': conv.get('error', ''),
+                    'urls': []
+                })
+
+        return result
+
     except Exception as e:
         logger.error(f"Error in finding conversations: {str(e)}")
         return []
@@ -87,24 +95,16 @@ async def calculate_average_response_time(org_id, bridge_id):
 
         query = (
             session.query(
-                func.avg(
-                    func.cast(
-                        # Use the proper JSONB extraction for PostgreSQL
-                        func.jsonb_extract_path_text(RawData.latency, 'over_all_time'),
-                        sa.Float
-                    )
-                ).label('avg_response_time')
+                func.avg(Conversation.latency).label('avg_response_time')
             )
-            .select_from(Conversation)
-            .join(RawData, Conversation.id == RawData.chat_id)
             .filter(
                 and_(
                     Conversation.org_id == org_id,
                     Conversation.bridge_id == bridge_id,
-                    Conversation.message_by == 'user',
-                    or_(RawData.error == '', RawData.error.is_(None)),
-                    RawData.created_at >= yesterday_start,
-                    RawData.created_at <= yesterday_end
+                    Conversation.user_message.isnot(None),
+                    or_(Conversation.error == '', Conversation.error.is_(None)),
+                    Conversation.created_at >= yesterday_start,
+                    Conversation.created_at <= yesterday_end
                 )
             )
             .scalar()
@@ -139,66 +139,43 @@ async def storeSystemPrompt(prompt, org_id, bridge_id):
     finally:
         session.close()
 
-async def reset_and_mode_chat_history(org_id, bridge_id, thread_id, key, value):
+async def add_bulk_user_entries(entries):
     session = pg['session']()
     try:
-        subquery = (
-            select(Conversation.id)
-            .where(
-                and_(
-                    Conversation.org_id == org_id,
-                    Conversation.bridge_id == bridge_id,
-                    Conversation.thread_id == thread_id
-                )
-            )
-            .order_by(Conversation.id.desc())
-            .limit(1)
-        )
-        stmt = (
-            update(Conversation)
-            .where(Conversation.id == subquery.scalar_subquery())
-            .values({key: value})
-            .returning(Conversation.id)
-        )
-        result = session.execute(stmt)
-        updated_conversation = result.fetchone()
-        if updated_conversation:
-            session.commit()
-            return {
-                'success': True,
-                'message': 'Chatbot reset successfully',
-                'result': updated_conversation.id
-            }
-        else:
-            return {
-                'success': True,
-                'message': 'No conversation found to reset',
-                'result': None
-            }
-    except Exception as error:
+        user_history = [user_bridge_config_history(**data) for data in entries]
+        session.add_all(user_history)
+        session.commit()
+    except Exception as e:
         session.rollback()
-        return {
-            'success': False,
-            'message': 'Error resetting chatbot',
-            'result': str(error)
-        }
+        logger.error(f"Error in creating bulk user entries: {str(e)}")
     finally:
         session.close()
 
-async def add_bulk_user_entries(entries):
-    session = pg['session']()
+async def create_conversation_entry(conversation_data):
+    """
+    Save conversation entry data to database
+    """
+    session = postgres['session']()
     try:
-        user_history = [user_bridge_config_history(**data) for data in entries]
-        session.add_all(user_history)
+        from models.postgres.pg_models import ConversationEntry
+
+        conversation_entry = ConversationEntry(**conversation_data)
+        session.add(conversation_entry)
         session.commit()
+
+        conversation_id = conversation_entry.id
+        logger.info(f"Created conversation entry with ID: {conversation_id}")
+
+        return conversation_id
+
     except Exception as e:
         session.rollback()
-        logger.error(f"Error in creating bulk user entries: {str(e)}")
+        logger.error(f"Database error in create conversation entry: {str(e)}")
+        raise e
     finally:
         session.close()
 
-
 async def timescale_metrics(metrics_data):
     async with timescale['session']() as session:
         try:
diff --git a/src/db_services/metrics_service.py b/src/db_services/metrics_service.py
index dbaadd76..cea06ecc 100644
--- a/src/db_services/metrics_service.py
+++ b/src/db_services/metrics_service.py
@@ -1,23 +1,85 @@
-import json
 import uuid
 import traceback
-from datetime import datetime, timezone
-from models.index import combined_models
-from sqlalchemy import and_
-from ..controllers.conversationController import savehistory
-from .conversationDbService import insertRawData, timescale_metrics
+from datetime import datetime
+from .conversationDbService import timescale_metrics, create_conversation_entry
 from ..services.cache_service import find_in_cache, store_in_cache
 from globals import *
-from src.services.commonServices.baseService.utils import safe_float
-# from src.services.utils.send_error_webhook import send_error_to_webhook
 
-postgres = combined_models['pg']
-timescale = combined_models['timescale']
+def prepare_conversation_data(primary_data, history_params, version_id, thread_info):
+    """
+    Prepare conversation entry data combining history params and dataset
+    """
+    # Determine status based on success
+    status = 1 if primary_data.get('success', False) else 0
+
+    # Create single conversation entry data
+    conversation_data = {
+        'org_id': primary_data['orgId'],
+        'thread_id': thread_info.get('thread_id'),
+        'bridge_id': history_params.get('bridgeId'),
+        'user_message': history_params.get('user', ''),
+        'response': history_params.get('message', ''),
+        'chatbot_response': history_params.get('chatbot_message', ''),
+        'tools_call_data': history_params.get('tools_call_data', []),
+        'user_feedback': None,
+        'response_id': str(uuid.uuid4()),
+        'version_id': version_id,
+        'sub_thread_id': thread_info.get('sub_thread_id'),
+        'revised_response': history_params.get('revised_prompt'),
+        'image_urls': history_params.get('image_url', []) if history_params.get('image_url') else [],
+        'urls': history_params.get('urls', []),
+        'fallback_model': history_params.get('fallback_model'),
+        'error': primary_data.get('error', '') if not primary_data.get('success', False) else None,
+        'status': status,
+        'authkey_name': primary_data.get('apikey_object_id', {}).get(primary_data['service'], '') if primary_data.get('apikey_object_id') else '',
+        'latency': primary_data.get('latency', 0),
+        'service': primary_data['service'],
+        'input_tokens': primary_data.get('inputTokens', 0),
+        'output_tokens': primary_data.get('outputTokens', 0),
+        'expected_cost': primary_data.get('expectedCost', 0),
+        'message_id': primary_data.get('message_id'),
+        'variables': primary_data.get('variables', {}),
+        'finish_reason': primary_data.get('finish_reason'),
+        'model_name': primary_data['model'],
+        'type': history_params.get('type', 'chat'),
+        'AiConfig': history_params.get('AiConfig'),
+        'annotations': history_params.get('annotations', []),
+        'createdAt': datetime.now(),
+        'updatedAt': datetime.now()
+    }
+
+    return conversation_data
 
-def start_of_today():
-    today = datetime.now()
-    return datetime(today.year, today.month, today.day, 0, 0, 0, 0)
+def prepare_timescale_data(dataset, history_params, conversation_id):
+    """
+    Prepare timescale metrics data
+    """
+    insert_ai_data_in_pg = [
+        {
+            'org_id': data_object['orgId'],
+            'authkey_name': data_object.get('apikey_object_id', {}).get(data_object['service'], '') if data_object.get('apikey_object_id') else '',
+            'latency': data_object.get('latency', 0),
+            'service': data_object['service'],
+            'status': data_object.get('success', False),
+            'error': data_object.get('error', '') if not data_object.get('success', False) else '',
+            'model': data_object['model'],
+            'input_tokens': data_object.get('inputTokens', 0),
+            'output_tokens': data_object.get('outputTokens', 0),
+            'expected_cost': data_object.get('expectedCost', 0),
+            'created_at': datetime.now(),
+            'chat_id': conversation_id,
+            'message_id': data_object.get('message_id'),
+            'variables': data_object.get('variables') or {},
+            'is_present': 'prompt' in data_object,
+            'id': str(uuid.uuid4()),
+            'firstAttemptError': history_params.get('firstAttemptError'),
+            'finish_reason': data_object.get('finish_reason')
+        }
+        for data_object in dataset
+    ]
+
+    return insert_ai_data_in_pg
 
 async def save_conversations_to_redis(conversations, version_id, thread_id, sub_thread_id, history_params):
     """
@@ -78,117 +140,43 @@
         logger.error(f"Error saving conversations to Redis: {str(error)}")
         logger.error(traceback.format_exc())
 
-def end_of_today():
-    today = datetime.now()
-    return datetime(today.year, today.month, today.day, 23, 59, 59, 999)
-
-async def find(org_id, start_time=None, end_time=None, limit=None, offset=None):
-    date_filter = and_(start_time, end_time) if start_time and end_time else and_(start_of_today(), end_of_today())
-    query_options = {
-        'where': {
-            'org_id': org_id,
-            'created_at': date_filter
-        },
-        'limit': limit,
-        'offset': offset
-    }
-    model = timescale.daily_data if start_time and end_time else timescale.five_minute_data
-    return await model.find_all(**query_options)
-
-async def find_one(id):
-    model = timescale.raw_data
-    return await model.find_by_pk(id)
-
-async def find_one_pg(id):
-    model = postgres.raw_data
-    return await model.find_by_pk(id)
-
 async def create(dataset, history_params, version_id, thread_info={}):
     try:
         conversations = []
         if thread_info is not None:
             thread_id = thread_info.get('thread_id')
             sub_thread_id = thread_info.get('sub_thread_id')
-            conversations = thread_info.get('result', [])
-        result = await try_catch (
-            savehistory,
-            history_params['thread_id'], history_params['sub_thread_id'], history_params['user'], history_params['message'],
-            history_params['org_id'], history_params['bridge_id'], history_params['model'],
-            history_params['channel'], history_params['type'], history_params['actor'],
-            history_params.get('tools'),history_params.get('chatbot_message'),history_params.get('tools_call_data'),history_params.get('message_id'), version_id, history_params.get('image_urls'), history_params.get('revised_prompt'), history_params.get('urls'), history_params.get('AiConfig'), history_params.get('annotations'), history_params.get('fallback_model')
-        )
-        response = history_params.get('response',{})
-
-        # Save conversations to Redis with TTL of 30 days
-        if 'error' not in dataset[0] and conversations:
-            await save_conversations_to_redis(conversations, version_id, thread_id, sub_thread_id, history_params)
+
+            # Get existing conversations from Redis
+            redis_key = f"conversation_{version_id}_{thread_id}_{sub_thread_id}"
+            conversations = await find_in_cache(redis_key) or []
 
-        chat_id = result[0]
-        dataset[0]['chat_id'] = chat_id
-        dataset[0]['message_id'] = history_params.get('message_id')
-
-        insert_ai_data_in_pg = [
-            {
-                'org_id': data_object['orgId'],
-                'authkey_name': data_object.get('apikey_object_id', {}).get(data_object['service'], '') if data_object.get('apikey_object_id') else '',
-                'latency': data_object.get('latency', 0),
-                'service': data_object['service'],
-                'status': data_object.get('success', False),
-                'error': data_object.get('error', '') if not data_object.get('success', False) else '',
-                'model': data_object['model'],
-                'input_tokens': data_object.get('inputTokens', 0),
-                'output_tokens': data_object.get('outputTokens', 0),
-                'expected_cost': data_object.get('expectedCost', 0),
-                'created_at': datetime.now(),
-                'chat_id': data_object.get('chat_id'),
-                'message_id': data_object.get('message_id'),
-                'variables': data_object.get('variables') or {},
-                'is_present': 'prompt' in data_object,
-                'id' : str(uuid.uuid4()),
-                'firstAttemptError' : history_params.get('firstAttemptError'),
-                'finish_reason' : response.get('data', {}).get('finish_reason')
-            }
-            for data_object in dataset
-        ]
-        metrics_data = [
-            {
-                'org_id': data_object['orgId'],
-                'bridge_id': history_params['bridge_id'],
-                'version_id' : version_id,
-                'thread_id': history_params['thread_id'],
-                'model': data_object['model'],
-                'input_tokens': safe_float(data_object.get('inputTokens', 0), 0.0, "inputTokens"),
-                'output_tokens': safe_float(data_object.get('outputTokens', 0), 0.0, "outputTokens"),
-                'total_tokens': safe_float(data_object.get('totalTokens', 0),0.0, "totalTokens"),
-                'apikey_id': data_object.get('apikey_object_id', {}).get(data_object['service'], '') if data_object.get('apikey_object_id') else '',
-                'created_at': datetime.now(),  # Remove timezone to match database expectations
-                'latency': safe_float(json.loads(data_object.get('latency', {})).get('over_all_time', 0),0.0, "over_all_time"),
-                'success' : data_object.get('success', False),
-                'cost' : safe_float(data_object.get('expectedCost', 0), 0.0, 'expectedCost'),
-                'time_zone' : 'Asia/Kolkata',
-                'service' : data_object['service']
-            }
-            for data_object in dataset
-        ]
-        await insertRawData(insert_ai_data_in_pg)
+        # Create single conversation entry combining history params and dataset
+        if dataset and len(dataset) > 0:
+            # Get the primary data object (assuming first one contains the main conversation data)
+            primary_data = dataset[0]
+
+            # Prepare conversation data
+            conversation_data = prepare_conversation_data(primary_data, history_params, version_id, thread_info)
+
+            # Save conversation entry to database
+            conversation_id = await create_conversation_entry(conversation_data)
+
+            # Save conversations to Redis
+            if thread_info is not None:
+                await save_conversations_to_redis(conversations, version_id, thread_id, sub_thread_id, history_params)
+
+            # Prepare and insert timescale metrics data
+            insert_ai_data_in_pg = prepare_timescale_data(dataset, history_params, conversation_id)
+
+            await timescale_metrics(insert_ai_data_in_pg)
+
+        return
 
-        # Create the cache key based on bridge_id (assuming it's always available)
-        cache_key = f"metrix_bridges{history_params['bridge_id']}"
-
-        # Safely load the old total token value from the cache
-        cache_value = await find_in_cache(cache_key)
-        try:
-            oldTotalToken = json.loads(cache_value) if cache_value else 0
-        except (json.JSONDecodeError, TypeError):
-            oldTotalToken = 0
-
-        # Calculate the total token sum, using .get() for 'totalTokens' to handle missing keys
-        totaltoken = sum(data_object.get('totalTokens', 0) for data_object in dataset) + oldTotalToken
-        # await send_error_to_webhook(history_params['bridge_id'], history_params['org_id'],totaltoken , 'metrix_limit_reached')
-        await store_in_cache(cache_key, float(totaltoken))
-        await timescale_metrics(metrics_data)
     except Exception as error:
-        logger.error(f'Error during bulk insert of Ai middleware, {str(error)}')
+        logger.error(f"metrics_service create error: {str(error)}")
+        logger.error(traceback.format_exc())
+        raise error
 
 # Exporting functions
-__all__ = ["find", "create", "find_one", "find_one_pg"]
+__all__ = ["create"]
diff --git a/src/db_services/unifiedConversationService.py b/src/db_services/unifiedConversationService.py
new file mode 100644
index 00000000..e0b2f4bf
--- /dev/null
+++ b/src/db_services/unifiedConversationService.py
@@ -0,0 +1,319 @@
+import json
+import uuid
+import traceback
+from datetime import datetime, timezone
+from models.index import combined_models
+from sqlalchemy import and_
+from globals import *
+
+postgres = combined_models['pg']
+
+class UnifiedConversationService:
+    """
+    Service for managing unified conversations that combine conversation and raw data
+    """
+
+    @staticmethod
+    async def create_unified_conversation(
+        org_id: str,
+        thread_id: str,
+        bridge_id: str,
+        user_message: str,
+        response: str,
+        chatbot_response: str,
+        model_name: str,
+        service: str,
+        type: str,
+        version_id: str = None,
+        sub_thread_id: str = None,
+        tools_call_data: list = None,
+        image_urls: list = None,
+        urls: list = None,
+        AiConfig: dict = None,
+        annotations: list = None,
+        fallback_model: str = None,
+        authkey_name: str = None,
+        latency: float = None,
+        input_tokens: float = None,
+        output_tokens: float = None,
+        expected_cost: float = None,
+        variables: dict = None,
+        finish_reason: str = None,
+        error: str = None,
+        status: int = 1,  # 0: failed, 1: success, 2: second attempt success
+        message_id: str = None,
+        response_id: str = None,
+        revised_response: str = None,
+        user_feedback: int = None
+    ):
+        """
+        Create a single unified conversation record
+        """
+        try:
+            # Prepare tokens object
+            tokens = {
+                'input_tokens': input_tokens or 0,
+                'output_tokens': output_tokens or 0,
+                'expected_cost': expected_cost or 0
+            }
+
+            # Create unified conversation data
+            unified_data = {
+                'org_id': org_id,
+                'thread_id': thread_id,
+                'bridge_id': bridge_id,
+                'user_message': user_message,
+                'response': response,
+                'chatbot_response': chatbot_response,
+                'tools_call_data': tools_call_data or [],
+                'user_feedback': user_feedback,
+                'response_id': response_id,
+                'version_id': version_id,
+                'sub_thread_id': sub_thread_id,
+                'revised_response': revised_response,
+                'image_urls': image_urls or [],
+                'urls': urls or [],
+                'fallback_model': fallback_model,
+                'error': error,
+                'status': status,
+                'authkey_name': authkey_name,
+                'latency': latency or 0,
+                'service': service,
+                'tokens': tokens,
+                'message_id': message_id,
+                'variables': variables or {},
+                'finish_reason': finish_reason,
+                'model_name': model_name,
+                'type': type,
+                'AiConfig': AiConfig,
+                'annotations': annotations or [],
+                'createdAt': datetime.now(),
+                'updatedAt': datetime.now(),
+                'created_at': datetime.now()
+            }
+
+            # Insert into database
+            session = postgres['session']()
+            try:
+                from models.postgres.pg_models import UnifiedConversation
+
+                unified_conversation = UnifiedConversation(**unified_data)
+                session.add(unified_conversation)
+                session.commit()
+
+                conversation_id = unified_conversation.id
+                logger.info(f"Created unified conversation with ID: {conversation_id}")
+
+                return conversation_id
+
+            except Exception as e:
+                session.rollback()
+                logger.error(f"Database error in create_unified_conversation: {str(e)}")
+                raise e
+            finally:
+                session.close()
+
+        except Exception as error:
+            logger.error(f"Error in create_unified_conversation: {str(error)}")
+            logger.error(traceback.format_exc())
+            raise error
+
+    @staticmethod
+    async def find_conversations(
+        org_id: str,
+        thread_id: str = None,
+        bridge_id: str = None,
+        start_time: datetime = None,
+        end_time: datetime = None,
+        limit: int = None,
+        offset: int = None
+    ):
+        """
+        Find unified conversations with filters
+        """
+        try:
+            session = postgres['session']()
+            try:
+                from models.postgres.pg_models import UnifiedConversation
+
+                query = session.query(UnifiedConversation).filter(
+                    UnifiedConversation.org_id == org_id
+                )
+
+                if thread_id:
+                    query = query.filter(UnifiedConversation.thread_id == thread_id)
+
+                if bridge_id:
+                    query = query.filter(UnifiedConversation.bridge_id == bridge_id)
+
+                if start_time and end_time:
+                    query = query.filter(
+                        and_(
+                            UnifiedConversation.createdAt >= start_time,
+                            UnifiedConversation.createdAt <= end_time
+                        )
+                    )
+
+                if offset:
+                    query = query.offset(offset)
+
+                if limit:
+                    query = query.limit(limit)
+
+                query = query.order_by(UnifiedConversation.createdAt.desc())
+
+                conversations = query.all()
+
+                # Convert to dict format (ORM instances expose columns as attributes)
+                result = []
+                for conv in conversations:
+                    conv_dict = {
+                        'id': conv.id,
+                        'org_id': conv.org_id,
+                        'thread_id': conv.thread_id,
+                        'bridge_id': conv.bridge_id,
+                        'user_message': conv.user_message,
+                        'response': conv.response,
+                        'chatbot_response': conv.chatbot_response,
+                        'tools_call_data': conv.tools_call_data,
+                        'user_feedback': conv.user_feedback,
+                        'response_id': conv.response_id,
+                        'version_id': conv.version_id,
+                        'sub_thread_id': conv.sub_thread_id,
+                        'revised_response': conv.revised_response,
+                        'image_urls': conv.image_urls,
+                        'urls': conv.urls,
+                        'fallback_model': conv.fallback_model,
+                        'error': conv.error,
+                        'status': conv.status,
+                        'authkey_name': conv.authkey_name,
+                        'latency': conv.latency,
+                        'service': conv.service,
+                        'tokens': conv.tokens,
+                        'message_id': conv.message_id,
+                        'variables': conv.variables,
+                        'finish_reason': conv.finish_reason,
+                        'model_name': conv.model_name,
+                        'type': conv.type,
+                        'AiConfig': conv.AiConfig,
+                        'annotations': conv.annotations,
+                        'createdAt': conv.createdAt,
+                        'updatedAt': conv.updatedAt,
+                        'created_at': conv.created_at
+                    }
+                    result.append(conv_dict)
+
+                return result
+
+            finally:
+                session.close()
+
+        except Exception as error:
+            logger.error(f"Error in find_conversations: {str(error)}")
+            logger.error(traceback.format_exc())
+            raise error
+
+    @staticmethod
+    async def find_by_id(conversation_id: int):
+        """
+        Find unified conversation by ID
+        """
+        try:
+            session = postgres['session']()
+            try:
+                from models.postgres.pg_models import UnifiedConversation
+
+                conversation = session.query(UnifiedConversation).filter(
+                    UnifiedConversation.id == conversation_id
+                ).first()
+
+                if not conversation:
+                    return None
+
+                # Convert to dict format
+                conv_dict = {
+                    'id': conversation.id,
+                    'org_id': conversation.org_id,
+                    'thread_id': conversation.thread_id,
+                    'bridge_id': conversation.bridge_id,
+                    'user_message': conversation.user_message,
+                    'response': conversation.response,
+                    'chatbot_response': conversation.chatbot_response,
+                    'tools_call_data': conversation.tools_call_data,
+                    'user_feedback': conversation.user_feedback,
+                    'response_id': conversation.response_id,
+                    'version_id': conversation.version_id,
+                    'sub_thread_id': conversation.sub_thread_id,
+                    'revised_response': conversation.revised_response,
+                    'image_urls': conversation.image_urls,
+                    'urls': conversation.urls,
+                    'fallback_model': conversation.fallback_model,
+                    'error': conversation.error,
+                    'status': conversation.status,
+                    'authkey_name': conversation.authkey_name,
+                    'latency': conversation.latency,
+                    'service': conversation.service,
+                    'tokens': conversation.tokens,
+                    'message_id': conversation.message_id,
+                    'variables': conversation.variables,
+                    'finish_reason': conversation.finish_reason,
+                    'model_name': conversation.model_name,
+                    'type': conversation.type,
+                    'AiConfig': conversation.AiConfig,
+                    'annotations': conversation.annotations,
+                    'createdAt': conversation.createdAt,
+                    'updatedAt': conversation.updatedAt,
+                    'created_at': conversation.created_at
+                }
+
+                return conv_dict
+
+            finally:
+                session.close()
+
+        except Exception as error:
+            logger.error(f"Error in find_by_id: {str(error)}")
+            logger.error(traceback.format_exc())
+            raise error
+
+    @staticmethod
+    async def update_conversation(conversation_id: int, **kwargs):
+        """
+        Update unified conversation
+        """
+        try:
+            session = postgres['session']()
+            try:
+                from models.postgres.pg_models import UnifiedConversation
+
+                conversation = session.query(UnifiedConversation).filter(
+                    UnifiedConversation.id == conversation_id
+                ).first()
+
+                if not conversation:
+                    logger.warning(f"Conversation with ID {conversation_id} not found")
+                    return None
+
+                # Update fields
+                for key, value in kwargs.items():
+                    if hasattr(conversation, key):
+                        setattr(conversation, key, value)
+
+                conversation.updatedAt = datetime.now()
+
+                session.commit()
+                logger.info(f"Updated unified conversation with ID: {conversation_id}")
+
+                return conversation_id
+
+            except Exception as e:
+                session.rollback()
+                logger.error(f"Database error in update_conversation: {str(e)}")
+                raise e
+            finally:
+                session.close()
+
+        except Exception as error:
+            logger.error(f"Error in update_conversation: {str(error)}")
+            logger.error(traceback.format_exc())
+            raise error
diff --git a/src/middlewares/interfaceMiddlewares.py b/src/middlewares/interfaceMiddlewares.py
index 1b00273a..b50587d0 100644
--- a/src/middlewares/interfaceMiddlewares.py
+++ b/src/middlewares/interfaceMiddlewares.py
@@ -6,7 +6,6 @@
 from ..routes.v2.modelRouter import chat_completion
 import json
 from .getDataUsingBridgeId import add_configuration_data_to_body
-from ..db_services.conversationDbService import reset_and_mode_chat_history
 from ..services.commonServices.baseService.utils import sendResponse
 from ..services.utils.time import Timer
 from src.services.utils.apiservice import fetch
@@ -142,15 +141,6 @@ async def reset_chatBot(request: Request, botId: str):
         channelId = f"{botId}{thread_id.strip() if thread_id and thread_id.strip() else userId}{sub_thread_id.strip() if sub_thread_id and sub_thread_id.strip() else userId}"
         channelId = channelId.replace(" ", "_")
-        bridges = await ConfigurationServices.get_bridge_by_slugname(org_id, slugName)
-        bridge_id = str(bridges.get('_id', ''))
-        if purpose == 'is_reset':
-            result = await reset_and_mode_chat_history(org_id, bridge_id, thread_id, 'is_reset', True)
-        id = f"{thread_id}_{ version_id or bridge_id}"
-        gpt_memory = bridges.get('gpt_momery')
-        if gpt_memory:
-            response = await fetch("https://flow.sokt.io/func/scrixTV20rkF", "POST", None, None, {"threadID": id})
-
         response_format = {
diff --git a/src/services/utils/common_utils.py b/src/services/utils/common_utils.py
index 065ebb70..087753b6 100644
--- a/src/services/utils/common_utils.py
+++ b/src/services/utils/common_utils.py
@@ -162,7 +162,7 @@ async def manage_threads(parsed_data):
             logger.info(f"Retrieved conversations from Redis cache: {redis_key}")
         else:
             # Fallback to database if not in cache
-            result = await try_catch(getThread, thread_id, sub_thread_id, org_id, bridge_id, bridge_type)
+            result = await try_catch(getThread, thread_id, sub_thread_id, org_id, bridge_id)
             if result:
                 parsed_data['configuration']["conversation"] = result or []
             else:

From de6f5157d24facd0f7866c1fc7b5126fb90b774c Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Mon, 13 Oct 2025 18:34:33 +0530
Subject: [PATCH 2/5] refactor: consolidate conversation models and update
 schema to use JSON columns

---
 models/postgres/pg_models.py                  | 14 ++++----
 src/db_services/conversationDbService.py      |  7 ++--
 src/db_services/metrics_service.py            | 11 +++----
 src/db_services/unifiedConversationService.py | 32 +++++++++----------
 4 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/models/postgres/pg_models.py b/models/postgres/pg_models.py
index a626b6bf..0456c739 100644
--- a/models/postgres/pg_models.py
+++ b/models/postgres/pg_models.py
@@ -6,7 +6,7 @@ from models.postgres.pg_connection import Base
 
 class Conversation(Base):
-    __tablename__ = 'conversations'
+    __tablename__ = 'agent_conversations'
     __table_args__ = {'extend_existing': True}
 
     id = Column(Integer, primary_key=True)
@@ -16,14 +16,14 @@ class Conversation(Base):
     user_message = Column(Text)
     response = Column(Text)
     chatbot_response = Column(Text)
-    tools_call_data = Column(ARRAY(JSON))
+    tools_call_data = Column(JSON)
     user_feedback = Column(Integer)
     response_id = Column(UUID(as_uuid=True), nullable=True)
     version_id = Column(String)
     sub_thread_id = Column(String, nullable=True)
     revised_response = Column(Text, nullable=True)
-    image_urls = Column(ARRAY(JSON), nullable=True)
-    urls = Column(ARRAY(String), nullable=True)
+    image_urls = Column(JSON, nullable=True)
+    urls = Column(JSON, nullable=True)
     fallback_model = Column(String, nullable=True)
     error = Column(Text, nullable=True)
     status = Column(Integer, nullable=True)  # 0: failed, 1: success, 2: second attempt success
@@ -32,9 +32,7 @@ class Conversation(Base):
     authkey_name = Column(String)
     latency = Column(Float)
     service = Column(String)
-    input_tokens = Column(Float)
-    output_tokens = Column(Float)
-    expected_cost = Column(Float)
+    tokens = Column(JSON, nullable=True)
     created_at = Column(DateTime, default=func.now())
     message_id = Column(UUID(as_uuid=True), nullable=True)
     variables = Column(JSON)
@@ -42,7 +40,7 @@ class Conversation(Base):
     model_name = Column(String)
     type = Column(String, nullable=False)
     AiConfig = Column(JSON, nullable=True)
-    annotations = Column(ARRAY(JSON), nullable=True)
+    annotations = Column(JSON, nullable=True)
 
 class system_prompt_versionings(Base):
     __tablename__ = 'system_prompt_versionings'
diff --git a/src/db_services/conversationDbService.py b/src/db_services/conversationDbService.py
index 16de33f6..8342d22b 100644
--- a/src/db_services/conversationDbService.py
+++ b/src/db_services/conversationDbService.py
@@ -156,11 +156,12 @@ async def create_conversation_entry(conversation_data):
     """
     Save conversation entry data to database
     """
-    session = postgres['session']()
+    session = pg['session']()
+    print(conversation_data)
     try:
-        from models.postgres.pg_models import ConversationEntry
+        from models.postgres.pg_models import Conversation
 
-        conversation_entry = ConversationEntry(**conversation_data)
+        conversation_entry = Conversation(**conversation_data)
         session.add(conversation_entry)
         session.commit()
 
diff --git a/src/db_services/metrics_service.py b/src/db_services/metrics_service.py
index cea06ecc..678885c9 100644
--- a/src/db_services/metrics_service.py
+++ b/src/db_services/metrics_service.py
@@ -1,4 +1,3 @@
-
 import uuid
 import traceback
 from datetime import datetime
@@ -23,7 +22,7 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
         'chatbot_response': history_params.get('chatbot_message', ''),
         'tools_call_data': history_params.get('tools_call_data', []),
         'user_feedback': None,
-        'response_id': str(uuid.uuid4()),
+        'response_id': uuid.uuid4(),
         'version_id': version_id,
         'sub_thread_id': thread_info.get('sub_thread_id'),
         'revised_response': history_params.get('revised_prompt'),
@@ -35,10 +34,8 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
         'authkey_name': primary_data.get('apikey_object_id', {}).get(primary_data['service'], '') if primary_data.get('apikey_object_id') else '',
         'latency': primary_data.get('latency', 0),
         'service': primary_data['service'],
-        'input_tokens': primary_data.get('inputTokens', 0),
-        'output_tokens': primary_data.get('outputTokens', 0),
-        'expected_cost': primary_data.get('expectedCost', 0),
-        'message_id': primary_data.get('message_id'),
+        'tokens': {"input_tokens": primary_data.get('inputTokens', 0), "output_tokens": primary_data.get('outputTokens', 0), "expected_cost": primary_data.get('expectedCost', 0)},
+        'message_id': uuid.UUID(primary_data.get('message_id')) if primary_data.get('message_id') else None,
         'variables': primary_data.get('variables', {}),
         'finish_reason': primary_data.get('finish_reason'),
         'model_name': primary_data['model'],
@@ -46,7 +43,7 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
         'AiConfig': history_params.get('AiConfig'),
         'annotations': history_params.get('annotations', []),
         'createdAt': datetime.now(),
-        'updatedAt': datetime.now()
+        'updatedAt': datetime.now(),
     }
 
     return conversation_data
diff --git a/src/db_services/unifiedConversationService.py b/src/db_services/unifiedConversationService.py
index e0b2f4bf..aecba038 100644
--- a/src/db_services/unifiedConversationService.py
+++ b/src/db_services/unifiedConversationService.py
@@ -95,9 +95,9 @@ async def create_unified_conversation(
             # Insert into database
             session = postgres['session']()
             try:
-                from models.postgres.pg_models import UnifiedConversation
+                from models.postgres.pg_models import Conversation
 
-                unified_conversation = UnifiedConversation(**unified_data)
+                unified_conversation = Conversation(**unified_data)
                 session.add(unified_conversation)
                 session.commit()
 
@@ -134,23 +134,23 @@ async def find_conversations(
         try:
             session = postgres['session']()
             try:
-                from models.postgres.pg_models import UnifiedConversation
+                from models.postgres.pg_models import Conversation
 
-                query = session.query(UnifiedConversation).filter(
-                    UnifiedConversation.org_id == org_id
+                query = session.query(Conversation).filter(
+                    Conversation.org_id == org_id
                 )
 
                 if thread_id:
-                    query = query.filter(UnifiedConversation.thread_id == thread_id)
+                    query = query.filter(Conversation.thread_id == thread_id)
 
                 if bridge_id:
-                    query = query.filter(UnifiedConversation.bridge_id == bridge_id)
+                    query = query.filter(Conversation.bridge_id == bridge_id)
 
                 if start_time and end_time:
                     query = query.filter(
                         and_(
-                            UnifiedConversation.createdAt >= start_time,
-                            UnifiedConversation.createdAt <= end_time
+                            Conversation.createdAt >= start_time,
+                            Conversation.createdAt <= end_time
                         )
                     )
 
@@ -160,7 +160,7 @@ async def find_conversations(
         if limit:
             query = query.limit(limit)
 
-        query = query.order_by(UnifiedConversation.createdAt.desc())
+        query = query.order_by(Conversation.createdAt.desc())
 
         conversations = query.all()
 
@@ -221,10 +221,10 @@ async def find_by_id(conversation_id: int):
         try:
             session = postgres['session']()
             try:
-                from models.postgres.pg_models import UnifiedConversation
+                from models.postgres.pg_models import Conversation
 
-                conversation = session.query(UnifiedConversation).filter(
-                    UnifiedConversation.id == conversation_id
+                conversation = session.query(Conversation).filter(
+                    Conversation.id == conversation_id
                 ).first()
 
                 if not conversation:
@@ -284,10 +284,10 @@ async def update_conversation(conversation_id: int, **kwargs):
         try:
             session = postgres['session']()
             try:
-                from models.postgres.pg_models import UnifiedConversation
+                from models.postgres.pg_models import Conversation
 
-                conversation = session.query(UnifiedConversation).filter(
-                    UnifiedConversation.id == conversation_id
+                conversation = session.query(Conversation).filter(
+                    Conversation.id == conversation_id
                 ).first()
 
                 if not conversation:

From 6ca64fab93ac38e7292a9cb1f108d260aad3b997 Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Thu, 16 Oct 2025 13:23:37 +0530
Subject: [PATCH 3/5] refactor: remove response_id column and improve
 conversation data handling

---
 models/postgres/pg_models.py             | 1 -
 src/db_services/conversationDbService.py | 4 ++--
 src/db_services/metrics_service.py       | 5 ++---
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/models/postgres/pg_models.py b/models/postgres/pg_models.py
index 0456c739..efeb8a6f 100644
--- a/models/postgres/pg_models.py
+++ b/models/postgres/pg_models.py
@@ -18,7 +18,6 @@ class Conversation(Base):
     chatbot_response = Column(Text)
     tools_call_data = Column(JSON)
     user_feedback = Column(Integer)
-    response_id = Column(UUID(as_uuid=True), nullable=True)
     version_id = Column(String)
     sub_thread_id = Column(String, nullable=True)
     revised_response = Column(Text, nullable=True)
diff --git a/src/db_services/conversationDbService.py b/src/db_services/conversationDbService.py
index 8342d22b..6f7722b3 100644
--- a/src/db_services/conversationDbService.py
+++ b/src/db_services/conversationDbService.py
@@ -157,15 +157,15 @@ async def create_conversation_entry(conversation_data):
     Save conversation entry data to database
     """
     session = pg['session']()
-    print(conversation_data)
     try:
         from models.postgres.pg_models import Conversation
 
         conversation_entry = Conversation(**conversation_data)
         session.add(conversation_entry)
-        session.commit()
+        session.flush()  # Flush to get the ID without committing
 
         conversation_id = conversation_entry.id
+        session.commit()  # Now commit the transaction
         logger.info(f"Created conversation entry with ID: {conversation_id}")
 
         return conversation_id
diff --git a/src/db_services/metrics_service.py b/src/db_services/metrics_service.py
index 678885c9..653d26c3 100644
--- a/src/db_services/metrics_service.py
+++ b/src/db_services/metrics_service.py
@@ -16,13 +16,12 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
     conversation_data = {
         'org_id': primary_data['orgId'],
         'thread_id': thread_info.get('thread_id'),
-        'bridge_id': history_params.get('bridgeId'),
+        'bridge_id': history_params.get('bridge_id'),
         'user_message': history_params.get('user', ''),
         'response': history_params.get('message', ''),
         'chatbot_response': history_params.get('chatbot_message', ''),
         'tools_call_data': history_params.get('tools_call_data', []),
         'user_feedback': None,
-        'response_id': uuid.uuid4(),
         'version_id': version_id,
         'sub_thread_id': thread_info.get('sub_thread_id'),
         'revised_response': history_params.get('revised_prompt'),
@@ -35,7 +34,7 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
         'latency': primary_data.get('latency', 0),
         'service': primary_data['service'],
         'tokens': {"input_tokens": primary_data.get('inputTokens', 0), "output_tokens": primary_data.get('outputTokens', 0), "expected_cost": primary_data.get('expectedCost', 0)},
-        'message_id': uuid.UUID(primary_data.get('message_id')) if primary_data.get('message_id') else None,
+        'message_id': history_params.get('message_id'),
         'variables': primary_data.get('variables', {}),
         'finish_reason': primary_data.get('finish_reason'),
         'model_name': primary_data['model'],

From a1e120f840c2d666124e961f9af1cf4c2bdac3ec Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Thu, 16 Oct 2025 15:55:22 +0530
Subject: [PATCH 4/5] refactor: simplify metrics data structure and remove
 conversation_id dependency

---
 src/db_services/metrics_service.py | 43 ++++++++++++++----------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/src/db_services/metrics_service.py b/src/db_services/metrics_service.py
index 653d26c3..7d0282a9 100644
--- a/src/db_services/metrics_service.py
+++ b/src/db_services/metrics_service.py
@@ -4,6 +4,7 @@
 from .conversationDbService import timescale_metrics, create_conversation_entry
 from ..services.cache_service import find_in_cache, store_in_cache
 from globals import *
+import json
 
 def prepare_conversation_data(primary_data, history_params, version_id, thread_info):
     """
@@ -47,35 +48,31 @@ def prepare_conversation_data(primary_data, history_params, version_id, thread_i
 
     return conversation_data
 
-def prepare_timescale_data(dataset, history_params, conversation_id):
+def prepare_timescale_data(dataset, history_params):
     """
     Prepare timescale metrics data
     """
-    insert_ai_data_in_pg = [
+    metrics_data = [
         {
-            'org_id': data_object['orgId'],
-            'authkey_name': data_object.get('apikey_object_id', {}).get(data_object['service'], '') if data_object.get('apikey_object_id') else '',
-            'latency': data_object.get('latency', 0),
-            'service': data_object['service'],
-            'status': data_object.get('success', False),
-            'error': data_object.get('error', '') if not data_object.get('success', False) else '',
-            'model': data_object['model'],
-            'input_tokens': data_object.get('inputTokens', 0),
-            'output_tokens': data_object.get('outputTokens', 0),
-            'expected_cost': data_object.get('expectedCost', 0),
-            'created_at': datetime.now(),
-            'chat_id': conversation_id,
-            'message_id': data_object.get('message_id'),
-            'variables': data_object.get('variables') or {},
-            'is_present': 'prompt' in data_object,
-            'id': str(uuid.uuid4()),
-            'firstAttemptError': history_params.get('firstAttemptError'),
-            'finish_reason': data_object.get('finish_reason')
+            'org_id': dataset['orgId'],
+            'bridge_id': history_params.get('bridge_id'),
+            'version_id': history_params.get('version_id'),
+            'thread_id': history_params.get('thread_id'),
+            'model': history_params.get('model'),
+            'input_tokens': dataset.get('inputTokens', 0) or 0.0,
+            'output_tokens': dataset.get('outputTokens', 0) or 0.0,
+            'total_tokens': dataset.get('totalTokens', 0) or 0.0,
+            'apikey_id': dataset.get('apikey_object_id', {}).get(dataset['service'], '') if dataset.get('apikey_object_id') else '',
+            'created_at': datetime.now(),  # Remove timezone to match database expectations
+            'latency': int(json.loads(dataset.get('latency', {})).get('over_all_time', 0)),
+            'success': dataset.get('success', False),
+            'cost': dataset.get('expectedCost', 0) or 0.0,
+            'time_zone': 'Asia/Kolkata',
+            'service': dataset['service']
         }
-        for data_object in dataset
     ]
 
-    return insert_ai_data_in_pg
+    return metrics_data
 
 async def save_conversations_to_redis(conversations, version_id, thread_id, sub_thread_id, history_params):
     """
@@ -163,7 +160,7 @@ async def create(dataset, history_params, version_id, thread_info={}):
             await save_conversations_to_redis(conversations, version_id, thread_id, sub_thread_id, history_params)
 
             # Prepare and insert timescale metrics data
-            insert_ai_data_in_pg = prepare_timescale_data(dataset, history_params, conversation_id)
+            insert_ai_data_in_pg = prepare_timescale_data(dataset[0], history_params)
 
             await timescale_metrics(insert_ai_data_in_pg)
 

From 9cfd113f27e229ec6b76ae261cb0e6ecb48bb106 Mon Sep 17 00:00:00 2001
From: Husain Baghwala
Date: Thu, 16 Oct 2025 16:09:36 +0530
Subject: [PATCH 5/5] perf: optimize conversation query by selecting specific
 columns and reducing limit to 3

---
 src/db_services/conversationDbService.py | 52 +++++++++++++++---------
 1 file changed, 32 insertions(+), 20 deletions(-)

diff --git a/src/db_services/conversationDbService.py b/src/db_services/conversationDbService.py
index 6f7722b3..e51795b6 100644
--- a/src/db_services/conversationDbService.py
+++ b/src/db_services/conversationDbService.py
@@ -17,7 +17,16 @@ async def find(org_id, thread_id, sub_thread_id, bridge_id):
         session = pg['session']()
 
         conversations = (
-            session.query(Conversation)
+            session.query(
+                Conversation.id,
+                Conversation.user_message,
+                Conversation.response,
+                Conversation.chatbot_response,
+                Conversation.tools_call_data,
+                Conversation.error,
+                Conversation.urls,
+                Conversation.createdAt
+            )
             .filter(
                 and_(
                     Conversation.org_id == org_id,
@@ -28,52 +37,55 @@ async def find(org_id, thread_id, sub_thread_id, bridge_id):
                 )
             )
             .order_by(Conversation.id.desc())
-            .limit(9)
+            .limit(3)
             .all()
         )
 
         # Convert to format expected by existing code
         result = []
         for conv in reversed(conversations):
+            # Unpack the tuple result
+            conv_id, user_message, response, chatbot_response, tools_call_data, error, urls, created_at = conv
+
             # Create user message entry
-            if conv.get('user_message'):
+            if user_message:
                 result.append({
-                    'content': conv.get('user_message'),
+                    'content': user_message,
                     'role': 'user',
-                    'createdAt': conv.get('createdAt'),
-                    'id': conv.get('id'),
+                    'createdAt': created_at,
+                    'id': conv_id,
                     'function': {},
                     'is_reset': False,
-                    'tools_call_data': conv.get('tools_call_data', []),
-                    'error': conv.get('error', ''),
-                    'urls': conv.get('urls', [])
+                    'tools_call_data': tools_call_data or [],
+                    'error': error or '',
+                    'urls': urls or []
                 })
 
             # Create tools call entry if exists
-            if conv.get('tools_call_data') and len(conv.get('tools_call_data', [])) > 0:
+            if tools_call_data and len(tools_call_data) > 0:
                 result.append({
                     'content': '',
                     'role': 'tools_call',
-                    'createdAt': conv.get('createdAt'),
-                    'id': conv.get('id'),
-                    'function': conv.get('tools_call_data', [{}])[0],
+                    'createdAt': created_at,
+                    'id': conv_id,
+                    'function': tools_call_data[0] if tools_call_data else {},
                     'is_reset': False,
-                    'tools_call_data': conv.get('tools_call_data'),
-                    'error': conv.get('error', ''),
+                    'tools_call_data': tools_call_data,
+                    'error': error or '',
                     'urls': []
                 })
 
             # Create assistant message entry
-            if conv.get('response') or conv.get('chatbot_response'):
+            if response or chatbot_response:
                 result.append({
-                    'content': conv.get('response') or conv.get('chatbot_response'),
+                    'content': response or chatbot_response,
                     'role': 'assistant',
-                    'createdAt': conv.get('createdAt'),
-                    'id': conv.get('id'),
+                    'createdAt': created_at,
+                    'id': conv_id,
                     'function': {},
                     'is_reset': False,
                     'tools_call_data': None,
-                    'error': conv.get('error', ''),
+                    'error': error or '',
                     'urls': []
                 })
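
Editorial note on the consolidated read path (not part of the patch series): after PATCH 5, one agent_conversations row fans out into up to three role-based history entries, replacing the old one-row-per-message layout. The snippet below is a minimal, self-contained sketch of that fan-out; the helper name expand_row and the plain-dict row are hypothetical stand-ins for the ORM tuple that find() actually unpacks.

    # Sketch of the row -> history-entry fan-out performed in find().
    from datetime import datetime

    def expand_row(row):
        # row mirrors the columns selected in find(): id, user_message,
        # response, chatbot_response, tools_call_data, error, urls, createdAt
        entries = []
        base = {
            'id': row['id'],
            'createdAt': row['createdAt'],
            'is_reset': False,
            'error': row.get('error') or '',
        }
        if row.get('user_message'):
            entries.append({**base, 'role': 'user', 'content': row['user_message'],
                            'function': {},
                            'tools_call_data': row.get('tools_call_data') or [],
                            'urls': row.get('urls') or []})
        tools = row.get('tools_call_data') or []
        if tools:
            entries.append({**base, 'role': 'tools_call', 'content': '',
                            'function': tools[0], 'tools_call_data': tools,
                            'urls': []})
        if row.get('response') or row.get('chatbot_response'):
            entries.append({**base, 'role': 'assistant',
                            'content': row.get('response') or row.get('chatbot_response'),
                            'function': {}, 'tools_call_data': None, 'urls': []})
        return entries

    if __name__ == '__main__':
        row = {'id': 1, 'user_message': 'hi', 'response': 'hello',
               'chatbot_response': None, 'tools_call_data': [], 'error': None,
               'urls': [], 'createdAt': datetime.now()}
        print([e['role'] for e in expand_row(row)])  # ['user', 'assistant']

The user / tools_call / assistant ordering matches what find() emits per row, so the entries feed into add_tool_call_data_in_history unchanged.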